* refactor(runners): extract jobs to separate entity
This commit is contained in:
Denis Gukov 2023-08-27 18:02:51 +02:00 committed by GitHub
parent bb4a9d9b8e
commit b5a99eba7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 447 additions and 10 deletions

54
api-docs-runners.yml Normal file
View File

@ -0,0 +1,54 @@
swagger: '3.0'
info:
title: SEMAPHORE
description: Semaphore Runner API
version: "2.2.0"
host: localhost:3000
consumes:
- application/json
produces:
- application/json
- text/plain; charset=utf-8
tags:
- name: authentication
description: Authentication, Logout & API Tokens
- name: project
description: Everything related to a project
- name: user
description: User-related API
schemes:
- http
- https
basePath: /api/runners
definitions:
paths:
/register:
post:
requestBody:
content:
application/json:
schema:
type: object
required:
- registrationToken
properties:
registrationToken: { type: string }
responses:
200:
description: API Token
/unregister:
post:
/status:
put:
/jobs:
get:

52
api/runners/handler.go Normal file
View File

@ -0,0 +1,52 @@
package runners
import (
"github.com/ansible-semaphore/semaphore/api/helpers"
"github.com/ansible-semaphore/semaphore/db"
"github.com/ansible-semaphore/semaphore/util"
"github.com/gorilla/mux"
"net/http"
"strings"
)
func RunnerRoute() *mux.Router {
r := mux.NewRouter()
webPath := "/"
if util.WebHostURL != nil {
webPath = util.WebHostURL.Path
if !strings.HasSuffix(webPath, "/") {
webPath += "/"
}
}
pingRouter := r.Path(webPath + "api/runners/register").Subrouter()
pingRouter.Methods("POST", "HEAD").HandlerFunc(registerRunner)
return r
}
func registerRunner(w http.ResponseWriter, r *http.Request) {
var register struct {
RegistrationToken string `json:"registration_token" binding:"required"`
}
if !helpers.Bind(w, r, &register) {
return
}
if register.RegistrationToken != util.Config.RegistrationToken {
return
}
runner, err := helpers.Store(r).CreateRunner(db.Runner{
State: db.RunnerActive,
})
if err != nil {
return
}
helpers.WriteJSON(w, http.StatusOK, runner)
}

25
cli/cmd/runner.go Normal file
View File

@ -0,0 +1,25 @@
package cmd
import (
"github.com/ansible-semaphore/semaphore/services/runners"
"github.com/spf13/cobra"
)
func init() {
rootCmd.AddCommand(runnerCmd)
}
func runRunner() {
taskPool := runners.JobPool{}
go taskPool.Run()
}
var runnerCmd = &cobra.Command{
Use: "runner",
Short: "Run in runner mode",
Run: func(cmd *cobra.Command, args []string) {
runRunner()
},
}

15
db/Runner.go Normal file
View File

@ -0,0 +1,15 @@
package db
type RunnerState string
const (
RunnerOffline RunnerState = "offline"
RunnerActive RunnerState = "active"
)
type Runner struct {
ID int `db:"id" json:"-"`
Token string `db:"token" json:"token"`
ProjectID *int `db:"project_id" json:"project_id"`
State RunnerState `db:"state" json:"state"`
}

View File

