// Source: Semaphore/api/tasks/pool.go

package tasks
import (
"strconv"
"time"
log "github.com/Sirupsen/logrus"
"github.com/ansible-semaphore/semaphore/util"
)
type taskPool struct {
2021-08-25 17:37:19 +02:00
queue []*task
register chan *task
activeProj map[int]*task
activeNodes map[string]*task
running int
runningTasks map[int]*task
}
type resourceLock struct {
2017-08-19 10:45:01 +02:00
lock bool
holder *task
2016-04-04 15:44:34 +02:00
}
2021-08-25 17:37:19 +02:00
var pool = taskPool{
queue: make([]*task, 0),
register: make(chan *task),
activeProj: make(map[int]*task),
activeNodes: make(map[string]*task),
running: 0,
runningTasks: make(map[int]*task),
}
var resourceLocker = make(chan *resourceLock)
//nolint: gocyclo
2016-04-04 15:44:34 +02:00
func (p *taskPool) run() {
ticker := time.NewTicker(5 * time.Second)
defer func() {
2017-08-19 10:45:01 +02:00
close(resourceLocker)
ticker.Stop()
}()
// Lock or unlock resources when running a task
2017-08-19 10:45:01 +02:00
go func(locker <-chan *resourceLock) {
for l := range locker {
t := l.holder
2017-08-19 10:45:01 +02:00
if l.lock {
if p.blocks(t) {
panic("Trying to lock an already locked resource!")
}
2017-08-19 10:45:01 +02:00
p.activeProj[t.projectID] = t
2017-08-19 10:45:01 +02:00
for _, node := range t.hosts {
p.activeNodes[node] = t
}
2017-08-19 10:45:01 +02:00
p.running++
2021-08-25 17:37:19 +02:00
p.runningTasks[t.task.ID] = t
2017-08-19 10:45:01 +02:00
continue
}
if p.activeProj[t.projectID] == t {
delete(p.activeProj, t.projectID)
}
2017-08-19 10:45:01 +02:00
for _, node := range t.hosts {
delete(p.activeNodes, node)
}
p.running--
2021-08-25 17:37:19 +02:00
delete(p.runningTasks, t.task.ID)
}
}(resourceLocker)
2016-04-04 15:44:34 +02:00
for {
select {
case task := <-p.register:
p.queue = append(p.queue, task)
log.Debug(task)
msg := "Task " + strconv.Itoa(task.task.ID) + " added to queue"
task.log(msg)
log.Info(msg)
case <-ticker.C:
if len(p.queue) == 0 {
continue
}
//get task from top of queue
t := p.queue[0]
if t.task.Status == taskFailStatus {
//delete failed task from queue
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.task.ID) + " removed from queue")
continue
}
if p.blocks(t) {
//move blocked task to end of queue
p.queue = append(p.queue[1:], t)
continue
}
2021-03-12 21:20:18 +01:00
log.Info("Set resource locker with task " + strconv.Itoa(t.task.ID))
resourceLocker <- &resourceLock{lock: true, holder: t}
if !t.prepared {
go t.prepareRun()
continue
}
go t.run()
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.task.ID) + " removed from queue")
2016-04-04 15:44:34 +02:00
}
}
}
func (p *taskPool) blocks(t *task) bool {
if p.running >= util.Config.MaxParallelTasks {
return true
}
2017-08-19 10:45:01 +02:00
switch util.Config.ConcurrencyMode {
case "project":
return p.activeProj[t.projectID] != nil
case "node":
for _, node := range t.hosts {
if p.activeNodes[node] != nil {
2017-08-19 10:45:01 +02:00
return true
}
}
2017-08-19 10:45:01 +02:00
return false
default:
return p.running > 0
}
}
// StartRunner begins the task pool, used as a goroutine
2016-04-04 15:44:34 +02:00
func StartRunner() {
pool.run()
}