Semaphore/services/tasks/TaskRunner.go

339 lines
7.5 KiB
Go
Raw Normal View History

package tasks
import (
"encoding/json"
"errors"
"os"
"strconv"
"strings"
"time"
2021-12-14 16:24:17 +01:00
"github.com/ansible-semaphore/semaphore/api/sockets"
"github.com/ansible-semaphore/semaphore/db"
"github.com/ansible-semaphore/semaphore/pkg/task_logger"
2021-12-14 16:24:17 +01:00
"github.com/ansible-semaphore/semaphore/util"
2024-03-12 01:44:04 +01:00
log "github.com/sirupsen/logrus"
)
type Job interface {
Run(username string, incomingVersion *string) error
Kill()
}
type TaskRunner struct {
Task db.Task
Template db.Template
Inventory db.Inventory
Repository db.Repository
Environment db.Environment
users []int
alert bool
alertChat *string
pool *TaskPool
// job executes Ansible and returns stdout to Semaphore logs
job Job
RunnerID int
Username string
IncomingVersion *string
statusListeners []task_logger.StatusListener
logListeners []task_logger.LogListener
}
func (t *TaskRunner) AddStatusListener(l task_logger.StatusListener) {
t.statusListeners = append(t.statusListeners, l)
}
func (t *TaskRunner) AddLogListener(l task_logger.LogListener) {
t.logListeners = append(t.logListeners, l)
}
2023-08-30 11:13:48 +02:00
func (t *TaskRunner) saveStatus() {
2021-08-25 17:37:19 +02:00
for _, user := range t.users {
b, err := json.Marshal(&map[string]interface{}{
"type": "update",
"start": t.Task.Start,
"end": t.Task.End,
"status": t.Task.Status,
"task_id": t.Task.ID,
"template_id": t.Task.TemplateID,
"project_id": t.Task.ProjectID,
"version": t.Task.Version,
2021-08-25 17:37:19 +02:00
})
util.LogPanic(err)
sockets.Message(user, b)
}
if err := t.pool.store.UpdateTask(t.Task); err != nil {
t.panicOnError(err, "Failed to update TaskRunner status")
2021-08-25 17:37:19 +02:00
}
}
func (t *TaskRunner) kill() {
t.job.Kill()
2016-04-17 02:20:23 +02:00
}
func (t *TaskRunner) createTaskEvent() {
objType := db.EventTask
desc := "Task ID " + strconv.Itoa(t.Task.ID) + " (" + t.Template.Name + ")" + " finished - " + strings.ToUpper(string(t.Task.Status))
2021-08-30 21:42:11 +02:00
_, err := t.pool.store.CreateEvent(db.Event{
UserID: t.Task.UserID,
ProjectID: &t.Task.ProjectID,
2021-08-30 21:42:11 +02:00
ObjectType: &objType,
ObjectID: &t.Task.ID,
2021-08-30 21:42:11 +02:00
Description: &desc,
})
if err != nil {
t.panicOnError(err, "Fatal error inserting an event")
}
}
func (t *TaskRunner) run() {
2022-11-20 09:15:29 +01:00
if !t.pool.store.PermanentConnection() {
t.pool.store.Connect("run task " + strconv.Itoa(t.Task.ID))
defer t.pool.store.Close("run task " + strconv.Itoa(t.Task.ID))
2022-11-20 09:15:29 +01:00
}
defer func() {
log.Info("Stopped running TaskRunner " + strconv.Itoa(t.Task.ID))
log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.Task.ID))
t.pool.resourceLocker <- &resourceLock{lock: false, holder: t}
now := time.Now()
t.Task.End = &now
2023-08-30 11:13:48 +02:00
t.saveStatus()
t.createTaskEvent()
}()
2023-09-12 20:58:44 +02:00
// Mark task as stopped if user stopped task during preparation (before task run).
if t.Task.Status == task_logger.TaskStoppingStatus {
t.SetStatus(task_logger.TaskStoppedStatus)
2021-08-25 17:37:19 +02:00
return
}
t.SetStatus(task_logger.TaskStartingStatus)
objType := db.EventTask
desc := "Task ID " + strconv.Itoa(t.Task.ID) + " (" + t.Template.Name + ")" + " is running"
_, err := t.pool.store.CreateEvent(db.Event{
UserID: t.Task.UserID,
ProjectID: &t.Task.ProjectID,
ObjectType: &objType,
ObjectID: &t.Task.ID,
Description: &desc,
})
if err != nil {
t.Log("Fatal error inserting an event")
panic(err)
}
t.Log("Started: " + strconv.Itoa(t.Task.ID))
t.Log("Run TaskRunner with template: " + t.Template.Name + "\n")
var username string
var incomingVersion *string
if t.Task.UserID != nil {
var user db.User
user, err = t.pool.store.GetUser(*t.Task.UserID)
if err == nil {
username = user.Username
}
}
if t.Template.Type != db.TemplateTask {
incomingVersion = t.Task.GetIncomingVersion(t.pool.store)
}
err = t.job.Run(username, incomingVersion)
2022-01-19 20:35:59 +01:00
if err != nil {
t.Log("Running playbook failed: " + err.Error())
t.SetStatus(task_logger.TaskFailStatus)
return
}
2016-04-17 02:20:23 +02:00
if t.Task.Status == task_logger.TaskRunningStatus {
t.SetStatus(task_logger.TaskSuccessStatus)
}
2022-01-19 20:35:59 +01:00
templates, err := t.pool.store.GetTemplates(t.Task.ProjectID, db.TemplateFilter{
BuildTemplateID: &t.Task.TemplateID,
2022-01-19 20:35:59 +01:00
AutorunOnly: true,
}, db.RetrieveQueryParams{})
if err != nil {
t.Log("Running playbook failed: " + err.Error())
2022-01-19 20:35:59 +01:00
return
}
for _, tpl := range templates {
_, err = t.pool.AddTask(db.Task{
2022-01-19 20:35:59 +01:00
TemplateID: tpl.ID,
ProjectID: tpl.ProjectID,
BuildTaskID: &t.Task.ID,
2022-01-19 20:35:59 +01:00
}, nil, tpl.ProjectID)
if err != nil {
t.Log("Running playbook failed: " + err.Error())
continue
2022-01-19 20:35:59 +01:00
}
}
}
func (t *TaskRunner) prepareError(err error, errMsg string) error {
if errors.Is(err, db.ErrNotFound) {
t.Log(errMsg)
return err
}
if err != nil {
t.SetStatus(task_logger.TaskFailStatus)
panic(err)
}
return nil
}
// nolint: gocyclo
func (t *TaskRunner) populateDetails() error {
// get template
var err error
t.Template, err = t.pool.store.GetTemplate(t.Task.ProjectID, t.Task.TemplateID)
if err != nil {
return t.prepareError(err, "Template not found!")
}
2017-04-18 16:36:09 +02:00
// get project alert setting
project, err := t.pool.store.GetProject(t.Template.ProjectID)
if err != nil {
return t.prepareError(err, "Project not found!")
2017-03-10 07:25:42 +01:00
}
t.alert = project.Alert
t.alertChat = project.AlertChat
2016-04-17 12:41:36 +02:00
// get project users
projectUsers, err := t.pool.store.GetProjectUsers(t.Template.ProjectID, db.RetrieveQueryParams{})
2021-03-12 21:30:17 +01:00
if err != nil {
return t.prepareError(err, "Users not found!")
}
2016-04-17 12:41:36 +02:00
users := make(map[int]bool)
for _, user := range projectUsers {
users[user.ID] = true
2016-04-17 12:41:36 +02:00
}
2024-04-02 23:50:52 +02:00
admins, err := t.pool.store.GetAllAdmins()
if err != nil {
return err
}
for _, admin := range admins {
users[admin.ID] = true
}
t.users = []int{}
for userID := range users {
t.users = append(t.users, userID)
2024-04-02 23:50:52 +02:00
}
// get inventory
if t.Template.InventoryID != nil {
t.Inventory, err = t.pool.store.GetInventory(t.Template.ProjectID, *t.Template.InventoryID)
if err != nil {
return t.prepareError(err, "Template Inventory not found!")
}
}
// get repository
t.Repository, err = t.pool.store.GetRepository(t.Template.ProjectID, t.Template.RepositoryID)
if err != nil {
return err
}
err = t.Repository.SSHKey.DeserializeSecret()
if err != nil {
return err
}
// get environment
if t.Template.EnvironmentID != nil {
t.Environment, err = t.pool.store.GetEnvironment(t.Template.ProjectID, *t.Template.EnvironmentID)
if err != nil {
return err
}
var secrets []db.AccessKey
secrets, err = t.pool.store.GetEnvironmentSecrets(t.Template.ProjectID, *t.Template.EnvironmentID)
if err != nil {
return err
}
for _, s := range secrets {
err = s.DeserializeSecret()
if err != nil {
return err
}
t.Environment.Secrets = append(t.Environment.Secrets, db.EnvironmentSecret{
ID: s.ID,
Name: s.Name,
Secret: s.String,
})
}
}
if t.Task.Environment != "" {
environment := make(map[string]interface{})
if t.Environment.JSON != "" {
err = json.Unmarshal([]byte(t.Task.Environment), &environment)
if err != nil {
return err
}
}
taskEnvironment := make(map[string]interface{})
err = json.Unmarshal([]byte(t.Environment.JSON), &taskEnvironment)
if err != nil {
return err
}
for k, v := range taskEnvironment {
environment[k] = v
}
var ev []byte
ev, err = json.Marshal(environment)
if err != nil {
return err
}
t.Environment.JSON = string(ev)
}
return nil
}
// checkTmpDir checks to see if the temporary directory exists
// and if it does not attempts to create it
func checkTmpDir(path string) error {
var err error
if _, err = os.Stat(path); err != nil {
if os.IsNotExist(err) {
2018-02-28 10:02:54 +01:00
return os.MkdirAll(path, 0700)
}
}
return err
}