diff --git a/api/tasks/pool.go b/api/tasks/pool.go index 19c38a8a..60f6ea6e 100644 --- a/api/tasks/pool.go +++ b/api/tasks/pool.go @@ -3,49 +3,118 @@ package tasks import ( "fmt" "time" + + "github.com/ansible-semaphore/semaphore/util" ) type taskPool struct { - queue []*task - register chan *task - running *task + queue []*task + register chan *task + activeProj map[int]*task + activeNodes map[string]*task + running int } var pool = taskPool{ - queue: make([]*task, 0), - register: make(chan *task), - running: nil, + queue: make([]*task, 0), + register: make(chan *task), + activeProj: make(map[int]*task), + activeNodes: make(map[string]*task), + running: 0, } +type resourceLock struct { + lock bool + holder *task +} + +var resourceLocker = make(chan *resourceLock) + func (p *taskPool) run() { - ticker := time.NewTicker(10 * time.Second) + ticker := time.NewTicker(5 * time.Second) defer func() { + close(resourceLocker) ticker.Stop() }() + // Lock or unlock resources when running a task + go func(locker <-chan *resourceLock) { + for l := range locker { + t := l.holder + + if l.lock { + if p.blocks(t) { + panic("Trying to lock an already locked resource!") + } + + p.activeProj[t.projectID] = t + + for _, node := range t.hosts { + p.activeNodes[node] = t + } + + p.running++ + continue + } + + if p.activeProj[t.projectID] == t { + delete(p.activeProj, t.projectID) + } + + for _, node := range t.hosts { + delete(p.activeNodes, node) + } + + p.running-- + } + }(resourceLocker) + for { select { case task := <-p.register: fmt.Println(task) - if p.running == nil { - go task.run() - continue - } - + go task.prepareRun() p.queue = append(p.queue, task) case <-ticker.C: - if len(p.queue) == 0 || p.running != nil { + if len(p.queue) == 0 { + continue + } else if t := p.queue[0]; t.task.Status != "error" && (!t.prepared || p.blocks(t)) { + p.queue = append(p.queue[1:], t) continue } - fmt.Println("Running a task.") - go pool.queue[0].run() + if t := pool.queue[0]; t.task.Status != "error" { + fmt.Println("Running a task.") + resourceLocker <- &resourceLock{lock: true, holder: t} + go t.run() + } pool.queue = pool.queue[1:] } } } +func (p *taskPool) blocks(t *task) bool { + if p.running >= util.Config.MaxParallelTasks { + return true + } + + switch util.Config.ConcurrencyMode { + case "project": + return p.activeProj[t.projectID] != nil + case "node": + for _, node := range t.hosts { + if p.activeNodes[node] != nil { + return true + } + } + + return false + default: + return p.running > 0 + } +} + func StartRunner() { pool.run() } diff --git a/api/tasks/runner.go b/api/tasks/runner.go index d5a3f462..b6b409d3 100644 --- a/api/tasks/runner.go +++ b/api/tasks/runner.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "os/exec" + "regexp" "strconv" "strings" "time" @@ -26,6 +27,8 @@ type task struct { users []int projectID int alert bool + hosts []string + prepared bool alert_chat string } @@ -36,12 +39,80 @@ func (t *task) fail() { t.sendTelegramAlert() } -func (t *task) run() { - pool.running = t +func (t *task) prepareRun() { + t.prepared = false + defer func() { + fmt.Println("Stopped preparing task") + + objType := "task" + desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status) + if err := (db.Event{ + ProjectID: &t.projectID, + ObjectType: &objType, + ObjectID: &t.task.ID, + Description: &desc, + }.Insert()); err != nil { + t.log("Fatal error inserting an event") + panic(err) + } + }() + + t.log("Preparing: " + strconv.Itoa(t.task.ID)) + + if err := t.populateDetails(); err != nil { + t.log("Error: " + err.Error()) + t.fail() + return + } + + objType := "task" + desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is preparing" + if err := (db.Event{ + ProjectID: &t.projectID, + ObjectType: &objType, + ObjectID: &t.task.ID, + Description: &desc, + }.Insert()); err != nil { + t.log("Fatal error inserting an event") + panic(err) + } + + t.log("Prepare task with template: " + t.template.Alias + "\n") + + if err := t.installKey(t.repository.SshKey); err != nil { + t.log("Failed installing ssh key for repository access: " + err.Error()) + t.fail() + return + } + + if err := t.updateRepository(); err != nil { + t.log("Failed updating repository: " + err.Error()) + t.fail() + return + } + + if err := t.installInventory(); err != nil { + t.log("Failed to install inventory: " + err.Error()) + t.fail() + return + } + + // todo: write environment + + if err := t.listPlaybookHosts(); err != nil { + t.log("Listing playbook hosts failed: " + err.Error()) + t.fail() + return + } + + t.prepared = true +} + +func (t *task) run() { defer func() { fmt.Println("Stopped running tasks") - pool.running = nil + resourceLocker <- &resourceLock{lock: false, holder: t} now := time.Now() t.task.End = &now @@ -60,14 +131,7 @@ func (t *task) run() { } }() - if err := t.populateDetails(); err != nil { - t.log("Error: " + err.Error()) - t.fail() - return - } - { - fmt.Println(t.users) now := time.Now() t.task.Status = "running" t.task.Start = &now @@ -90,26 +154,6 @@ func (t *task) run() { t.log("Started: " + strconv.Itoa(t.task.ID)) t.log("Run task with template: " + t.template.Alias + "\n") - if err := t.installKey(t.repository.SshKey); err != nil { - t.log("Failed installing ssh key for repository access: " + err.Error()) - t.fail() - return - } - - if err := t.updateRepository(); err != nil { - t.log("Failed updating repository: " + err.Error()) - t.fail() - return - } - - if err := t.installInventory(); err != nil { - t.log("Failed to install inventory: " + err.Error()) - t.fail() - return - } - - // todo: write environment - if err := t.runGalaxy(); err != nil { t.log("Running galaxy failed: " + err.Error()) t.fail() @@ -296,7 +340,42 @@ func (t *task) runGalaxy() error { return cmd.Run() } +func (t *task) listPlaybookHosts() error { + args, err := t.getPlaybookArgs() + if err != nil { + return err + } + args = append(args, "--list-hosts") + + cmd := exec.Command("ansible-playbook", args...) + cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID) + cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil) + + out, err := cmd.Output() + re := regexp.MustCompile("(?m)^\\s{6}(.*)$") + matches := re.FindAllSubmatch(out, 20) + hosts := make([]string, len(matches)) + for i, _ := range matches { + hosts[i] = string(matches[i][1]) + } + t.hosts = hosts + return err +} + func (t *task) runPlaybook() error { + args, err := t.getPlaybookArgs() + if err != nil { + return err + } + cmd := exec.Command("ansible-playbook", args...) + cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID) + cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil) + + t.logCmd(cmd) + return cmd.Run() +} + +func (t *task) getPlaybookArgs() ([]string, error) { playbookName := t.task.Playbook if len(playbookName) == 0 { playbookName = t.template.Playbook @@ -323,7 +402,7 @@ func (t *task) runPlaybook() error { err := json.Unmarshal([]byte(t.environment.JSON), &js) if err != nil { t.log("JSON is not valid") - return err + return nil, err } args = append(args, "--extra-vars", t.environment.JSON) @@ -334,7 +413,7 @@ func (t *task) runPlaybook() error { err := json.Unmarshal([]byte(*t.template.Arguments), &extraArgs) if err != nil { t.log("Could not unmarshal arguments to []string") - return err + return nil, err } } @@ -344,13 +423,7 @@ func (t *task) runPlaybook() error { args = append(args, extraArgs...) args = append(args, playbookName) } - - cmd := exec.Command("ansible-playbook", args...) - cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID) - cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil) - - t.logCmd(cmd) - return cmd.Run() + return args, nil } func (t *task) envVars(home string, pwd string, gitSSHCommand *string) []string { diff --git a/util/config.go b/util/config.go index 35beb21f..f92fdf25 100644 --- a/util/config.go +++ b/util/config.go @@ -69,6 +69,10 @@ type configType struct { TelegramAlert bool `json:"telegram_alert"` TelegramChat string `json:"telegram_chat"` TelegramToken string `json:"telegram_token"` + + // task concurrency + ConcurrencyMode string `json:"concurrency_mode"` + MaxParallelTasks int `json:"max_parallel_tasks"` } var Config *configType @@ -151,6 +155,10 @@ func init() { Config.TmpPath = "/tmp/semaphore" } + if Config.MaxParallelTasks < 1 { + Config.MaxParallelTasks = 10 + } + var encryption []byte encryption = nil @@ -342,7 +350,6 @@ func (conf *configType) Scan() { if len(conf.LdapMappings.Mail) == 0 { conf.LdapMappings.Mail = "mail" } - } else { conf.LdapEnable = false }