diff --git a/README.md b/README.md index 131c8be..46ded0f 100644 --- a/README.md +++ b/README.md @@ -281,6 +281,43 @@ pipelines: tasks: # as usual ``` +### Error handling with on_error + +When a pipeline fails due to a task error, you can optionally configure an `on_error` task that will be executed +to handle the failure. This is useful for sending notifications, cleanup operations, or logging failure details. + +```yaml +pipelines: + deployment: + tasks: + build: + script: + - npm run build + deploy: + script: + - ./deploy.sh + depends_on: + - build + on_error: + script: + - echo "Deployment failed! Notifying team..." + - curl -d "Deployment of {{.failedTaskName}} failed" ntfy.sh/mytopic +``` + +The on_error task has access to special variables containing information about the failed task: + +* `failedTaskName`: Name of the task that failed (key from `pipelines`) +* `failedTaskExitCode`: Exit code of the failed task +* `failedTaskError`: Error message of the failed task +* `failedTaskStdout`: Standard output of the failed task +* `failedTaskStderr`: Standard error output of the failed task + +#### Important notes: + +* The `on_error` task only runs when a "normal" task in the pipeline fails +* If the `on_error` task itself fails, the error will be logged but won't trigger another error handler +* The `on_error` task has access to all the same job variables as regular tasks +* Environment variables can be configured for the `on_error` task just like regular tasks ### Configuring retention period diff --git a/definition/pipelines.go b/definition/pipelines.go index 37989f1..e5d3114 100644 --- a/definition/pipelines.go +++ b/definition/pipelines.go @@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool { return true } +// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling. +type OnErrorTaskDef struct { + // Script is a list of shell commands that are executed if an error occurs in a "normal" task + Script []string `yaml:"script"` + + // Env sets/overrides environment variables for this task (takes precedence over pipeline environment) + Env map[string]string `yaml:"env"` +} + type PipelineDef struct { // Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1) Concurrency int `yaml:"concurrency"` @@ -60,8 +69,19 @@ type PipelineDef struct { // Env sets/overrides environment variables for all tasks (takes precedence over process environment) Env map[string]string `yaml:"env"` + // Tasks is a map of task names to task definitions Tasks map[string]TaskDef `yaml:"tasks"` + // Task to be added and executed if this pipeline fails, e.g. for notifications. + // + // In this task, you have the following variables set: + // - failedTaskName: Name of the failed task (key from pipelines.yml) + // - failedTaskExitCode: Exit code of the failed task + // - failedTaskError: Error message of the failed task + // - failedTaskStdout: Stdout of the failed task + // - failedTaskStderr: Stderr of the failed task + OnError *OnErrorTaskDef `yaml:"on_error"` + // SourcePath stores the source path where the pipeline was defined SourcePath string } diff --git a/examples/pipelines.yml b/examples/pipelines.yml index cb70203..00b2b54 100644 --- a/examples/pipelines.yml +++ b/examples/pipelines.yml @@ -60,6 +60,9 @@ pipelines: no-work: script: - go for a walk + on_error: + script: + - echo "Something went wrong, let's handle the error from {{.failedTaskName}}" queue_it: concurrency: 2 diff --git a/prunner.go b/prunner.go index 1ee864d..54531ab 100644 --- a/prunner.go +++ b/prunner.go @@ -3,6 +3,7 @@ package prunner import ( "context" "fmt" + "io" "sort" "sync" "time" @@ -40,8 +41,9 @@ type PipelineRunner struct { // persistRequests is for triggering saving-the-store, which is then handled asynchronously, at most every 3 seconds (see NewPipelineRunner) // externally, call requestPersist() persistRequests chan struct{} + persistLoopDone chan struct{} - // Mutex for reading or writing jobs and job state + // Mutex for reading or writing pipeline definitions (defs), jobs and job state mx sync.RWMutex createTaskRunner func(j *PipelineJob) taskctl.Runner @@ -49,6 +51,8 @@ type PipelineRunner struct { wg sync.WaitGroup // Flag if the runner is shutting down isShuttingDown bool + // shutdownCancel is the cancel function for the shutdown context (will stop persist loop) + shutdownCancel context.CancelFunc // Poll interval for completed jobs for graceful shutdown ShutdownPollInterval time.Duration @@ -56,6 +60,8 @@ type PipelineRunner struct { // NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, createTaskRunner func(j *PipelineJob) taskctl.Runner, store store.DataStore, outputStore taskctl.OutputStore) (*PipelineRunner, error) { + ctx, cancel := context.WithCancel(ctx) + pRunner := &PipelineRunner{ defs: defs, // jobsByID contains ALL jobs, no matter whether they are on the waitlist or are scheduled or cancelled. @@ -68,6 +74,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat outputStore: outputStore, // Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking persistRequests: make(chan struct{}, 1), + persistLoopDone: make(chan struct{}), + shutdownCancel: cancel, createTaskRunner: createTaskRunner, ShutdownPollInterval: 3 * time.Second, } @@ -79,6 +87,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat } go func() { + defer close(pRunner.persistLoopDone) // Signal that the persist loop is done on shutdown + for { select { case <-ctx.Done(): @@ -103,7 +113,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat // It can be scheduled (in the waitListByPipeline of PipelineRunner), // or currently running (jobsByID / jobsByPipeline in PipelineRunner). type PipelineJob struct { - ID uuid.UUID + ID uuid.UUID + // Identifier of the pipeline (from the YAML file) Pipeline string Env map[string]string Variables map[string]interface{} @@ -121,6 +132,8 @@ type PipelineJob struct { // Tasks is an in-memory representation with state of tasks, sorted by dependencies Tasks jobTasks LastError error + // firstFailedTask is a reference to the first task that failed in this job + firstFailedTask *jobTask sched *taskctl.Scheduler taskRunner runner.Runner @@ -194,6 +207,10 @@ var ErrJobNotFound = errors.New("job not found") var errJobAlreadyCompleted = errors.New("job is already completed") var ErrShuttingDown = errors.New("runner is shutting down") +// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it. +// "pipeline" is the pipeline ID from the YAML file. +// +// the returned PipelineJob is the individual execution context. func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) { r.mx.Lock() defer r.mx.Unlock() @@ -395,14 +412,142 @@ func (r *PipelineRunner) startJob(job *PipelineJob) { // Run graph asynchronously r.wg.Add(1) - go func() { + go func(sched *taskctl.Scheduler) { defer r.wg.Done() - lastErr := job.sched.Schedule(graph) - r.JobCompleted(job.ID, lastErr) - }() + lastErr := sched.Schedule(graph) + if lastErr != nil { + r.RunJobErrorHandler(job) + } + r.JobCompleted(job, lastErr) + }(job.sched) +} +func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) { + r.mx.Lock() + errorGraph, err := r.buildErrorGraph(job) + r.mx.Unlock() + if err != nil { + log. + WithError(err). + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Error("Failed to build error pipeline graph") + // At this point, an error with the error handling happened - duh... + // Nothing we can do at this point. + return + } + + // if errorGraph is nil (and no error); no error handling configured for task. + if errorGraph == nil { + return + } + + // re-init scheduler, as we need a new one to schedule the error on. (the old one is already shut down + // if ContinueRunningTasksAfterFailure == false) + r.mx.Lock() + r.initScheduler(job) + r.mx.Unlock() + + err = job.sched.Schedule(errorGraph) + + if err != nil { + log. + WithError(err). + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Error("Failed to run error handling for job") + } else { + log. + WithField("jobID", job.ID). + WithField("pipeline", job.Pipeline). + Info("error handling completed") + } +} + +const OnErrorTaskName = "on_error" + +func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) { + pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline] + if !pipelineDefExists { + return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline) + } + onErrorTaskDef := pipelineDef.OnError + if onErrorTaskDef == nil { + // no error, but no error handling configured + return nil, nil + } + + failedTask := job.firstFailedTask + + failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout") + failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr") + + onErrorVariables := make(map[string]interface{}) + for key, value := range job.Variables { + onErrorVariables[key] = value + } + + if failedTask != nil { + onErrorVariables["failedTaskName"] = failedTask.Name + onErrorVariables["failedTaskExitCode"] = failedTask.ExitCode + onErrorVariables["failedTaskError"] = failedTask.Error + onErrorVariables["failedTaskStdout"] = string(failedTaskStdout) + onErrorVariables["failedTaskStderr"] = string(failedTaskStderr) + } + + onErrorJobTask := jobTask{ + TaskDef: definition.TaskDef{ + Script: onErrorTaskDef.Script, + // AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log) + AllowFailure: false, + Env: onErrorTaskDef.Env, + }, + Name: OnErrorTaskName, + Status: toStatus(scheduler.StatusWaiting), + } + job.Tasks = append(job.Tasks, onErrorJobTask) + + return buildPipelineGraph(job.ID, jobTasks{onErrorJobTask}, onErrorVariables) } -// HandleTaskChange will be called when the task state changes in the task runner +func (r *PipelineRunner) readTaskOutputBestEffort(job *PipelineJob, task *jobTask, outputName string) []byte { + if task == nil || job == nil { + return []byte(nil) + } + + rc, err := r.outputStore.Reader(job.ID.String(), task.Name, outputName) + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", job.ID.String()). + WithField("pipeline", job.Pipeline). + WithField("failedTaskName", task.Name). + WithField("outputName", outputName). + WithError(err). + Debug("Could not create stderrReader for failed task") + return []byte(nil) + } else { + defer func(rc io.ReadCloser) { + _ = rc.Close() + }(rc) + outputAsBytes, err := io.ReadAll(rc) + if err != nil { + log. + WithField("component", "runner"). + WithField("jobID", job.ID.String()). + WithField("pipeline", job.Pipeline). + WithField("failedTaskName", task.Name). + WithField("outputName", outputName). + WithError(err). + Debug("Could not read output of task") + } + + return outputAsBytes + } + +} + +// HandleTaskChange will be called when the task state changes in the task runner (taskctl) +// it is short-lived and updates our JobTask state accordingly. func (r *PipelineRunner) HandleTaskChange(t *task.Task) { r.mx.Lock() defer r.mx.Unlock() @@ -437,11 +582,15 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) { jt.Error = t.Error } - // if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE), + // If the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is false), // then we directly abort all other tasks of the job. // NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only // if one task failed, and we want to kill the other tasks. if jt.Errored { + if j.firstFailedTask == nil { + // Remember the first failed task for later use in the error handling + j.firstFailedTask = jt + } pipelineDef, found := r.defs.Pipelines[j.Pipeline] if found && !pipelineDef.ContinueRunningTasksAfterFailure { log. @@ -484,15 +633,10 @@ func (r *PipelineRunner) HandleStageChange(stage *scheduler.Stage) { r.requestPersist() } -func (r *PipelineRunner) JobCompleted(id uuid.UUID, err error) { +func (r *PipelineRunner) JobCompleted(job *PipelineJob, err error) { r.mx.Lock() defer r.mx.Unlock() - job := r.jobsByID[id] - if job == nil { - return - } - job.deinitScheduler() job.Completed = true @@ -508,7 +652,7 @@ func (r *PipelineRunner) JobCompleted(id uuid.UUID, err error) { pipeline := job.Pipeline log. WithField("component", "runner"). - WithField("jobID", id). + WithField("jobID", job.ID). WithField("pipeline", pipeline). Debug("Job completed") @@ -710,9 +854,6 @@ func (r *PipelineRunner) initialLoadFromStore() error { } func (r *PipelineRunner) SaveToStore() { - r.wg.Add(1) - defer r.wg.Done() - log. WithField("component", "runner"). Debugf("Saving job state to data store") @@ -807,15 +948,21 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error { log. WithField("component", "runner"). Debugf("Shutting down, waiting for pending operations...") + // Wait for all running jobs to have called JobCompleted r.wg.Wait() + // Wait until the persist loop is done + <-r.persistLoopDone + // Do a final save to include the state of recently completed jobs r.SaveToStore() }() r.mx.Lock() r.isShuttingDown = true + r.shutdownCancel() + // Cancel all jobs on wait list for pipelineName, jobs := range r.waitListByPipeline { for _, job := range jobs { diff --git a/prunner_test.go b/prunner_test.go index cb1d1e1..83d2c89 100644 --- a/prunner_test.go +++ b/prunner_test.go @@ -161,6 +161,7 @@ func TestPipelineRunner_ScheduleAsync_WithEmptyScriptTask(t *testing.T) { require.NoError(t, err) waitForCompletedJob(t, pRunner, job.ID) + assert.NoError(t, job.LastError) } func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { @@ -213,6 +214,7 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { require.NoError(t, err) waitForCompletedJob(t, pRunner, job.ID) + assert.NoError(t, job.LastError) pipelineVarTaskOutput := store.GetBytes(job.ID.String(), "pipeline_var", "stdout") assert.Equal(t, "from pipeline,from pipeline,from process", string(pipelineVarTaskOutput), "output of pipeline_var") @@ -221,6 +223,203 @@ func TestPipelineRunner_ScheduleAsync_WithEnvVars(t *testing.T) { assert.Equal(t, "from task,from pipeline,from process", string(taskVarTaskOutput), "output of task_var") } +func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "erroring_script": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "a": { + Script: []string{"echo A"}, + }, + "c": { + Script: []string{"sleep 10; exit 42"}, + DependsOn: []string{"b"}, + }, + "b": { + Script: []string{ + "echo stdoutContent", + "echo This message goes to stderr >&2", + "exit 42", + }, + DependsOn: []string{"a"}, + }, + "wait": { + DependsOn: []string{"a", "b", "c"}, + }, + }, + OnError: &definition.OnErrorTaskDef{ + Script: []string{ + "echo ON_ERROR", + "echo 'Failed Task Name: {{ .failedTaskName }}'", + "echo 'Failed Task Exit Code: {{ .failedTaskExitCode }}'", + "echo 'Failed Task Error: {{ .failedTaskError }}'", + "echo 'Failed Task Stdout: {{ .failedTaskStdout }}'", + "echo 'Failed Task Stderr: {{ .failedTaskStderr }}'", + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockOutputStore := test.NewMockOutputStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore) + return taskRunner + }, nil, mockOutputStore) + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{}) + require.NoError(t, err) + + waitForCompletedJob(t, pRunner, job.ID) + res := mockOutputStore.GetBytes(job.ID.String(), "on_error", "stdout") + assert.Error(t, job.LastError) + assert.Equal(t, `ON_ERROR +Failed Task Name: b +Failed Task Exit Code: 42 +Failed Task Error: exit status 42 +Failed Task Stdout: stdoutContent + +Failed Task Stderr: This message goes to stderr + +`, string(res)) + + jt := job.Tasks.ByName("on_error") + if assert.NotNil(t, jt) { + assert.False(t, jt.Canceled, "on_error task was not marked as canceled") + assert.False(t, jt.Errored, "task was not marked as errored") + assert.Equal(t, "done", jt.Status, "task has status done") + assert.Nil(t, jt.Error, "task has no error set") + } +} + +func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndSetsStateCorrectlyIfErrorHookFails(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "erroring_script": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "a": { + Script: []string{"echo A"}, + }, + "b": { + Script: []string{ + "echo stdoutContent", + "echo This message goes to stderr >&2", + "exit 42", + }, + }, + "wait": { + DependsOn: []string{"a", "b"}, + }, + }, + OnError: &definition.OnErrorTaskDef{ + Script: []string{ + "echo ON_ERROR", + "exit 1", + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockOutputStore := test.NewMockOutputStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore) + return taskRunner + }, nil, mockOutputStore) + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{}) + require.NoError(t, err) + + waitForCompletedJob(t, pRunner, job.ID) + assert.Error(t, job.LastError) + + jt := job.Tasks.ByName("on_error") + if assert.NotNil(t, jt) { + assert.False(t, jt.Canceled, "on_error task was not marked as canceled") + assert.True(t, jt.Errored, "task was not marked as errored") + assert.Equal(t, "error", jt.Status, "task has status done") + assert.NotNil(t, jt.Error, "task has no error set") + } +} + +func TestPipelineRunner_ScheduleAsync_WithFailingScript_TriggersOnErrorHook_AndSetsStateCorrectlyIfErrorHookUsesUnknownVariables(t *testing.T) { + var defs = &definition.PipelinesDef{ + Pipelines: map[string]definition.PipelineDef{ + "erroring_script": { + // Concurrency of 1 is the default for a single concurrent execution + Concurrency: 1, + QueueLimit: nil, + Tasks: map[string]definition.TaskDef{ + "a": { + Script: []string{"echo A"}, + }, + "b": { + Script: []string{ + "echo stdoutContent", + "echo This message goes to stderr >&2", + "exit 42", + }, + }, + "wait": { + DependsOn: []string{"a", "b"}, + }, + }, + OnError: &definition.OnErrorTaskDef{ + Script: []string{ + "echo ON_ERROR", + "echo {{ .does_not_exist_and_error }}", + }, + }, + SourcePath: "fixtures", + }, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockOutputStore := test.NewMockOutputStore() + pRunner, err := NewPipelineRunner(ctx, defs, func(j *PipelineJob) taskctl.Runner { + // Use a real runner here to test the actual processing of a task.Task + taskRunner, _ := taskctl.NewTaskRunner(mockOutputStore) + return taskRunner + }, nil, mockOutputStore) + require.NoError(t, err) + + job, err := pRunner.ScheduleAsync("erroring_script", ScheduleOpts{}) + require.NoError(t, err) + + waitForCompletedJob(t, pRunner, job.ID) + assert.Error(t, job.LastError) + + jt := job.Tasks.ByName("on_error") + if assert.NotNil(t, jt) { + assert.False(t, jt.Canceled, "on_error task was not marked as canceled") + assert.True(t, jt.Errored, "task was not marked as errored") + assert.Equal(t, "error", jt.Status, "task has status done") + assert.NotNil(t, jt.Error, "task has no error set") + assert.Equal(t, "template: interpolate:1:8: executing \"interpolate\" at <.does_not_exist_and_error>: map has no entry for key \"does_not_exist_and_error\"", jt.Error.Error(), "task has wrong error message") + } +} + func TestPipelineRunner_CancelJob_WithRunningJob(t *testing.T) { var defs = &definition.PipelinesDef{ Pipelines: map[string]definition.PipelineDef{ @@ -325,6 +524,7 @@ func TestPipelineRunner_CancelJob_WithQueuedJob(t *testing.T) { waitForCompletedJob(t, pRunner, job1.ID) waitForCanceledJob(t, pRunner, job2.ID) waitForCompletedJob(t, pRunner, job3.ID) + assert.NoError(t, job1.LastError) assert.Nil(t, job2.Start, "job 2 should not be started") assert.Equal(t, true, job2.Tasks.ByName("sleep").Canceled, "job 2 task was marked as canceled") @@ -426,6 +626,7 @@ func TestPipelineRunner_FirstErroredTaskShouldCancelAllRunningTasks_ByDefault(t jobID := job.ID waitForCompletedJob(t, pRunner, jobID) + assert.Error(t, job.LastError) assert.True(t, job.Tasks.ByName("err").Errored, "err task was errored") assert.True(t, job.Tasks.ByName("ok").Canceled, "ok task should be cancelled") diff --git a/taskctl/runner.go b/taskctl/runner.go index fc1dec1..d5d500b 100644 --- a/taskctl/runner.go +++ b/taskctl/runner.go @@ -166,8 +166,8 @@ func (r *TaskRunner) Run(t *task.Task) error { // but this lead to a huge memory leak because the full job output was retained // in memory forever. // This enabled features of taskctl like {{ .Tasks.TASKNAME.Output }} and {{.Output}}, - // but we never promised these features. Thus it is fine to not log to stdout and stderr - // into a Buffer, but directly to a file. + // but we never promised these features. Thus, it is fine to not log stdout and stderr + // into a Buffer, but directly to the output store. stdoutWriter []io.Writer stderrWriter []io.Writer ) diff --git a/taskctl/scheduler.go b/taskctl/scheduler.go index 7ae2271..d2c6a69 100644 --- a/taskctl/scheduler.go +++ b/taskctl/scheduler.go @@ -39,6 +39,7 @@ func (s *Scheduler) OnStageChange(f func(stage *scheduler.Stage)) { // Schedule starts execution of the given ExecutionGraph // // Modified to notify on stage changes +// Method blocks until FULL EXECUTION is completed func (s *Scheduler) Schedule(g *scheduler.ExecutionGraph) error { var wg sync.WaitGroup