feat(be): use chan for storing logs to database

This commit is contained in:
Denis Gukov 2021-08-31 03:12:33 +05:00
parent 21c14d82dc
commit c1c8a9ebf9
3 changed files with 47 additions and 30 deletions

View File

@ -3,7 +3,6 @@ package tasks
import (
"bufio"
"encoding/json"
"github.com/ansible-semaphore/semaphore/db"
"os/exec"
"time"
@ -29,15 +28,11 @@ func (t *task) log(msg string) {
sockets.Message(user, b)
}
go func() {
_, err := t.store.CreateTaskOutput(db.TaskOutput{
TaskID: t.task.ID,
Output: msg,
Time: now,
})
util.LogPanicWithFields(err, log.Fields{"error": "Failed to insert task output"})
}()
pool.logger <- logRecord{
task: t,
output: msg,
time: now,
}
}
// Readln reads from the pipe

View File

@ -1,6 +1,7 @@
package tasks
import (
"github.com/ansible-semaphore/semaphore/db"
"strconv"
"time"
@ -8,6 +9,12 @@ import (
"github.com/ansible-semaphore/semaphore/util"
)
type logRecord struct {
task *task
output string
time time.Time
}
type taskPool struct {
queue []*task
register chan *task
@ -15,6 +22,7 @@ type taskPool struct {
activeNodes map[string]*task
running int
runningTasks map[int]*task
logger chan logRecord
}
type resourceLock struct {
@ -23,12 +31,13 @@ type resourceLock struct {
}
var pool = taskPool{
queue: make([]*task, 0),
register: make(chan *task),
queue: make([]*task, 0), // queue of waiting tasks
register: make(chan *task), // add task to queue
activeProj: make(map[int]*task),
activeNodes: make(map[string]*task),
running: 0,
runningTasks: make(map[int]*task),
running: 0, // number of running tasks
runningTasks: make(map[int]*task), // working tasks
logger: make(chan logRecord, 1000), // store log records to database
}
var resourceLocker = make(chan *resourceLock)
@ -78,12 +87,20 @@ func (p *taskPool) run() {
for {
select {
case record := <-p.logger:
err, _ := record.task.store.CreateTaskOutput(db.TaskOutput{
TaskID: record.task.task.ID,
Output: record.output,
Time: record.time,
})
log.Error(err)
case task := <-p.register:
p.queue = append(p.queue, task)
log.Debug(task)
msg := "Task " + strconv.Itoa(task.task.ID) + " added to queue"
task.log(msg)
log.Info(msg)
case <-ticker.C:
if len(p.queue) == 0 {
continue

View File

@ -89,6 +89,17 @@ func (t *task) fail() {
t.sendTelegramAlert()
}
func (t *task) destroyKeys() {
err := t.destroyKey(t.repository.SSHKey)
if err != nil {
t.log("Can't destroy repository SSH key, error: " + err.Error())
}
err = t.destroyKey(t.inventory.SSHKey)
if err != nil {
t.log("Can't destroy inventory SSH key, error: " + err.Error())
}
}
func (t *task) createTaskEvent() {
objType := taskTypeID
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
@ -193,22 +204,8 @@ func (t *task) run() {
now := time.Now()
t.task.End = &now
t.updateStatus()
objType := taskTypeID
desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Alias + ")" + " finished - " + strings.ToUpper(t.task.Status)
_, err := t.store.CreateEvent(db.Event{
UserID: t.task.UserID,
ProjectID: &t.projectID,
ObjectType: &objType,
ObjectID: &t.task.ID,
Description: &desc,
})
if err != nil {
t.log("Fatal error inserting an event")
panic(err)
}
t.createTaskEvent()
t.destroyKeys()
}()
if t.task.Status == taskStoppingStatus {
@ -330,6 +327,14 @@ func (t *task) populateDetails() error {
return nil
}
func (t *task) destroyKey(key db.AccessKey) error {
path := key.GetPath()
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil
}
return os.Remove(path)
}
func (t *task) installKey(key db.AccessKey) error {
t.log("access key " + key.Name + " installed")