feat(runners): use 1 atomic for check and send

This commit is contained in:
Denis Gukov 2023-09-20 03:01:28 +02:00
parent 7f6173bf14
commit bc3210994a

View File

@ -100,16 +100,13 @@ type JobPool struct {
// register channel used to put tasks to queue.
register chan *job
resourceLocker chan *resourceLock
runningJobs map[int]*runningJob
queue []*job
config *RunnerConfig
checking int32
sending int32
processing int32
}
type RunnerRegistration struct {
@ -180,6 +177,7 @@ func (p *JobPool) Run() {
defer func() {
queueTicker.Stop()
requestTimer.Stop()
}()
for {
@ -196,8 +194,6 @@ func (p *JobPool) Run() {
for {
select {
//case j := <-p.register: // new task created by API or schedule
// p.queue = append(p.queue, j)
case <-queueTicker.C: // timer 5 seconds: get task from queue and run it
if len(p.queue) == 0 {
@ -212,9 +208,6 @@ func (p *JobPool) Run() {
break
}
//log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id))
//p.resourceLocker <- &resourceLock{lock: true, holder: t}
p.runningJobs[t.job.Task.ID] = &runningJob{
job: t.job,
}
@ -246,13 +239,22 @@ func (p *JobPool) Run() {
case <-requestTimer.C:
go p.sendProgress()
go func() {
if util.Config.Runner.OneOff && len(p.runningJobs) > 0 && !p.hasRunningJobs() {
os.Exit(0)
}
if !atomic.CompareAndSwapInt32(&p.processing, 0, 1) {
return
}
go p.checkNewJobs()
defer atomic.StoreInt32(&p.processing, 0)
p.sendProgress()
if util.Config.Runner.OneOff && len(p.runningJobs) > 0 && !p.hasRunningJobs() {
os.Exit(0)
}
p.checkNewJobs()
}()
}
}
@ -260,10 +262,6 @@ func (p *JobPool) Run() {
func (p *JobPool) sendProgress() {
if !atomic.CompareAndSwapInt32(&p.sending, 0, 1) {
return
}
client := &http.Client{}
url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID)
@ -273,6 +271,7 @@ func (p *JobPool) sendProgress() {
}
for id, j := range p.runningJobs {
body.Jobs = append(body.Jobs, JobProgress{
ID: id,
LogRecords: j.logRecords,
@ -280,6 +279,10 @@ func (p *JobPool) sendProgress() {
})
j.logRecords = make([]LogRecord, 0)
if j.status.IsFinished() {
delete(p.runningJobs, id)
}
}
jsonBytes, err := json.Marshal(body)
@ -297,7 +300,6 @@ func (p *JobPool) sendProgress() {
}
defer resp.Body.Close()
defer atomic.StoreInt32(&p.sending, 0)
}
func (p *JobPool) tryRegisterRunner() bool {
@ -390,12 +392,6 @@ func (p *JobPool) tryRegisterRunner() bool {
// checkNewJobs tries to find runner to queued jobs
func (p *JobPool) checkNewJobs() {
if !atomic.CompareAndSwapInt32(&p.checking, 0, 1) {
return
}
defer atomic.StoreInt32(&p.checking, 0)
client := &http.Client{}
url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID)
@ -488,5 +484,6 @@ func (p *JobPool) checkNewJobs() {
}
p.queue = append(p.queue, &taskRunner)
log.Info("Task " + strconv.Itoa(taskRunner.job.Task.ID) + " added from queue")
}
}