Merge branch 'TeliaSweden-develop' into develop

This commit is contained in:
Matej Kramny 2017-08-19 09:46:51 +01:00
commit b9c2dbe1e5
No known key found for this signature in database
GPG Key ID: DA0642A6671F72FD
3 changed files with 204 additions and 55 deletions

View File

@ -3,49 +3,118 @@ package tasks
import (
"fmt"
"time"
"github.com/ansible-semaphore/semaphore/util"
)
type taskPool struct {
queue []*task
register chan *task
running *task
queue []*task
register chan *task
activeProj map[int]*task
activeNodes map[string]*task
running int
}
var pool = taskPool{
queue: make([]*task, 0),
register: make(chan *task),
running: nil,
queue: make([]*task, 0),
register: make(chan *task),
activeProj: make(map[int]*task),
activeNodes: make(map[string]*task),
running: 0,
}
type resourceLock struct {
lock bool
holder *task
}
var resourceLocker = make(chan *resourceLock)
func (p *taskPool) run() {
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(5 * time.Second)
defer func() {
close(resourceLocker)
ticker.Stop()
}()
// Lock or unlock resources when running a task
go func(locker <-chan *resourceLock) {
for l := range locker {
t := l.holder
if l.lock {
if p.blocks(t) {
panic("Trying to lock an already locked resource!")
}
p.activeProj[t.projectID] = t
for _, node := range t.hosts {
p.activeNodes[node] = t
}
p.running++
continue
}
if p.activeProj[t.projectID] == t {
delete(p.activeProj, t.projectID)
}
for _, node := range t.hosts {
delete(p.activeNodes, node)
}
p.running--
}
}(resourceLocker)
for {
select {
case task := <-p.register:
fmt.Println(task)
if p.running == nil {
go task.run()
continue
}
go task.prepareRun()
p.queue = append(p.queue, task)
case <-ticker.C:
if len(p.queue) == 0 || p.running != nil {
if len(p.queue) == 0 {
continue
} else if t := p.queue[0]; t.task.Status != "error" && (!t.prepared || p.blocks(t)) {
p.queue = append(p.queue[1:], t)
continue
}
fmt.Println("Running a task.")
go pool.queue[0].run()
if t := pool.queue[0]; t.task.Status != "error" {
fmt.Println("Running a task.")
resourceLocker <- &resourceLock{lock: true, holder: t}
go t.run()
}
pool.queue = pool.queue[1:]
}
}
}
func (p *taskPool) blocks(t *task) bool {
if p.running >= util.Config.MaxParallelTasks {
return true
}
switch util.Config.ConcurrencyMode {
case "project":
return p.activeProj[t.projectID] != nil
case "node":
for _, node := range t.hosts {
if p.activeNodes[node] != nil {
return true
}
}
return false
default:
return p.running > 0
}
}
func StartRunner() {
pool.run()
}

View File

@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
@ -26,6 +27,8 @@ type task struct {
users []int
projectID int
alert bool
hosts []string
prepared bool
alert_chat string
}
@ -36,12 +39,80 @@ func (t *task) fail() {
t.sendTelegramAlert()
}
func (t *task) run() {
pool.running = t
func (t *task) prepareRun() {
t.prepared = false
defer func() {
fmt.Println("Stopped preparing task")
objType := "task"
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
if err := (db.Event{
ProjectID: &t.projectID,
ObjectType: &objType,
ObjectID: &t.task.ID,
Description: &desc,
}.Insert()); err != nil {
t.log("Fatal error inserting an event")
panic(err)
}
}()
t.log("Preparing: " + strconv.Itoa(t.task.ID))
if err := t.populateDetails(); err != nil {
t.log("Error: " + err.Error())
t.fail()
return
}
objType := "task"
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " is preparing"
if err := (db.Event{
ProjectID: &t.projectID,
ObjectType: &objType,
ObjectID: &t.task.ID,
Description: &desc,
}.Insert()); err != nil {
t.log("Fatal error inserting an event")
panic(err)
}
t.log("Prepare task with template: " + t.template.Alias + "\n")
if err := t.installKey(t.repository.SshKey); err != nil {
t.log("Failed installing ssh key for repository access: " + err.Error())
t.fail()
return
}
if err := t.updateRepository(); err != nil {
t.log("Failed updating repository: " + err.Error())
t.fail()
return
}
if err := t.installInventory(); err != nil {
t.log("Failed to install inventory: " + err.Error())
t.fail()
return
}
// todo: write environment
if err := t.listPlaybookHosts(); err != nil {
t.log("Listing playbook hosts failed: " + err.Error())
t.fail()
return
}
t.prepared = true
}
func (t *task) run() {
defer func() {
fmt.Println("Stopped running tasks")
pool.running = nil
resourceLocker <- &resourceLock{lock: false, holder: t}
now := time.Now()
t.task.End = &now
@ -60,14 +131,7 @@ func (t *task) run() {
}
}()
if err := t.populateDetails(); err != nil {
t.log("Error: " + err.Error())
t.fail()
return
}
{
fmt.Println(t.users)
now := time.Now()
t.task.Status = "running"
t.task.Start = &now
@ -90,26 +154,6 @@ func (t *task) run() {
t.log("Started: " + strconv.Itoa(t.task.ID))
t.log("Run task with template: " + t.template.Alias + "\n")
if err := t.installKey(t.repository.SshKey); err != nil {
t.log("Failed installing ssh key for repository access: " + err.Error())
t.fail()
return
}
if err := t.updateRepository(); err != nil {
t.log("Failed updating repository: " + err.Error())
t.fail()
return
}
if err := t.installInventory(); err != nil {
t.log("Failed to install inventory: " + err.Error())
t.fail()
return
}
// todo: write environment
if err := t.runGalaxy(); err != nil {
t.log("Running galaxy failed: " + err.Error())
t.fail()
@ -296,7 +340,42 @@ func (t *task) runGalaxy() error {
return cmd.Run()
}
func (t *task) listPlaybookHosts() error {
args, err := t.getPlaybookArgs()
if err != nil {
return err
}
args = append(args, "--list-hosts")
cmd := exec.Command("ansible-playbook", args...)
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)
out, err := cmd.Output()
re := regexp.MustCompile("(?m)^\\s{6}(.*)$")
matches := re.FindAllSubmatch(out, 20)
hosts := make([]string, len(matches))
for i, _ := range matches {
hosts[i] = string(matches[i][1])
}
t.hosts = hosts
return err
}
func (t *task) runPlaybook() error {
args, err := t.getPlaybookArgs()
if err != nil {
return err
}
cmd := exec.Command("ansible-playbook", args...)
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)
t.logCmd(cmd)
return cmd.Run()
}
func (t *task) getPlaybookArgs() ([]string, error) {
playbookName := t.task.Playbook
if len(playbookName) == 0 {
playbookName = t.template.Playbook
@ -323,7 +402,7 @@ func (t *task) runPlaybook() error {
err := json.Unmarshal([]byte(t.environment.JSON), &js)
if err != nil {
t.log("JSON is not valid")
return err
return nil, err
}
args = append(args, "--extra-vars", t.environment.JSON)
@ -334,7 +413,7 @@ func (t *task) runPlaybook() error {
err := json.Unmarshal([]byte(*t.template.Arguments), &extraArgs)
if err != nil {
t.log("Could not unmarshal arguments to []string")
return err
return nil, err
}
}
@ -344,13 +423,7 @@ func (t *task) runPlaybook() error {
args = append(args, extraArgs...)
args = append(args, playbookName)
}
cmd := exec.Command("ansible-playbook", args...)
cmd.Dir = util.Config.TmpPath + "/repository_" + strconv.Itoa(t.repository.ID)
cmd.Env = t.envVars(util.Config.TmpPath, cmd.Dir, nil)
t.logCmd(cmd)
return cmd.Run()
return args, nil
}
func (t *task) envVars(home string, pwd string, gitSSHCommand *string) []string {

View File

@ -69,6 +69,10 @@ type configType struct {
TelegramAlert bool `json:"telegram_alert"`
TelegramChat string `json:"telegram_chat"`
TelegramToken string `json:"telegram_token"`
// task concurrency
ConcurrencyMode string `json:"concurrency_mode"`
MaxParallelTasks int `json:"max_parallel_tasks"`
}
var Config *configType
@ -151,6 +155,10 @@ func init() {
Config.TmpPath = "/tmp/semaphore"
}
if Config.MaxParallelTasks < 1 {
Config.MaxParallelTasks = 10
}
var encryption []byte
encryption = nil
@ -342,7 +350,6 @@ func (conf *configType) Scan() {
if len(conf.LdapMappings.Mail) == 0 {
conf.LdapMappings.Mail = "mail"
}
} else {
conf.LdapEnable = false
}