refactor(runner): add ContextLogger

This commit is contained in:
Denis Gukov 2024-10-13 12:17:25 +00:00
parent fd5cdbbd9a
commit e69ee19960

View File

@ -20,6 +20,30 @@ import (
log "github.com/sirupsen/logrus"
)
type ContextLogger struct {
Context string
}
func (e *ContextLogger) Error(err error, action string, message string) {
util.LogErrorWithFields(err, log.Fields{
"context": e.Context,
"action": action,
"error": message,
})
}
func (e *ContextLogger) Info(message string) {
log.WithFields(log.Fields{
"context": e.Context,
}).Info(message)
}
func (e *ContextLogger) Panic(err error, action string, message string) {
log.WithFields(log.Fields{
"context": e.Context,
}).Panic(message)
}
type JobPool struct {
// logger channel used to putting log records to database.
logger chan jobLogRecord
@ -107,9 +131,10 @@ func (p *JobPool) Unregister() (err error) {
}
func (p *JobPool) Run() {
logger := ContextLogger{Context: "running"}
if util.Config.Runner.Token == "" {
panic("runner token required. Please register runner first or create it from web interface.")
logger.Panic(fmt.Errorf("no token provided"), "read input", "can not retrieve runner token")
}
queueTicker := time.NewTicker(5 * time.Second)
@ -162,12 +187,12 @@ func (p *JobPool) Run() {
runningJob.SetStatus(task_logger.TaskSuccessStatus)
}
log.Info("Task " + strconv.Itoa(runningJob.job.Task.ID) + " finished (" + string(runningJob.status) + ")")
logger.Info("Task " + strconv.Itoa(runningJob.job.Task.ID) + " finished (" + string(runningJob.status) + ")")
}(p.runningJobs[t.job.Task.ID])
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " dequeued")
log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " started")
logger.Info("Task " + strconv.Itoa(t.job.Task.ID) + " dequeued")
logger.Info("Task " + strconv.Itoa(t.job.Task.ID) + " started")
case <-requestTimer.C:
@ -194,6 +219,8 @@ func (p *JobPool) Run() {
func (p *JobPool) sendProgress() {
logger := ContextLogger{Context: "sending_progress"}
client := &http.Client{}
url := util.Config.WebHost + "/api/internal/runners"
@ -213,7 +240,7 @@ func (p *JobPool) sendProgress() {
j.logRecords = make([]LogRecord, 0)
if j.status.IsFinished() {
log.Info("Task " + strconv.Itoa(id) + " removed from running list")
logger.Info("Task " + strconv.Itoa(id) + " removed from running list")
delete(p.runningJobs, id)
}
}
@ -221,20 +248,13 @@ func (p *JobPool) sendProgress() {
jsonBytes, err := json.Marshal(body)
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "sending progress",
"action": "form request body",
"error": "can not marshal json",
})
logger.Error(err, "form request body", "can not marshal json")
return
}
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(jsonBytes))
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "sending progress",
"action": "create request",
"error": "can not create request to the server",
})
logger.Error(err, "create request", "can not create request to the server")
return
}
@ -242,12 +262,8 @@ func (p *JobPool) sendProgress() {
resp, err := client.Do(req)
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "sending progress",
"action": "send request",
"error": "server returned an error",
"status": resp.StatusCode,
})
logger.Error(err, "send request", "the server returned error "+strconv.Itoa(resp.StatusCode))
return
}
@ -256,16 +272,12 @@ func (p *JobPool) sendProgress() {
func (p *JobPool) tryRegisterRunner() bool {
logger := ContextLogger{Context: "registration"}
log.Info("Registering a new runner")
if util.Config.Runner.RegistrationToken == "" {
util.LogErrorWithFields(fmt.Errorf("registration token cannot be empty"), log.Fields{
"context": "registration",
"action": "form request",
"error": "can not retrieve registration token",
})
logger.Error(fmt.Errorf("registration token cannot be empty"), "read input", "can not retrieve registration token")
}
client := &http.Client{}
@ -279,45 +291,26 @@ func (p *JobPool) tryRegisterRunner() bool {
})
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "registration",
"action": "form request",
"error": "can not marshal json",
})
logger.Error(err, "form request", "can not marshal json")
return false
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "registration",
"action": "create request",
"error": "can not create request to the server",
})
logger.Error(err, "create request", "can not create request to the server")
return false
}
resp, err := client.Do(req)
if err != nil || resp.StatusCode != 200 {
util.LogErrorWithFields(err, log.Fields{
"context": "registration",
"action": "execute request",
"error": "the server returned an error",
"status": resp.StatusCode,
})
logger.Error(err, "send request", "the server returned error "+strconv.Itoa(resp.StatusCode))
return false
}
body, err := io.ReadAll(resp.Body)
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "registration",
"action": "read response body",
"error": "can not read server's response body",
})
logger.Error(err, "read response body", "can not read server's response body")
return false
}
@ -327,23 +320,14 @@ func (p *JobPool) tryRegisterRunner() bool {
err = json.Unmarshal(body, &res)
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "registration",
"action": "parsing result json",
"error": "server's response has invalid format",
})
logger.Error(err, "parsing result json", "server's response has invalid format")
return false
}
err = os.WriteFile(util.Config.Runner.TokenFile, []byte(res.Token), 0644)
if err != nil {
util.LogErrorWithFields(err, log.Fields{
"context": "registration",
"action": "storing token",
"error": "can not store token to the file",
})
logger.Error(err, "store token", "can not store token to the file")
return false
}
@ -355,8 +339,10 @@ func (p *JobPool) tryRegisterRunner() bool {
// checkNewJobs tries to find runner to queued jobs
func (p *JobPool) checkNewJobs() {
logger := ContextLogger{Context: "checking new jobs"}
if util.Config.Runner.Token == "" {
fmt.Println("Error creating request:", "no token provided")
logger.Error(fmt.Errorf("no token provided"), "read input", "can not retrieve runner token")
return
}
@ -366,22 +352,23 @@ func (p *JobPool) checkNewJobs() {
req, err := http.NewRequest("GET", url, nil)
req.Header.Set("X-Runner-Token", util.Config.Runner.Token)
if err != nil {
fmt.Println("Error creating request:", err)
logger.Error(err, "create request", "can not create request to the server")
return
}
req.Header.Set("X-Runner-Token", util.Config.Runner.Token)
resp, err := client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
logger.Error(err, "send request", "the server returned an error"+strconv.Itoa(resp.StatusCode))
return
}
if resp.StatusCode >= 400 {
log.Error("Encountered error while checking for new jobs; server returned code ", resp.StatusCode)
logger.Error(fmt.Errorf("error status code"), "send request", "the server returned an error"+strconv.Itoa(resp.StatusCode))
return
}
@ -389,14 +376,14 @@ func (p *JobPool) checkNewJobs() {
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Error("Encountered error while checking for new jobs; unable to read response body:", err)
logger.Error(err, "read response body", "can not read server's response body")
return
}
var response RunnerState
err = json.Unmarshal(body, &response)
if err != nil {
log.Error("Checking new jobs, parsing JSON error:", err)
logger.Error(err, "parsing result json", "server's response has invalid format")
return
}
@ -494,6 +481,6 @@ func (p *JobPool) checkNewJobs() {
}
p.queue = append(p.queue, &taskRunner)
log.Info("Task " + strconv.Itoa(taskRunner.job.Task.ID) + " enqueued")
logger.Info("Task " + strconv.Itoa(taskRunner.job.Task.ID) + " enqueued")
}
}