@ -199,6 +199,15 @@ type Store interface {
CreateView(view View) (View, error) CreateView(view View) (View, error)
DeleteView(projectID int, viewID int) error DeleteView(projectID int, viewID int) error
SetViewPositions(projectID int, viewPositions map[int]int) error SetViewPositions(projectID int, viewPositions map[int]int) error
GetRunner(projectID int, runnerID int) (Runner, error)
GetRunners(projectID int) ([]Runner, error)
DeleteRunner(projectID int, runnerID int) error
GetGlobalRunner(runnerID int) (Runner, error)
GetGlobalRunners() ([]Runner, error)
DeleteGlobalRunner(runnerID int) error
UpdateRunner(runner Runner) error
CreateRunner(runner Runner) (Runner, error)
} }
var AccessKeyProps = ObjectProps{ var AccessKeyProps = ObjectProps{
@ -304,6 +313,13 @@ var ViewProps = ObjectProps{
DefaultSortingColumn: "position", DefaultSortingColumn: "position",
} }
var RunnerProps = ObjectProps{
TableName: "runner",
Type: reflect.TypeOf(Runner{}),
PrimaryColumnName: "id",
IsGlobal: true,
}
func (p ObjectProps) GetReferringFieldsFrom(t reflect.Type) (fields []string, err error) { func (p ObjectProps) GetReferringFieldsFrom(t reflect.Type) (fields []string, err error) {
n := t.NumField() n := t.NumField()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {

35
db/bolt/runner.go Normal file
View File

@ -0,0 +1,35 @@
package bolt
import "github.com/ansible-semaphore/semaphore/db"
func (d *BoltDb) GetRunner(projectID int, runnerID int) (runner db.Runner, err error) {
return
}
func (d *BoltDb) GetRunners(projectID int) (runners []db.Runner, err error) {
return
}
func (d *BoltDb) DeleteRunner(projectID int, runnerID int) (err error) {
return
}
func (d *BoltDb) GetGlobalRunner(runnerID int) (runner db.Runner, err error) {
return
}
func (d *BoltDb) GetGlobalRunners() (runners []db.Runner, err error) {
return
}
func (d *BoltDb) DeleteGlobalRunner(runnerID int) (err error) {
return
}
func (d *BoltDb) UpdateRunner(runner db.Runner) (err error) {
return
}
func (d *BoltDb) CreateRunner(runner db.Runner) (newRunner db.Runner, err error) {
return
}

35
db/sql/runner.go Normal file
View File

@ -0,0 +1,35 @@
package sql
import "github.com/ansible-semaphore/semaphore/db"
func (d *SqlDb) GetRunner(projectID int, runnerID int) (runner db.Runner, err error) {
return
}
func (d *SqlDb) GetRunners(projectID int) (runners []db.Runner, err error) {
return
}
func (d *SqlDb) DeleteRunner(projectID int, runnerID int) (err error) {
return
}
func (d *SqlDb) GetGlobalRunner(runnerID int) (runner db.Runner, err error) {
return
}
func (d *SqlDb) GetGlobalRunners() (runners []db.Runner, err error) {
return
}
func (d *SqlDb) DeleteGlobalRunner(runnerID int) (err error) {
return
}
func (d *SqlDb) UpdateRunner(runner db.Runner) (err error) {
return
}
func (d *SqlDb) CreateRunner(runner db.Runner) (newRunner db.Runner, err error) {
return
}

158
services/runners/JobPool.go Normal file
View File

@ -0,0 +1,158 @@
//
// Runner's job pool. NOT SERVER!!!
// Runner gets jobs from the server and put them to this pool.
//
package runners
import (
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/ansible-semaphore/semaphore/db"
"github.com/ansible-semaphore/semaphore/services/tasks"
"io/ioutil"
"net/http"
"strconv"
"time"
)
type logRecord struct {
job *job
output string
time time.Time
}
type resourceLock struct {
lock bool
holder *job
}
// job presents current job on semaphore server.
type job struct {
// job presents remote or local job information
job *tasks.LocalAnsibleJob
Status db.TaskStatus
kind jobType
args []string
environmentVars []string
id int
}
type jobType int
type Response struct {
Message string `json:"message"`
Status int `json:"status"`
}
const (
playbook jobType = iota
galaxy
)
func (j *job) run() {
var err error
switch j.kind {
case playbook:
err = j.job.RunPlaybook(j.args, &j.environmentVars, nil)
case galaxy:
err = j.job.RunGalaxy(j.args)
default:
panic("Unknown job type")
}
if err != nil {
// TODO: some logging
}
}
type JobPool struct {
// logger channel used to putting log records to database.
logger chan logRecord
// register channel used to put tasks to queue.
register chan *job
resourceLocker chan *resourceLock
logRecords []logRecord
queue []*job
}
func (p *JobPool) Run() {
ticker := time.NewTicker(5 * time.Second)
defer func() {
ticker.Stop()
}()
for {
select {
case record := <-p.logger: // new log message which should be put to database
p.logRecords = append(p.logRecords, record)
case job := <-p.register: // new task created by API or schedule
p.queue = append(p.queue, job)
case <-ticker.C: // timer 5 seconds: get task from queue and run it
if len(p.queue) == 0 {
break
}
t := p.queue[0]
if t.Status == db.TaskFailStatus {
//delete failed TaskRunner from queue
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.id) + " removed from queue")
break
}
log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id))
p.resourceLocker <- &resourceLock{lock: true, holder: t}
go t.run()
p.queue = p.queue[1:]
log.Info("Task " + strconv.Itoa(t.id) + " removed from queue")
}
}
}
// checkNewJobs tries to find runner to queued jobs
func (p *JobPool) checkNewJobs() {
client := &http.Client{}
url := "https://example.com"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
fmt.Println("Error creating request:", err)
return
}
resp, err := client.Do(req)
if err != nil {
fmt.Println("Error making request:", err)
return
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error reading response body:", err)
return
}
var response Response
err = json.Unmarshal(body, &response)
if err != nil {
fmt.Println("Error parsing JSON:", err)
return
}
taskRunner := job{
job: &tasks.LocalAnsibleJob{},
}
p.register <- &taskRunner
}

