Semaphore/services/tasks/logging.go
Denis Gukov d1b7ad021e
Runners (#1444)
* feat(runners): add register endpoint

* feat(runners): add remote runner

* refactor(runners): move functionality TaskRunner -> AnsibleJobRunner

* fix(runners): init job

* chore(runners): remove unused field

* feat(runners): use external logger from AnsibleJobRunner

* refactor(runners): remove status field

* refactor(runners): remove mutation from job

* feat(runners): pass username and version to task

* test(runners): fix tests

* fix(runners): params for Run

* feat(runners): implement runner selection

* feat(runners): fill required fields

* fix(runners): session block

* feat(runners): kill process

* refactor(runners): rename fields to public

* feat(runners): remote runner functionality

* refactor(runners): remove unused class

* fix(runners): send json

* feat(runners): runner registration

* feat(runners): logging

* feat(runners): server <-> runner communication works

* feat(runners): pass creds to runner
2023-08-29 00:51:04 +02:00

82 lines
1.6 KiB
Go

package tasks
import (
	"bufio"
	"encoding/json"
	"errors"
	"io"
	"os/exec"
	"time"

	log "github.com/Sirupsen/logrus"
	"github.com/ansible-semaphore/semaphore/api/sockets"
	"github.com/ansible-semaphore/semaphore/util"
)
// Log2 publishes one line of task output stamped with the supplied time.
//
// The line is encoded as a JSON message of type "log" and passed to
// sockets.Message for every entry in t.users, and is then queued on the pool's
// logger channel as a logRecord.
//
// msg is the output text; now is the timestamp attached to both the socket
// message and the log record.
func (t *TaskRunner) Log2(msg string, now time.Time) {
	// The payload is identical for every recipient, so encode it once rather
	// than once per user, and skip encoding when there are no recipients.
	if len(t.users) > 0 {
		payload, err := json.Marshal(map[string]interface{}{
			"type":       "log",
			"output":     msg,
			"time":       now,
			"task_id":    t.Task.ID,
			"project_id": t.Task.ProjectID,
		})
		util.LogPanic(err)

		// NOTE(review): all users receive the same byte slice; this assumes
		// sockets.Message does not modify the message it is given - confirm.
		for _, user := range t.users {
			sockets.Message(user, payload)
		}
	}

	t.pool.logger <- logRecord{
		task:   t,
		output: msg,
		time:   now,
	}
}
// Log publishes msg as a line of task output stamped with the current time.
// It is a convenience wrapper around Log2.
func (t *TaskRunner) Log(msg string) {
	timestamp := time.Now()
	t.Log2(msg, timestamp)
}
// Readln reads one complete line from r, without its line terminator.
//
// bufio.Reader.ReadLine returns a long line in several pieces; Readln keeps
// reading and joins the pieces until the line is complete or a read fails.
// It returns the text gathered so far together with the error from the last
// read, which is nil when a full line was read.
func Readln(r *bufio.Reader) (string, error) {
	var collected []byte

	for {
		chunk, more, err := r.ReadLine()
		// chunk is only valid until the next read, so copy it out immediately.
		collected = append(collected, chunk...)

		if err != nil || !more {
			return string(collected), err
		}
	}
}
// logPipe forwards every line read from reader to the task log until a read
// fails.
//
// Reaching the end of the input is the normal way for the loop to finish and
// is not reported. Any other read error is logged as a warning instead of
// causing a panic.
func (t *TaskRunner) logPipe(reader *bufio.Reader) {
	for {
		line, err := Readln(reader)
		if err == nil {
			t.Log(line)
			continue
		}

		// Readln can return the beginning of a line together with an error
		// when a read fails part-way through a long line; keep that output
		// rather than dropping it.
		if line != "" {
			t.Log(line)
		}

		// Compare against the io.EOF sentinel instead of the error text.
		if !errors.Is(err, io.EOF) {
			//don't panic on these errors, sometimes it throws not dangerous "read |0: file already closed" error
			util.LogWarningWithFields(err, log.Fields{"error": "Failed to read TaskRunner output"})
		}

		return
	}
}
// LogCmd routes cmd's standard error and standard output into the task log.
//
// A pipe is requested for each stream and one goroutine per pipe is started
// to forward its lines through logPipe. LogCmd does not wait for those
// goroutines. os/exec requires the pipes to be requested before the command
// is started, so call LogCmd before cmd.Start.
//
// If a pipe cannot be created, a warning is logged and that stream is skipped
// instead of starting a reader on a nil pipe.
func (t *TaskRunner) LogCmd(cmd *exec.Cmd) {
	stderr, stderrErr := cmd.StderrPipe()
	stdout, stdoutErr := cmd.StdoutPipe()

	if stderrErr != nil {
		util.LogWarningWithFields(stderrErr, log.Fields{"error": "Failed to open TaskRunner stderr pipe"})
	} else {
		go t.logPipe(bufio.NewReader(stderr))
	}

	if stdoutErr != nil {
		util.LogWarningWithFields(stdoutErr, log.Fields{"error": "Failed to open TaskRunner stdout pipe"})
	} else {
		go t.logPipe(bufio.NewReader(stdout))
	}
}
// panicOnError does nothing when err is nil. Otherwise it writes msg to the
// task log and hands err to util.LogPanicWithFields, with msg carried in the
// "error" field.
func (t *TaskRunner) panicOnError(err error, msg string) {
	if err == nil {
		return
	}

	t.Log(msg)

	fields := log.Fields{"error": msg}
	util.LogPanicWithFields(err, fields)
}