refactor(runners): runner <-> server comminication protocol

This commit is contained in:
Denis Gukov 2024-09-29 14:57:02 +05:00
parent 859cfea44e
commit 1ce8dd08a3
7 changed files with 79 additions and 90 deletions

View File

@ -93,9 +93,9 @@ func Route() *mux.Router {
runnersAPI := internalAPI.PathPrefix("/runners").Subrouter()
runnersAPI.Use(runners.RunnerMiddleware)
runnersAPI.Path("/{runner_id}").HandlerFunc(runners.GetRunner).Methods("GET", "HEAD")
runnersAPI.Path("/{runner_id}").HandlerFunc(runners.UpdateRunner).Methods("PUT")
runnersAPI.Path("/{runner_id}").HandlerFunc(runners.UnregisterRunner).Methods("DELETE")
runnersAPI.Path("/").HandlerFunc(runners.GetRunner).Methods("GET", "HEAD")
runnersAPI.Path("/").HandlerFunc(runners.UpdateRunner).Methods("PUT")
runnersAPI.Path("/").HandlerFunc(runners.UnregisterRunner).Methods("DELETE")
publicWebHookRouter := r.PathPrefix(webPath + "api").Subrouter()
publicWebHookRouter.Use(StoreMiddleware, JSONMiddleware)

View File

@ -199,11 +199,12 @@ func RegisterRunner(w http.ResponseWriter, r *http.Request) {
return
}
res := util.RunnerConfig{
RunnerID: runner.ID,
Token: runner.Token,
var res struct {
Token string `json:"token"`
}
res.Token = runner.Token
helpers.WriteJSON(w, http.StatusOK, res)
}

View File

@ -256,6 +256,7 @@ type Store interface {
GetRunner(projectID int, runnerID int) (Runner, error)
GetRunners(projectID int) ([]Runner, error)
DeleteRunner(projectID int, runnerID int) error
GetGlobalRunnerByToken(token string) (Runner, error)
GetGlobalRunner(runnerID int) (Runner, error)
GetGlobalRunners(activeOnly bool) ([]Runner, error)
DeleteGlobalRunner(runnerID int) error

View File

@ -17,6 +17,28 @@ func (d *BoltDb) DeleteRunner(projectID int, runnerID int) (err error) {
return
}
func (d *BoltDb) GetGlobalRunnerByToken(token string) (runner db.Runner, err error) {
runners := make([]db.Runner, 0)
err = d.getObjects(0, db.GlobalRunnerProps, db.RetrieveQueryParams{}, func(i interface{}) bool {
r := i.(db.Runner)
return r.Token == token
}, &runners)
if err != nil {
return
}
if len(runners) == 0 {
err = db.ErrNotFound
return
}
runner = runners[0]
return
}
func (d *BoltDb) GetGlobalRunner(runnerID int) (runner db.Runner, err error) {
err = d.getObject(0, db.GlobalRunnerProps, intObjectID(runnerID), &runner)

View File

@ -19,6 +19,27 @@ func (d *SqlDb) DeleteRunner(projectID int, runnerID int) (err error) {
return
}
func (d *SqlDb) GetGlobalRunnerByToken(token string) (runner db.Runner, err error) {
runners := make([]db.Runner, 0)
err = d.getObjects(0, db.GlobalRunnerProps, db.RetrieveQueryParams{}, func(builder squirrel.SelectBuilder) squirrel.SelectBuilder {
return builder.Where("token=?", token)
}, &runners)
if err != nil {
return
}
if len(runners) == 0 {
err = db.ErrNotFound
return
}
runner = runners[0]
return
}
func (d *SqlDb) GetGlobalRunner(runnerID int) (runner db.Runner, err error) {
err = d.getObject(0, db.GlobalRunnerProps, runnerID, &runner)
return

View File

@ -29,7 +29,7 @@ type JobPool struct {
queue []*job
config *util.RunnerConfig
token *string
processing int32
}
@ -56,13 +56,7 @@ func (p *JobPool) hasRunningJobs() bool {
func (p *JobPool) Unregister() (err error) {
config, err := util.LoadRunnerSettings(util.Config.Runner.ConfigFile)
if err != nil {
return
}
if config.Token == "" {
if util.Config.Runner.Token == "" {
return fmt.Errorf("runner is not registered")
}
@ -85,9 +79,8 @@ func (p *JobPool) Unregister() (err error) {
return
}
err = os.Remove(util.Config.Runner.ConfigFile)
if err != nil {
return
if util.Config.Runner.TokenFile != "" {
err = os.Remove(util.Config.Runner.TokenFile)
}
return
@ -190,7 +183,7 @@ func (p *JobPool) sendProgress() {
client := &http.Client{}
url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID)
url := util.Config.Runner.ApiURL + "/runners/" + *p.token
body := RunnerProgress{
Jobs: nil,
@ -220,7 +213,7 @@ func (p *JobPool) sendProgress() {
return
}
req.Header.Set("X-Runner-Token", p.config.Token)
req.Header.Set("X-Runner-Token", *p.token)
resp, err := client.Do(req)
if err != nil {
@ -232,20 +225,20 @@ func (p *JobPool) sendProgress() {
}
func (p *JobPool) tryRegisterRunner() bool {
if p.config != nil {
if p.token != nil {
return true
}
log.Info("Attempting to register on the server")
config, err := util.LoadRunnerSettings(util.Config.Runner.ConfigFile)
//config, err := util.LoadRunnerSettings(util.Config.Runner.ConfigFile)
//
//if err != nil {
// panic(err)
//}
if err != nil {
panic(err)
}
if config.Token != "" {
p.config = &config
if util.Config.Runner.Token != "" {
p.token = &util.Config.Runner.Token
return true
}
@ -285,21 +278,19 @@ func (p *JobPool) tryRegisterRunner() bool {
return false
}
err = json.Unmarshal(body, &config)
var res struct {
Token string `json:"token"`
}
err = json.Unmarshal(body, &res)
if err != nil {
fmt.Println("Error parsing JSON:", err)
return false
}
configBytes, err := json.Marshal(config)
err = os.WriteFile(util.Config.Runner.TokenFile, []byte(res.Token), 0644)
if err != nil {
panic("cannot save runner config")
}
err = os.WriteFile(util.Config.Runner.ConfigFile, configBytes, 0644)
p.config = &config
p.token = &res.Token
defer resp.Body.Close()
@ -309,13 +300,18 @@ func (p *JobPool) tryRegisterRunner() bool {
// checkNewJobs tries to find runner to queued jobs
func (p *JobPool) checkNewJobs() {
if p.token == nil {
fmt.Println("Error creating request:", "no token provided")
return
}
client := &http.Client{}
url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID)
url := util.Config.Runner.ApiURL + "/runners/" + *p.token
req, err := http.NewRequest("GET", url, nil)
req.Header.Set("X-Runner-Token", p.config.Token)
req.Header.Set("X-Runner-Token", *p.token)
if err != nil {
fmt.Println("Error creating request:", err)

View File

@ -91,19 +91,10 @@ const (
//
// */
// Deprecated
type RunnerConfig struct {
RunnerID int `json:"runner_id" env:"SEMAPHORE_RUNNER_ID"`
Token string `json:"token" env:"SEMAPHORE_RUNNER_TOKEN"`
}
type RunnerSettings struct {
ApiURL string `json:"api_url" env:"SEMAPHORE_RUNNER_API_URL"`
RegistrationToken string `json:"registration_token" env:"SEMAPHORE_RUNNER_REGISTRATION_TOKEN"`
// Deprecated
ConfigFile string `json:"config_file" env:"SEMAPHORE_RUNNER_CONFIG_FILE"`
Token string `json:"token" env:"SEMAPHORE_RUNNER_TOKEN"`
TokenFile string `json:"token_file" env:"SEMAPHORE_RUNNER_TOKEN_FILE"`
@ -219,49 +210,6 @@ func (conf *ConfigType) ToJSON() ([]byte, error) {
return json.MarshalIndent(&conf, " ", "\t")
}
func LoadRunnerSettings(path string) (config RunnerConfig, err error) {
configFileExists := false
if path != "" {
_, err = os.Stat(path)
if os.IsNotExist(err) {
configFileExists = false
} else if err != nil {
return
} else {
configFileExists = true
}
}
if configFileExists {
var configBytes []byte
configBytes, err = os.ReadFile(path)
if err != nil {
return
}
err = json.Unmarshal(configBytes, &config)
if err != nil {
return
}
}
err = loadEnvironmentToObject(&config)
if err != nil {
return
}
err = loadDefaultsToObject(&config)
return
}
// ConfigInit reads in cli flags, and switches actions appropriately on them
func ConfigInit(configPath string, noConfigFile bool) {
fmt.Println("Loading config")