View File

@ -0,0 +1,23 @@
package tasks
import (
"github.com/ansible-semaphore/semaphore/lib"
"os"
)
type AnsibleJob interface {
RunGalaxy(args []string) error
RunPlaybook(args []string, environmentVars *[]string, cb func(*os.Process)) error
}
type LocalAnsibleJob struct {
playbook *lib.AnsiblePlaybook
}
func (j *LocalAnsibleJob) RunGalaxy(args []string) error {
return j.playbook.RunGalaxy(args)
}
func (j *LocalAnsibleJob) RunPlaybook(args []string, environmentVars *[]string, cb func(*os.Process)) error {
return j.playbook.RunPlaybook(args, environmentVars, cb)
}

View File

@ -0,0 +1,12 @@
package tasks
import "github.com/ansible-semaphore/semaphore/lib"
// RunnerPool is a collection of the registered runners.
type RunnerPool struct {
}
func (p *RunnerPool) CreateJob(playbook *lib.AnsiblePlaybook) (AnsibleJob, error) {
return &LocalAnsibleJob{playbook: playbook}, nil
}

View File

@ -2,6 +2,7 @@ package tasks
import ( import (
"github.com/ansible-semaphore/semaphore/db" "github.com/ansible-semaphore/semaphore/db"
"github.com/ansible-semaphore/semaphore/lib"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
@ -40,6 +41,8 @@ type TaskPool struct {
store db.Store store db.Store
resourceLocker chan *resourceLock resourceLocker chan *resourceLock
runners RunnerPool
} }
func (p *TaskPool) GetTask(id int) (task *TaskRunner) { func (p *TaskPool) GetTask(id int) (task *TaskRunner) {
@ -331,6 +334,18 @@ func (p *TaskPool) AddTask(taskObj db.Task, userID *int, projectID int) (newTask
return return
} }
job, err := p.runners.CreateJob(&lib.AnsiblePlaybook{
Logger: &taskRunner,
TemplateID: taskRunner.template.ID,
Repository: taskRunner.repository,
})
if err != nil {
return
}
taskRunner.job = job
p.register <- &taskRunner p.register <- &taskRunner
objType := db.EventTask objType := db.EventTask

View File

@ -33,6 +33,9 @@ type TaskRunner struct {
prepared bool prepared bool
process *os.Process process *os.Process
pool *TaskPool pool *TaskPool
// job executes Ansible and returns stdout to Semaphore logs
job AnsibleJob
} }
func getMD5Hash(filepath string) (string, error) { func getMD5Hash(filepath string) (string, error) {
@ -573,11 +576,7 @@ func (t *TaskRunner) installRequirements() error {
} }
func (t *TaskRunner) runGalaxy(args []string) error { func (t *TaskRunner) runGalaxy(args []string) error {
return lib.AnsiblePlaybook{ return t.job.RunGalaxy(args)
Logger: t,
TemplateID: t.template.ID,
Repository: t.repository,
}.RunGalaxy(args)
} }
func (t *TaskRunner) runPlaybook() (err error) { func (t *TaskRunner) runPlaybook() (err error) {
@ -591,11 +590,7 @@ func (t *TaskRunner) runPlaybook() (err error) {
return return
} }
return lib.AnsiblePlaybook{ return t.job.RunPlaybook(args, &environmentVariables, func(p *os.Process) { t.process = p })
Logger: t,
TemplateID: t.template.ID,
Repository: t.repository,
}.RunPlaybook(args, &environmentVariables, func(p *os.Process) { t.process = p })
} }
func (t *TaskRunner) getEnvironmentENV() (arr []string, err error) { func (t *TaskRunner) getEnvironmentENV() (arr []string, err error) {

View File

@ -150,6 +150,8 @@ type ConfigType struct {
// task concurrency // task concurrency
MaxParallelTasks int `json:"max_parallel_tasks"` MaxParallelTasks int `json:"max_parallel_tasks"`
RegistrationToken string `json:"registration_token"`
// feature switches // feature switches
PasswordLoginDisable bool `json:"password_login_disable"` PasswordLoginDisable bool `json:"password_login_disable"`
NonAdminCanCreateProject bool `json:"non_admin_can_create_project"` NonAdminCanCreateProject bool `json:"non_admin_can_create_project"`