diff --git a/services/runners/job_pool.go b/services/runners/job_pool.go index 3de5a442..5347bf78 100644 --- a/services/runners/job_pool.go +++ b/services/runners/job_pool.go @@ -20,6 +20,30 @@ import ( log "github.com/sirupsen/logrus" ) +type ContextLogger struct { + Context string +} + +func (e *ContextLogger) Error(err error, action string, message string) { + util.LogErrorWithFields(err, log.Fields{ + "context": e.Context, + "action": action, + "error": message, + }) +} + +func (e *ContextLogger) Info(message string) { + log.WithFields(log.Fields{ + "context": e.Context, + }).Info(message) +} + +func (e *ContextLogger) Panic(err error, action string, message string) { + log.WithFields(log.Fields{ + "context": e.Context, + }).Panic(message) +} + type JobPool struct { // logger channel used to putting log records to database. logger chan jobLogRecord @@ -107,9 +131,10 @@ func (p *JobPool) Unregister() (err error) { } func (p *JobPool) Run() { + logger := ContextLogger{Context: "running"} if util.Config.Runner.Token == "" { - panic("runner token required. Please register runner first or create it from web interface.") + logger.Panic(fmt.Errorf("no token provided"), "read input", "can not retrieve runner token") } queueTicker := time.NewTicker(5 * time.Second) @@ -162,12 +187,12 @@ func (p *JobPool) Run() { runningJob.SetStatus(task_logger.TaskSuccessStatus) } - log.Info("Task " + strconv.Itoa(runningJob.job.Task.ID) + " finished (" + string(runningJob.status) + ")") + logger.Info("Task " + strconv.Itoa(runningJob.job.Task.ID) + " finished (" + string(runningJob.status) + ")") }(p.runningJobs[t.job.Task.ID]) p.queue = p.queue[1:] - log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " dequeued") - log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " started") + logger.Info("Task " + strconv.Itoa(t.job.Task.ID) + " dequeued") + logger.Info("Task " + strconv.Itoa(t.job.Task.ID) + " started") case <-requestTimer.C: @@ -194,6 +219,8 @@ func (p *JobPool) Run() { func (p *JobPool) sendProgress() { + logger := ContextLogger{Context: "sending_progress"} + client := &http.Client{} url := util.Config.WebHost + "/api/internal/runners" @@ -213,7 +240,7 @@ func (p *JobPool) sendProgress() { j.logRecords = make([]LogRecord, 0) if j.status.IsFinished() { - log.Info("Task " + strconv.Itoa(id) + " removed from running list") + logger.Info("Task " + strconv.Itoa(id) + " removed from running list") delete(p.runningJobs, id) } } @@ -221,20 +248,13 @@ func (p *JobPool) sendProgress() { jsonBytes, err := json.Marshal(body) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "sending progress", - "action": "form request body", - "error": "can not marshal json", - }) + logger.Error(err, "form request body", "can not marshal json") + return } req, err := http.NewRequest("PUT", url, bytes.NewBuffer(jsonBytes)) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "sending progress", - "action": "create request", - "error": "can not create request to the server", - }) + logger.Error(err, "create request", "can not create request to the server") return } @@ -242,12 +262,8 @@ func (p *JobPool) sendProgress() { resp, err := client.Do(req) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "sending progress", - "action": "send request", - "error": "server returned an error", - "status": resp.StatusCode, - }) + + logger.Error(err, "send request", "the server returned error "+strconv.Itoa(resp.StatusCode)) return } @@ -256,16 +272,12 @@ func (p *JobPool) sendProgress() { func (p *JobPool) tryRegisterRunner() bool { + logger := ContextLogger{Context: "registration"} + log.Info("Registering a new runner") if util.Config.Runner.RegistrationToken == "" { - - util.LogErrorWithFields(fmt.Errorf("registration token cannot be empty"), log.Fields{ - "context": "registration", - "action": "form request", - "error": "can not retrieve registration token", - }) - + logger.Error(fmt.Errorf("registration token cannot be empty"), "read input", "can not retrieve registration token") } client := &http.Client{} @@ -279,45 +291,26 @@ func (p *JobPool) tryRegisterRunner() bool { }) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "registration", - "action": "form request", - "error": "can not marshal json", - }) + logger.Error(err, "form request", "can not marshal json") return false } req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes)) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "registration", - "action": "create request", - "error": "can not create request to the server", - }) + logger.Error(err, "create request", "can not create request to the server") return false } resp, err := client.Do(req) if err != nil || resp.StatusCode != 200 { - - util.LogErrorWithFields(err, log.Fields{ - "context": "registration", - "action": "execute request", - "error": "the server returned an error", - "status": resp.StatusCode, - }) + logger.Error(err, "send request", "the server returned error "+strconv.Itoa(resp.StatusCode)) return false } body, err := io.ReadAll(resp.Body) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "registration", - "action": "read response body", - "error": "can not read server's response body", - }) - + logger.Error(err, "read response body", "can not read server's response body") return false } @@ -327,23 +320,14 @@ func (p *JobPool) tryRegisterRunner() bool { err = json.Unmarshal(body, &res) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "registration", - "action": "parsing result json", - "error": "server's response has invalid format", - }) - + logger.Error(err, "parsing result json", "server's response has invalid format") return false } err = os.WriteFile(util.Config.Runner.TokenFile, []byte(res.Token), 0644) if err != nil { - util.LogErrorWithFields(err, log.Fields{ - "context": "registration", - "action": "storing token", - "error": "can not store token to the file", - }) + logger.Error(err, "store token", "can not store token to the file") return false } @@ -355,8 +339,10 @@ func (p *JobPool) tryRegisterRunner() bool { // checkNewJobs tries to find runner to queued jobs func (p *JobPool) checkNewJobs() { + logger := ContextLogger{Context: "checking new jobs"} + if util.Config.Runner.Token == "" { - fmt.Println("Error creating request:", "no token provided") + logger.Error(fmt.Errorf("no token provided"), "read input", "can not retrieve runner token") return } @@ -366,22 +352,23 @@ func (p *JobPool) checkNewJobs() { req, err := http.NewRequest("GET", url, nil) - req.Header.Set("X-Runner-Token", util.Config.Runner.Token) - if err != nil { - fmt.Println("Error creating request:", err) + logger.Error(err, "create request", "can not create request to the server") return } + req.Header.Set("X-Runner-Token", util.Config.Runner.Token) + resp, err := client.Do(req) if err != nil { - fmt.Println("Error making request:", err) + logger.Error(err, "send request", "the server returned an error"+strconv.Itoa(resp.StatusCode)) return } if resp.StatusCode >= 400 { - log.Error("Encountered error while checking for new jobs; server returned code ", resp.StatusCode) + + logger.Error(fmt.Errorf("error status code"), "send request", "the server returned an error"+strconv.Itoa(resp.StatusCode)) return } @@ -389,14 +376,14 @@ func (p *JobPool) checkNewJobs() { body, err := io.ReadAll(resp.Body) if err != nil { - log.Error("Encountered error while checking for new jobs; unable to read response body:", err) + logger.Error(err, "read response body", "can not read server's response body") return } var response RunnerState err = json.Unmarshal(body, &response) if err != nil { - log.Error("Checking new jobs, parsing JSON error:", err) + logger.Error(err, "parsing result json", "server's response has invalid format") return } @@ -494,6 +481,6 @@ func (p *JobPool) checkNewJobs() { } p.queue = append(p.queue, &taskRunner) - log.Info("Task " + strconv.Itoa(taskRunner.job.Task.ID) + " enqueued") + logger.Info("Task " + strconv.Itoa(taskRunner.job.Task.ID) + " enqueued") } }