Semaphore/services/runners/JobPool.go

549 lines
12 KiB
Go
Raw Normal View History

//
// Runner's job pool. NOT SERVER!!!
// The runner fetches jobs from the server and puts them into this pool.
//
package runners
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"github.com/ansible-semaphore/semaphore/db"
2023-09-23 17:12:35 +02:00
"github.com/ansible-semaphore/semaphore/db_lib"
"github.com/ansible-semaphore/semaphore/lib"
"github.com/ansible-semaphore/semaphore/services/tasks"
"github.com/ansible-semaphore/semaphore/util"
2024-03-10 20:07:19 +01:00
log "github.com/sirupsen/logrus"
"io"
"net/http"
"os"
"os/exec"
"strconv"
2023-09-20 02:17:41 +02:00
"sync/atomic"
"time"
)
// jobLogRecord couples one log record with a task ID. It is the element type
// of the JobPool.logger channel.
type jobLogRecord struct {
// taskID is the ID of the task the record refers to.
taskID int
// record is the log entry (timestamp and message).
record LogRecord
}
// resourceLock describes a lock state together with the job it relates to.
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably a leftover from the server-side task pool - confirm before relying on it.
type resourceLock struct {
// lock is the lock flag (presumably true = acquire, false = release; verify against callers).
lock bool
// holder is the job associated with the lock.
holder *job
}
// job presents current job on semaphore server.
type job struct {
username string
incomingVersion *string
// job presents remote or local job information
job *tasks.LocalJob
2023-09-23 17:12:35 +02:00
status lib.TaskStatus
args []string
environmentVars []string
}
// JobData is the description of a new job as delivered by the server inside
// RunnerState.NewJobs. checkNewJobs copies its fields into a job / tasks.LocalJob.
type JobData struct {
// Username has no JSON tag, so it is (de)serialized under the key "Username".
Username string
// IncomingVersion has no JSON tag, so it is (de)serialized under the key "IncomingVersion".
IncomingVersion *string
// Task, Template, Inventory, Repository and Environment are copied verbatim
// into the tasks.LocalJob that the runner executes.
Task db.Task `json:"task" binding:"required"`
Template db.Template `json:"template" binding:"required"`
Inventory db.Inventory `json:"inventory" binding:"required"`
Repository db.Repository `json:"repository" binding:"required"`
Environment db.Environment `json:"environment" binding:"required"`
}
// RunnerState is the payload decoded by checkNewJobs from the server's
// response to GET /runners/<id>.
type RunnerState struct {
// CurrentJobs lists the server-side status of jobs; it is used to synchronize
// the status of locally running jobs. It has no JSON tag (key "CurrentJobs").
CurrentJobs []JobState
// NewJobs are jobs the runner should enqueue if it does not know them yet.
NewJobs []JobData `json:"new_jobs" binding:"required"`
// AccessKeys maps an access key ID to the key; the SSH, become and vault keys
// of new jobs are looked up here by their IDs.
AccessKeys map[int]db.AccessKey `json:"access_keys" binding:"required"`
}
type JobState struct {
2023-09-23 17:12:35 +02:00
ID int `json:"id" binding:"required"`
Status lib.TaskStatus `json:"status" binding:"required"`
}
// LogRecord is a single line of job output together with the time it was recorded.
type LogRecord struct {
// Time is the timestamp attached to the message.
Time time.Time `json:"time" binding:"required"`
// Message is the logged text.
Message string `json:"message" binding:"required"`
}
// RunnerProgress is the JSON body that sendProgress PUTs to the server.
type RunnerProgress struct {
// Jobs holds one entry per job in JobPool.runningJobs. It has no JSON tag,
// so it is serialized under the key "Jobs".
Jobs []JobProgress
}
type JobProgress struct {
ID int
2023-09-23 17:12:35 +02:00
Status lib.TaskStatus
LogRecords []LogRecord
}
type runningJob struct {
2023-09-23 17:12:35 +02:00
status lib.TaskStatus
logRecords []LogRecord
job *tasks.LocalJob
}
type JobPool struct {
// logger channel used to putting log records to database.
logger chan jobLogRecord
// register channel used to put tasks to queue.
register chan *job
runningJobs map[int]*runningJob
queue []*job
config *util.RunnerConfig
2023-09-20 02:17:41 +02:00
processing int32
}
// RunnerRegistration is the JSON body that tryRegisterRunner POSTs to
// <ApiURL>/runners to register a new runner.
type RunnerRegistration struct {
// RegistrationToken is taken from util.Config.Runner.RegistrationToken.
RegistrationToken string `json:"registration_token" binding:"required"`
// Webhook is taken from util.Config.Runner.Webhook.
Webhook string `json:"webhook"`
// MaxParallelTasks is taken from util.Config.Runner.MaxParallelTasks.
MaxParallelTasks int `db:"max_parallel_tasks" json:"max_parallel_tasks"`
}
// Log2 appends a log record with the given message and timestamp to the
// job's buffer of pending log records.
func (p *runningJob) Log2(msg string, now time.Time) {
	entry := LogRecord{
		Time:    now,
		Message: msg,
	}
	p.logRecords = append(p.logRecords, entry)
}
// existsInQueue reports whether a job for the task with the given ID is
// already waiting in the queue.
func (p *JobPool) existsInQueue(taskID int) bool {
	for i := range p.queue {
		if p.queue[i].job.Task.ID == taskID {
			return true
		}
	}
	return false
}
2023-09-12 19:40:22 +02:00
func (p *JobPool) hasRunningJobs() bool {
for _, j := range p.runningJobs {
if !j.status.IsFinished() {
return true
}
}
return false
}
// Log records msg in the job's log buffer, stamped with the current time.
func (p *runningJob) Log(msg string) {
	now := time.Now()
	p.Log2(msg, now)
}
2023-09-23 17:12:35 +02:00
func (p *runningJob) SetStatus(status lib.TaskStatus) {
2024-03-12 02:20:30 +01:00
if p.status == status {
return
}
2023-09-12 19:40:22 +02:00
p.status = status
2024-03-12 01:44:04 +01:00
p.job.SetStatus(status)
2023-09-12 19:40:22 +02:00
}
// LogCmd attaches to the stderr and stdout pipes of cmd and copies every line
// written to them into the job's log buffer, one goroutine per pipe.
// It must be called before cmd is started (a requirement of exec.Cmd pipes).
//
// If a pipe cannot be obtained, a warning is logged and that stream is skipped;
// previously the error was dropped and the reader goroutine was started on a
// nil pipe.
func (p *runningJob) LogCmd(cmd *exec.Cmd) {
	if stderr, err := cmd.StderrPipe(); err != nil {
		util.LogWarningWithFields(err, log.Fields{"error": "Failed to open stderr pipe of TaskRunner"})
	} else {
		go p.logPipe(bufio.NewReader(stderr))
	}

	if stdout, err := cmd.StdoutPipe(); err != nil {
		util.LogWarningWithFields(err, log.Fields{"error": "Failed to open stdout pipe of TaskRunner"})
	} else {
		go p.logPipe(bufio.NewReader(stdout))
	}
}
// logPipe reads reader line by line via tasks.Readln and appends each line to
// the job's log until a read fails. A failure whose message is "EOF" ends the
// loop silently; any other failure is logged as a warning.
func (p *runningJob) logPipe(reader *bufio.Reader) {
	var (
		text string
		err  error
	)

	for {
		text, err = tasks.Readln(reader)
		if err != nil {
			break
		}
		p.Log(text)
	}

	if err.Error() == "EOF" {
		return
	}

	// Don't panic on these errors: sometimes a harmless
	// "read |0: file already closed" error is returned.
	util.LogWarningWithFields(err, log.Fields{"error": "Failed to read TaskRunner output"})
}
// Unregister removes this runner from the server and deletes the local runner
// config file.
//
// It loads the runner settings from util.Config.Runner.ConfigFile and fails if
// no token is stored there (the runner was never registered). It then sends
// DELETE <ApiURL>/runners/<runner id>, authenticated with the runner token in
// the X-API-Token header - the same endpoint and header used by sendProgress
// and checkNewJobs. A 404 response is treated as success (the server no longer
// knows the runner); any other status >= 400 is returned as an error and the
// config file is kept.
func (p *JobPool) Unregister() (err error) {
	config, err := util.LoadRunnerSettings(util.Config.Runner.ConfigFile)
	if err != nil {
		return err
	}

	if config.Token == "" {
		return fmt.Errorf("runner is not registered")
	}

	url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(config.RunnerID)

	req, err := http.NewRequest("DELETE", url, nil)
	if err != nil {
		return err
	}

	// The server identifies and authenticates the runner by its token.
	req.Header.Set("X-API-Token", config.Token)

	client := &http.Client{}

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 && resp.StatusCode != 404 {
		return fmt.Errorf("encountered error while unregistering runner; server returned code %d", resp.StatusCode)
	}

	return os.Remove(util.Config.Runner.ConfigFile)
}
func (p *JobPool) Run() {
queueTicker := time.NewTicker(5 * time.Second)
requestTimer := time.NewTicker(1 * time.Second)
p.runningJobs = make(map[int]*runningJob)
defer func() {
queueTicker.Stop()
requestTimer.Stop()
}()
for {
if p.tryRegisterRunner() {
log.Info("The runner has been started")
break
}
time.Sleep(5_000_000_000)
}
for {
select {
case <-queueTicker.C: // timer 5 seconds: get task from queue and run it
if len(p.queue) == 0 {
break
}
t := p.queue[0]
2023-09-23 17:12:35 +02:00
if t.status == lib.TaskFailStatus {
//delete failed TaskRunner from queue
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " dequeued (failed)")
break
}
p.runningJobs[t.job.Task.ID] = &runningJob{
job: t.job,
}
2024-03-12 01:44:04 +01:00
t.job.Logger = t.job.App.SetLogger(p.runningJobs[t.job.Task.ID])
go func(runningJob *runningJob) {
2023-09-23 17:12:35 +02:00
runningJob.SetStatus(lib.TaskRunningStatus)
err := runningJob.job.Run(t.username, t.incomingVersion)
if runningJob.status.IsFinished() {
return
}
if err != nil {
2023-09-23 17:12:35 +02:00
if runningJob.status == lib.TaskStoppingStatus {
runningJob.SetStatus(lib.TaskStoppedStatus)
} else {
2023-09-23 17:12:35 +02:00
runningJob.SetStatus(lib.TaskFailStatus)
}
} else {
2023-09-23 17:12:35 +02:00
runningJob.SetStatus(lib.TaskSuccessStatus)
}
log.Info("Task " + strconv.Itoa(runningJob.job.Task.ID) + " finished (" + string(runningJob.status) + ")")
}(p.runningJobs[t.job.Task.ID])
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " dequeued")
log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " started")
case <-requestTimer.C:
go func() {
if !atomic.CompareAndSwapInt32(&p.processing, 0, 1) {
return
}
2023-09-12 19:40:22 +02:00
defer atomic.StoreInt32(&p.processing, 0)
p.sendProgress()
if util.Config.Runner.OneOff && len(p.runningJobs) > 0 && !p.hasRunningJobs() {
os.Exit(0)
}
2023-09-12 19:40:22 +02:00
p.checkNewJobs()
}()
2023-09-12 19:40:22 +02:00
}
}
}
func (p *JobPool) sendProgress() {
client := &http.Client{}
url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID)
body := RunnerProgress{
Jobs: nil,
}
for id, j := range p.runningJobs {
body.Jobs = append(body.Jobs, JobProgress{
ID: id,
LogRecords: j.logRecords,
Status: j.status,
})
j.logRecords = make([]LogRecord, 0)
if j.status.IsFinished() {
log.Info("Task " + strconv.Itoa(id) + " removed from running list")
delete(p.runningJobs, id)
}
}
jsonBytes, err := json.Marshal(body)
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(jsonBytes))
if err != nil {
fmt.Println("Error creating request:", err)
return
}
2024-01-07 20:50:37 +01:00
req.Header.Set("X-API-Token", p.config.Token)
resp, err := client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
return
}
defer resp.Body.Close()
}
func (p *JobPool) tryRegisterRunner() bool {
if p.config != nil {
return true
}
2024-03-27 12:04:30 +01:00
log.Info("Attempting to register on the server")
2024-01-07 20:50:37 +01:00
config, err := util.LoadRunnerSettings(util.Config.Runner.ConfigFile)
2024-01-07 18:25:52 +01:00
if err != nil {
panic(err)
2024-01-07 20:50:37 +01:00
}
if config.Token != "" {
p.config = &config
return true
}
// Can not restore runner configuration. Register new runner on the server.
if util.Config.Runner.RegistrationToken == "" {
panic("registration token cannot be empty")
}
2024-03-27 12:04:30 +01:00
log.Info("Registering a new runner")
client := &http.Client{}
url := util.Config.Runner.ApiURL + "/runners"
jsonBytes, err := json.Marshal(RunnerRegistration{
RegistrationToken: util.Config.Runner.RegistrationToken,
Webhook: util.Config.Runner.Webhook,
MaxParallelTasks: util.Config.Runner.MaxParallelTasks,
})
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
if err != nil {
2024-01-07 14:36:48 +01:00
log.Error("Error creating request:", err)
return false
}
resp, err := client.Do(req)
if err != nil || resp.StatusCode != 200 {
2024-01-07 14:36:48 +01:00
log.Error("Error making request:", err)
return false
}
2024-03-10 20:07:19 +01:00
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error reading response body:", err)
return false
}
err = json.Unmarshal(body, &config)
if err != nil {
fmt.Println("Error parsing JSON:", err)
return false
}
configBytes, err := json.Marshal(config)
if err != nil {
panic("cannot save runner config")
}
err = os.WriteFile(util.Config.Runner.ConfigFile, configBytes, 0644)
p.config = &config
defer resp.Body.Close()
return true
}
// checkNewJobs tries to find runner to queued jobs
func (p *JobPool) checkNewJobs() {
client := &http.Client{}
url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID)
req, err := http.NewRequest("GET", url, nil)
2024-01-07 17:35:02 +01:00
req.Header.Set("X-API-Token", p.config.Token)
if err != nil {
fmt.Println("Error creating request:", err)
return
}
resp, err := client.Do(req)
2024-01-07 14:36:48 +01:00
if err != nil {
fmt.Println("Error making request:", err)
return
}
2024-01-07 14:36:48 +01:00
2024-01-07 14:37:22 +01:00
if resp.StatusCode >= 400 {
2024-03-27 12:04:30 +01:00
log.Error("Encountered error while checking for new jobs; server returned code ", resp.StatusCode)
2024-01-07 14:36:48 +01:00
return
}
defer resp.Body.Close()
2024-03-10 20:07:19 +01:00
body, err := io.ReadAll(resp.Body)
if err != nil {
2024-03-27 12:04:30 +01:00
log.Error("Encountered error while checking for new jobs; unable to read response body:", err)
return
}
var response RunnerState
err = json.Unmarshal(body, &response)
if err != nil {
2024-01-07 14:36:48 +01:00
log.Error("Checking new jobs, parsing JSON error:", err)
return
}
for _, currJob := range response.CurrentJobs {
runJob, exists := p.runningJobs[currJob.ID]
if !exists {
continue
}
2023-09-23 17:12:35 +02:00
if runJob.status == lib.TaskStoppingStatus || runJob.status == lib.TaskStoppedStatus {
p.runningJobs[currJob.ID].job.Kill()
}
if runJob.status.IsFinished() {
continue
}
switch runJob.status {
case lib.TaskRunningStatus:
if currJob.Status == lib.TaskStartingStatus || currJob.Status == lib.TaskWaitingStatus {
continue
}
case lib.TaskStoppingStatus:
if !currJob.Status.IsFinished() {
continue
}
case lib.TaskConfirmed:
if currJob.Status == lib.TaskWaitingConfirmation {
continue
}
}
runJob.SetStatus(currJob.Status)
}
2023-09-12 19:40:22 +02:00
if util.Config.Runner.OneOff {
if len(p.queue) > 0 || len(p.runningJobs) > 0 {
return
}
}
for _, newJob := range response.NewJobs {
if _, exists := p.runningJobs[newJob.Task.ID]; exists {
continue
}
if p.existsInQueue(newJob.Task.ID) {
continue
}
taskRunner := job{
username: newJob.Username,
incomingVersion: newJob.IncomingVersion,
job: &tasks.LocalJob{
Task: newJob.Task,
Template: newJob.Template,
Inventory: newJob.Inventory,
Repository: newJob.Repository,
Environment: newJob.Environment,
2024-02-04 21:38:15 +01:00
App: db_lib.CreateApp(newJob.Template, newJob.Repository, nil),
},
}
taskRunner.job.Repository.SSHKey = response.AccessKeys[taskRunner.job.Repository.SSHKeyID]
if taskRunner.job.Inventory.SSHKeyID != nil {
taskRunner.job.Inventory.SSHKey = response.AccessKeys[*taskRunner.job.Inventory.SSHKeyID]
}
if taskRunner.job.Inventory.BecomeKeyID != nil {
taskRunner.job.Inventory.BecomeKey = response.AccessKeys[*taskRunner.job.Inventory.BecomeKeyID]
}
2023-09-11 02:00:10 +02:00
if taskRunner.job.Template.VaultKeyID != nil {
taskRunner.job.Template.VaultKey = response.AccessKeys[*taskRunner.job.Template.VaultKeyID]
}
p.queue = append(p.queue, &taskRunner)
log.Info("Task " + strconv.Itoa(taskRunner.job.Task.ID) + " enqueued")
}
}