diff --git a/services/runners/JobPool.go b/services/runners/JobPool.go index 4f054477..df44353f 100644 --- a/services/runners/JobPool.go +++ b/services/runners/JobPool.go @@ -100,16 +100,13 @@ type JobPool struct { // register channel used to put tasks to queue. register chan *job - resourceLocker chan *resourceLock - runningJobs map[int]*runningJob queue []*job config *RunnerConfig - checking int32 - sending int32 + processing int32 } type RunnerRegistration struct { @@ -180,6 +177,7 @@ func (p *JobPool) Run() { defer func() { queueTicker.Stop() + requestTimer.Stop() }() for { @@ -196,8 +194,6 @@ func (p *JobPool) Run() { for { select { - //case j := <-p.register: // new task created by API or schedule - // p.queue = append(p.queue, j) case <-queueTicker.C: // timer 5 seconds: get task from queue and run it if len(p.queue) == 0 { @@ -212,9 +208,6 @@ func (p *JobPool) Run() { break } - //log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id)) - //p.resourceLocker <- &resourceLock{lock: true, holder: t} - p.runningJobs[t.job.Task.ID] = &runningJob{ job: t.job, } @@ -246,13 +239,22 @@ func (p *JobPool) Run() { case <-requestTimer.C: - go p.sendProgress() + go func() { - if util.Config.Runner.OneOff && len(p.runningJobs) > 0 && !p.hasRunningJobs() { - os.Exit(0) - } + if !atomic.CompareAndSwapInt32(&p.processing, 0, 1) { + return + } - go p.checkNewJobs() + defer atomic.StoreInt32(&p.processing, 0) + + p.sendProgress() + + if util.Config.Runner.OneOff && len(p.runningJobs) > 0 && !p.hasRunningJobs() { + os.Exit(0) + } + + p.checkNewJobs() + }() } } @@ -260,10 +262,6 @@ func (p *JobPool) Run() { func (p *JobPool) sendProgress() { - if !atomic.CompareAndSwapInt32(&p.sending, 0, 1) { - return - } - client := &http.Client{} url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID) @@ -273,6 +271,7 @@ func (p *JobPool) sendProgress() { } for id, j := range p.runningJobs { + body.Jobs = append(body.Jobs, JobProgress{ ID: id, LogRecords: j.logRecords, @@ -280,6 +279,10 @@ func (p *JobPool) sendProgress() { }) j.logRecords = make([]LogRecord, 0) + + if j.status.IsFinished() { + delete(p.runningJobs, id) + } } jsonBytes, err := json.Marshal(body) @@ -297,7 +300,6 @@ func (p *JobPool) sendProgress() { } defer resp.Body.Close() - defer atomic.StoreInt32(&p.sending, 0) } func (p *JobPool) tryRegisterRunner() bool { @@ -390,12 +392,6 @@ func (p *JobPool) tryRegisterRunner() bool { // checkNewJobs tries to find runner to queued jobs func (p *JobPool) checkNewJobs() { - if !atomic.CompareAndSwapInt32(&p.checking, 0, 1) { - return - } - - defer atomic.StoreInt32(&p.checking, 0) - client := &http.Client{} url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID) @@ -488,5 +484,6 @@ func (p *JobPool) checkNewJobs() { } p.queue = append(p.queue, &taskRunner) + log.Info("Task " + strconv.Itoa(taskRunner.job.Task.ID) + " added from queue") } }