refactor(logs): use Scanner

This commit is contained in:
Denis Gukov 2025-01-05 15:29:35 +05:00
parent a7e65448fb
commit e64ffae03c
No known key found for this signature in database
GPG Key ID: 044381366A5D4731
6 changed files with 75 additions and 21 deletions

View File

@ -109,7 +109,7 @@ func (d *SqlDb) UpdateTask(task db.Task) error {
func (d *SqlDb) CreateTaskOutput(output db.TaskOutput) (db.TaskOutput, error) {
_, err := d.exec(
"insert into task__output (task_id, task, output, time) VALUES (?, '', ?, ?)",
"insert into task__output (task_id, output, time) VALUES (?, ?, ?)",
output.TaskID,
output.Output,
output.Time.UTC())

View File

@ -141,7 +141,13 @@ func (t *TerraformApp) init(environmentVars []string, params *db.TerraformTaskPa
return err
}
return cmd.Wait()
err = cmd.Wait()
if err != nil {
return err
}
t.Logger.WaitLog()
return nil
}
func (t *TerraformApp) isWorkspacesSupported(environmentVars []string) bool {
@ -157,12 +163,19 @@ func (t *TerraformApp) isWorkspacesSupported(environmentVars []string) bool {
func (t *TerraformApp) selectWorkspace(workspace string, environmentVars []string) error {
cmd := t.makeCmd(t.Name, []string{"workspace", "select", "-or-create=true", workspace}, environmentVars)
t.Logger.LogCmd(cmd)
err := cmd.Start()
if err != nil {
return err
}
return cmd.Wait()
err = cmd.Wait()
if err != nil {
return err
}
t.Logger.WaitLog()
return nil
}
func (t *TerraformApp) InstallRequirements(environmentVars []string, params interface{}) (err error) {
@ -204,8 +217,16 @@ func (t *TerraformApp) Plan(args []string, environmentVars []string, inputs map[
if err != nil {
return err
}
cb(cmd.Process)
return cmd.Wait()
err = cmd.Wait()
if err != nil {
return err
}
t.Logger.WaitLog()
return nil
}
func (t *TerraformApp) Apply(args []string, environmentVars []string, inputs map[string]string, cb func(*os.Process)) error {
@ -218,7 +239,14 @@ func (t *TerraformApp) Apply(args []string, environmentVars []string, inputs map
return err
}
cb(cmd.Process)
return cmd.Wait()
err = cmd.Wait()
if err != nil {
return err
}
t.Logger.WaitLog()
return nil
}
func (t *TerraformApp) Run(args LocalAppRunningArgs) error {

View File

@ -60,4 +60,6 @@ type Logger interface {
AddLogListener(l LogListener)
SetCommit(hash, message string)
WaitLog()
}

View File

@ -3,7 +3,9 @@ package runners
import (
"bufio"
"fmt"
"io"
"os/exec"
"sync"
"time"
"github.com/semaphoreui/semaphore/pkg/task_logger"
@ -20,6 +22,8 @@ type runningJob struct {
statusListeners []task_logger.StatusListener
logListeners []task_logger.LogListener
logWG sync.WaitGroup
}
func (p *runningJob) AddStatusListener(l task_logger.StatusListener) {
@ -59,8 +63,12 @@ func (p *runningJob) LogCmd(cmd *exec.Cmd) {
stderr, _ := cmd.StderrPipe()
stdout, _ := cmd.StdoutPipe()
go p.logPipe(bufio.NewReader(stderr))
go p.logPipe(bufio.NewReader(stdout))
go p.logPipe(stderr)
go p.logPipe(stdout)
}
func (p *runningJob) WaitLog() {
p.logWG.Wait()
}
func (p *runningJob) SetCommit(hash, message string) {
@ -83,15 +91,19 @@ func (p *runningJob) SetStatus(status task_logger.TaskStatus) {
}
}
func (p *runningJob) logPipe(reader *bufio.Reader) {
line, err := tasks.Readln(reader)
for err == nil {
func (p *runningJob) logPipe(reader io.Reader) {
p.logWG.Add(1)
defer p.logWG.Done()
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
p.Log(line)
line, err = tasks.Readln(reader)
}
if err != nil && err.Error() != "EOF" {
if scanner.Err() != nil && scanner.Err().Error() != "EOF" {
//don't panic on these errors, sometimes it throws not dangerous "read |0: file already closed" error
util.LogWarningF(err, log.Fields{"error": "Failed to read TaskRunner output"})
util.LogWarningF(scanner.Err(), log.Fields{"error": "Failed to read TaskRunner output"})
}
}

View File

@ -6,6 +6,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/semaphoreui/semaphore/api/sockets"
@ -43,6 +44,8 @@ type TaskRunner struct {
logListeners []task_logger.LogListener
Alias string
logWG sync.WaitGroup
}
func (t *TaskRunner) AddStatusListener(l task_logger.StatusListener) {

View File

@ -4,6 +4,7 @@ import (
"bufio"
"encoding/json"
"fmt"
"io"
"os/exec"
"time"
@ -54,8 +55,12 @@ func (t *TaskRunner) LogCmd(cmd *exec.Cmd) {
stderr, _ := cmd.StderrPipe()
stdout, _ := cmd.StdoutPipe()
go t.logPipe(bufio.NewReader(stderr))
go t.logPipe(bufio.NewReader(stdout))
go t.logPipe(stderr)
go t.logPipe(stdout)
}
func (t *TaskRunner) WaitLog() {
t.logWG.Wait()
}
func (t *TaskRunner) SetCommit(hash, message string) {
@ -131,17 +136,21 @@ func (t *TaskRunner) panicOnError(err error, msg string) {
}
}
func (t *TaskRunner) logPipe(reader *bufio.Reader) {
line, err := Readln(reader)
func (t *TaskRunner) logPipe(reader io.Reader) {
t.logWG.Add(1)
defer t.logWG.Done()
for err == nil {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
line := scanner.Text()
fmt.Println(line)
t.Log(line)
line, err = Readln(reader)
}
if err.Error() != "EOF" {
if scanner.Err() != nil && scanner.Err().Error() != "EOF" {
//don't panic on these errors, sometimes it throws not dangerous "read |0: file already closed" error
util.LogWarningF(err, log.Fields{"error": "Failed to read TaskRunner output"})
util.LogWarningF(scanner.Err(), log.Fields{"error": "Failed to read TaskRunner output"})
}
}