Semaphore/services/schedules/pool.go

160 lines
2.9 KiB
Go
Raw Normal View History

2021-09-06 13:05:10 +02:00
package schedules
import (
log "github.com/Sirupsen/logrus"
2021-09-06 13:05:10 +02:00
"github.com/ansible-semaphore/semaphore/db"
2022-01-30 18:43:15 +01:00
"github.com/ansible-semaphore/semaphore/lib"
"github.com/ansible-semaphore/semaphore/services/tasks"
2021-09-06 13:05:10 +02:00
"github.com/robfig/cron/v3"
"sync"
2021-09-06 13:05:10 +02:00
)
type ScheduleRunner struct {
2022-01-31 11:43:13 +01:00
projectID int
scheduleID int
pool *SchedulePool
2021-09-06 13:05:10 +02:00
}
2022-01-31 11:43:13 +01:00
func (r ScheduleRunner) tryUpdateScheduleCommitHash(schedule db.Schedule) (updated bool, err error) {
repo, err := r.pool.store.GetRepository(schedule.ProjectID, *schedule.RepositoryID)
if err != nil {
return
}
2022-01-30 18:43:15 +01:00
err = repo.SSHKey.DeserializeSecret()
if err != nil {
return
}
2022-01-31 11:43:13 +01:00
remoteHash, err := lib.GitRepository{
Logger: nil,
TemplateID: schedule.TemplateID,
Repository: repo,
}.GetLastRemoteCommitHash()
2022-01-30 18:43:15 +01:00
2022-01-31 11:43:13 +01:00
if err != nil {
return
}
2022-01-30 18:43:15 +01:00
2022-01-31 11:43:13 +01:00
if schedule.LastCommitHash != nil && remoteHash == *schedule.LastCommitHash {
return
}
err = r.pool.store.SetScheduleCommitHash(schedule.ProjectID, schedule.ID, remoteHash)
if err != nil {
return
}
updated = true
return
}
2022-01-30 18:43:15 +01:00
2022-01-31 11:43:13 +01:00
func (r ScheduleRunner) Run() {
schedule, err := r.pool.store.GetSchedule(r.projectID, r.scheduleID)
if err != nil {
log.Error(err)
return
}
if schedule.RepositoryID != nil {
var updated bool
updated, err = r.tryUpdateScheduleCommitHash(schedule)
2022-01-30 18:43:15 +01:00
if err != nil {
log.Error(err)
return
}
2022-01-31 11:43:13 +01:00
if !updated {
return
}
2022-01-30 18:43:15 +01:00
}
2022-01-31 11:43:13 +01:00
_, err = r.pool.taskPool.AddTask(db.Task{
TemplateID: schedule.TemplateID,
ProjectID: schedule.ProjectID,
}, nil, schedule.ProjectID)
2022-01-30 18:43:15 +01:00
if err != nil {
log.Error(err)
}
2021-09-06 13:05:10 +02:00
}
type SchedulePool struct {
cron *cron.Cron
locker sync.Locker
store db.Store
taskPool *tasks.TaskPool
2021-09-06 13:05:10 +02:00
}
func (p *SchedulePool) init() {
2021-09-06 13:05:10 +02:00
p.cron = cron.New()
p.locker = &sync.Mutex{}
}
2021-09-06 13:05:10 +02:00
func (p *SchedulePool) Refresh() {
defer p.locker.Unlock()
schedules, err := p.store.GetSchedules()
2021-09-06 13:05:10 +02:00
if err != nil {
log.Error(err)
2021-09-06 13:05:10 +02:00
return
}
p.locker.Lock()
p.clear()
2021-09-06 13:05:10 +02:00
for _, schedule := range schedules {
_, err := p.addRunner(ScheduleRunner{
2022-01-31 11:43:13 +01:00
projectID: schedule.ProjectID,
scheduleID: schedule.ID,
pool: p,
}, schedule.CronFormat)
2021-09-06 13:05:10 +02:00
if err != nil {
log.Error(err)
2021-09-06 13:05:10 +02:00
}
}
}
2022-01-31 11:43:13 +01:00
func (p *SchedulePool) addRunner(runner ScheduleRunner, cronFormat string) (int, error) {
id, err := p.cron.AddJob(cronFormat, runner)
2021-09-06 13:05:10 +02:00
if err != nil {
return 0, err
2021-09-06 13:05:10 +02:00
}
return int(id), nil
2021-09-06 13:05:10 +02:00
}
func (p *SchedulePool) Run() {
2021-09-06 13:05:10 +02:00
p.cron.Run()
}
func (p *SchedulePool) clear() {
runners := p.cron.Entries()
for _, r := range runners {
p.cron.Remove(r.ID)
}
}
func (p *SchedulePool) Destroy() {
defer p.locker.Unlock()
p.locker.Lock()
p.cron.Stop()
p.clear()
p.cron = nil
}
2021-09-06 13:05:10 +02:00
func CreateSchedulePool(store db.Store, taskPool *tasks.TaskPool) SchedulePool {
pool := SchedulePool{
store: store,
taskPool: taskPool,
}
pool.init()
pool.Refresh()
return pool
2021-09-06 13:05:10 +02:00
}
func ValidateCronFormat(cronFormat string) error {
_, err := cron.ParseStandard(cronFormat)
return err
}