Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,43 @@ pipelines:
tasks: # as usual
```

### Error handling with on_error

When a pipeline fails due to a task error, you can optionally configure an `on_error` task that will be executed
to handle the failure. This is useful for sending notifications, cleanup operations, or logging failure details.

```yaml
pipelines:
deployment:
tasks:
build:
script:
- npm run build
deploy:
script:
- ./deploy.sh
depends_on:
- build
on_error:
script:
- echo "Deployment failed! Notifying team..."
- curl -d "Deployment of {{.failedTaskName}} failed" ntfy.sh/mytopic
```

The on_error task has access to special variables containing information about the failed task:

* `failedTaskName`: Name of the task that failed (its key in the pipeline's `tasks` map)
* `failedTaskExitCode`: Exit code of the failed task
* `failedTaskError`: Error message of the failed task
* `failedTaskStdout`: Standard output of the failed task
* `failedTaskStderr`: Standard error output of the failed task

#### Important notes:

* The `on_error` task only runs when a "normal" task in the pipeline fails
* If the `on_error` task itself fails, the error will be logged but won't trigger another error handler
* The `on_error` task has access to all the same job variables as regular tasks
* Environment variables can be configured for the `on_error` task just like regular tasks

### Configuring retention period

Expand Down
20 changes: 20 additions & 0 deletions definition/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ func (d TaskDef) Equals(otherDef TaskDef) bool {
return true
}

// OnErrorTaskDef is a special task definition to be executed solely if an error occurs during "normal" task handling.
// It intentionally exposes only a script and environment overrides; it is scheduled on its own after the
// regular task graph of a pipeline has failed (see PipelineDef.OnError and PipelineRunner.RunJobErrorHandler).
type OnErrorTaskDef struct {
	// Script is a list of shell commands that are executed if an error occurs in a "normal" task
	Script []string `yaml:"script"`

	// Env sets/overrides environment variables for this task (takes precedence over pipeline environment)
	Env map[string]string `yaml:"env"`
}

type PipelineDef struct {
// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
Concurrency int `yaml:"concurrency"`
Expand All @@ -60,8 +69,19 @@ type PipelineDef struct {
// Env sets/overrides environment variables for all tasks (takes precedence over process environment)
Env map[string]string `yaml:"env"`

// Tasks is a map of task names to task definitions
Tasks map[string]TaskDef `yaml:"tasks"`

// Task to be added and executed if this pipeline fails, e.g. for notifications.
//
// In this task, you have the following variables set:
// - failedTaskName: Name of the failed task (key from pipelines.yml)
// - failedTaskExitCode: Exit code of the failed task
// - failedTaskError: Error message of the failed task
// - failedTaskStdout: Stdout of the failed task
// - failedTaskStderr: Stderr of the failed task
OnError *OnErrorTaskDef `yaml:"on_error"`

// SourcePath stores the source path where the pipeline was defined
SourcePath string
}
Expand Down
3 changes: 3 additions & 0 deletions examples/pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ pipelines:
no-work:
script:
- go for a walk
on_error:
script:
- echo "Something went wrong, let's handle the error from {{.failedTaskName}}"

queue_it:
concurrency: 2
Expand Down
183 changes: 165 additions & 18 deletions prunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prunner
import (
"context"
"fmt"
"io"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -40,22 +41,27 @@ type PipelineRunner struct {
// persistRequests is for triggering saving-the-store, which is then handled asynchronously, at most every 3 seconds (see NewPipelineRunner)
// externally, call requestPersist()
persistRequests chan struct{}
persistLoopDone chan struct{}

// Mutex for reading or writing jobs and job state
// Mutex for reading or writing pipeline definitions (defs), jobs and job state
mx sync.RWMutex
createTaskRunner func(j *PipelineJob) taskctl.Runner

// Wait group for waiting for asynchronous operations like job.Cancel
wg sync.WaitGroup
// Flag if the runner is shutting down
isShuttingDown bool
// shutdownCancel is the cancel function for the shutdown context (will stop persist loop)
shutdownCancel context.CancelFunc

// Poll interval for completed jobs for graceful shutdown
ShutdownPollInterval time.Duration
}

// NewPipelineRunner creates the central data structure which controls the full runner state; so this knows what is currently running
func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, createTaskRunner func(j *PipelineJob) taskctl.Runner, store store.DataStore, outputStore taskctl.OutputStore) (*PipelineRunner, error) {
ctx, cancel := context.WithCancel(ctx)

pRunner := &PipelineRunner{
defs: defs,
// jobsByID contains ALL jobs, no matter whether they are on the waitlist or are scheduled or cancelled.
Expand All @@ -68,6 +74,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
outputStore: outputStore,
// Use channel buffered with one extra slot, so we can keep save requests while a save is running without blocking
persistRequests: make(chan struct{}, 1),
persistLoopDone: make(chan struct{}),
shutdownCancel: cancel,
createTaskRunner: createTaskRunner,
ShutdownPollInterval: 3 * time.Second,
}
Expand All @@ -79,6 +87,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
}

go func() {
defer close(pRunner.persistLoopDone) // Signal that the persist loop is done on shutdown

for {
select {
case <-ctx.Done():
Expand All @@ -103,7 +113,8 @@ func NewPipelineRunner(ctx context.Context, defs *definition.PipelinesDef, creat
// It can be scheduled (in the waitListByPipeline of PipelineRunner),
// or currently running (jobsByID / jobsByPipeline in PipelineRunner).
type PipelineJob struct {
ID uuid.UUID
ID uuid.UUID
// Identifier of the pipeline (from the YAML file)
Pipeline string
Env map[string]string
Variables map[string]interface{}
Expand All @@ -121,6 +132,8 @@ type PipelineJob struct {
// Tasks is an in-memory representation with state of tasks, sorted by dependencies
Tasks jobTasks
LastError error
// firstFailedTask is a reference to the first task that failed in this job
firstFailedTask *jobTask

sched *taskctl.Scheduler
taskRunner runner.Runner
Expand Down Expand Up @@ -194,6 +207,10 @@ var ErrJobNotFound = errors.New("job not found")
var errJobAlreadyCompleted = errors.New("job is already completed")
var ErrShuttingDown = errors.New("runner is shutting down")

// ScheduleAsync schedules a pipeline execution, if pipeline concurrency config allows for it.
// "pipeline" is the pipeline ID from the YAML file.
//
// the returned PipelineJob is the individual execution context.
func (r *PipelineRunner) ScheduleAsync(pipeline string, opts ScheduleOpts) (*PipelineJob, error) {
r.mx.Lock()
defer r.mx.Unlock()
Expand Down Expand Up @@ -395,14 +412,142 @@ func (r *PipelineRunner) startJob(job *PipelineJob) {

// Run graph asynchronously
r.wg.Add(1)
go func() {
go func(sched *taskctl.Scheduler) {
defer r.wg.Done()
lastErr := job.sched.Schedule(graph)
r.JobCompleted(job.ID, lastErr)
}()
lastErr := sched.Schedule(graph)
if lastErr != nil {
r.RunJobErrorHandler(job)
}
r.JobCompleted(job, lastErr)
}(job.sched)
}
// RunJobErrorHandler builds and schedules the pipeline's configured on_error task after the regular
// task graph of the given job has failed.
//
// It is a no-op when the pipeline has no on_error handler configured. Failures while building or
// running the handler itself are only logged — they deliberately do NOT trigger another error
// handler (no recursion).
//
// Locking: r.mx is taken only for the short critical sections (building the graph, re-initializing
// the scheduler); the blocking Schedule call runs without the lock held.
func (r *PipelineRunner) RunJobErrorHandler(job *PipelineJob) {
	r.mx.Lock()
	errorGraph, err := r.buildErrorGraph(job)
	r.mx.Unlock()
	if err != nil {
		log.
			WithError(err).
			WithField("jobID", job.ID).
			WithField("pipeline", job.Pipeline).
			Error("Failed to build error pipeline graph")
		// At this point, an error with the error handling happened - duh...
		// Nothing we can do at this point.
		return
	}

	// if errorGraph is nil (and no error); no error handling configured for task.
	if errorGraph == nil {
		return
	}

	// re-init scheduler, as we need a new one to schedule the error on. (the old one is already shut down
	// if ContinueRunningTasksAfterFailure == false)
	r.mx.Lock()
	r.initScheduler(job)
	r.mx.Unlock()

	// Blocking: runs the on_error task to completion on the fresh scheduler.
	err = job.sched.Schedule(errorGraph)

	if err != nil {
		log.
			WithError(err).
			WithField("jobID", job.ID).
			WithField("pipeline", job.Pipeline).
			Error("Failed to run error handling for job")
	} else {
		log.
			WithField("jobID", job.ID).
			WithField("pipeline", job.Pipeline).
			Info("error handling completed")
	}
}

// OnErrorTaskName is the reserved task name under which a pipeline's on_error handler is scheduled.
const OnErrorTaskName = "on_error"

// buildErrorGraph builds a single-task execution graph for the pipeline's on_error handler.
//
// It returns (nil, nil) when the pipeline has no on_error handler configured. The returned graph
// contains one task (named OnErrorTaskName) whose template variables are the job's variables plus
// the failedTask* variables describing job.firstFailedTask (if any). The stdout/stderr of the
// failed task are read best-effort from the output store.
//
// Side effect: the on_error task is appended to job.Tasks so it shows up in the job's task list.
//
// Precondition: caller must hold r.mx (see RunJobErrorHandler), since this reads r.defs and
// mutates job.Tasks.
func (r *PipelineRunner) buildErrorGraph(job *PipelineJob) (*scheduler.ExecutionGraph, error) {
	pipelineDef, pipelineDefExists := r.defs.Pipelines[job.Pipeline]
	if !pipelineDefExists {
		return nil, fmt.Errorf("pipeline definition not found for pipeline %s (should never happen)", job.Pipeline)
	}
	onErrorTaskDef := pipelineDef.OnError
	if onErrorTaskDef == nil {
		// no error, but no error handling configured
		return nil, nil
	}

	failedTask := job.firstFailedTask

	// Best effort: missing/unreadable output yields empty bytes, never an error.
	failedTaskStdout := r.readTaskOutputBestEffort(job, failedTask, "stdout")
	failedTaskStderr := r.readTaskOutputBestEffort(job, failedTask, "stderr")

	// Copy the job variables so the failedTask* additions do not mutate job.Variables.
	onErrorVariables := make(map[string]interface{})
	for key, value := range job.Variables {
		onErrorVariables[key] = value
	}

	if failedTask != nil {
		onErrorVariables["failedTaskName"] = failedTask.Name
		onErrorVariables["failedTaskExitCode"] = failedTask.ExitCode
		onErrorVariables["failedTaskError"] = failedTask.Error
		onErrorVariables["failedTaskStdout"] = string(failedTaskStdout)
		onErrorVariables["failedTaskStderr"] = string(failedTaskStderr)
	}

	onErrorJobTask := jobTask{
		TaskDef: definition.TaskDef{
			Script: onErrorTaskDef.Script,
			// AllowFailure needs to be false, otherwise lastError below won't be filled (so errors will not appear in the log)
			AllowFailure: false,
			Env:          onErrorTaskDef.Env,
		},
		Name:   OnErrorTaskName,
		Status: toStatus(scheduler.StatusWaiting),
	}
	job.Tasks = append(job.Tasks, onErrorJobTask)

	// The graph contains only the on_error task; the original (failed) tasks are not re-run.
	return buildPipelineGraph(job.ID, jobTasks{onErrorJobTask}, onErrorVariables)
}

// readTaskOutputBestEffort reads the captured output stream (outputName, e.g. "stdout" or
// "stderr") of the given task from the output store.
//
// It is "best effort": a nil job/task, a missing output, or a read error is logged at debug
// level and an empty (or partially read) byte slice is returned instead of an error, since the
// result is only used to enrich the on_error task variables and must never abort error handling.
func (r *PipelineRunner) readTaskOutputBestEffort(job *PipelineJob, task *jobTask, outputName string) []byte {
	if task == nil || job == nil {
		return nil
	}

	rc, err := r.outputStore.Reader(job.ID.String(), task.Name, outputName)
	if err != nil {
		log.
			WithField("component", "runner").
			WithField("jobID", job.ID.String()).
			WithField("pipeline", job.Pipeline).
			WithField("failedTaskName", task.Name).
			WithField("outputName", outputName).
			WithError(err).
			Debug("Could not create output reader for failed task")
		return nil
	}
	defer func() {
		_ = rc.Close()
	}()

	outputAsBytes, err := io.ReadAll(rc)
	if err != nil {
		// Best effort: log and fall through, returning whatever was read before the error.
		log.
			WithField("component", "runner").
			WithField("jobID", job.ID.String()).
			WithField("pipeline", job.Pipeline).
			WithField("failedTaskName", task.Name).
			WithField("outputName", outputName).
			WithError(err).
			Debug("Could not read output of task")
	}

	return outputAsBytes
}

// HandleTaskChange will be called when the task state changes in the task runner (taskctl)
// it is short-lived and updates our JobTask state accordingly.
func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
r.mx.Lock()
defer r.mx.Unlock()
Expand Down Expand Up @@ -437,11 +582,15 @@ func (r *PipelineRunner) HandleTaskChange(t *task.Task) {
jt.Error = t.Error
}

// if the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is set to FALSE),
// If the task has errored, and we want to fail-fast (ContinueRunningTasksAfterFailure is false),
// then we directly abort all other tasks of the job.
// NOTE: this is NOT the context.Canceled case from above (if a job is explicitly aborted), but only
// if one task failed, and we want to kill the other tasks.
if jt.Errored {
if j.firstFailedTask == nil {
// Remember the first failed task for later use in the error handling
j.firstFailedTask = jt
}
pipelineDef, found := r.defs.Pipelines[j.Pipeline]
if found && !pipelineDef.ContinueRunningTasksAfterFailure {
log.
Expand Down Expand Up @@ -484,15 +633,10 @@ func (r *PipelineRunner) HandleStageChange(stage *scheduler.Stage) {
r.requestPersist()
}

func (r *PipelineRunner) JobCompleted(id uuid.UUID, err error) {
func (r *PipelineRunner) JobCompleted(job *PipelineJob, err error) {
r.mx.Lock()
defer r.mx.Unlock()

job := r.jobsByID[id]
if job == nil {
return
}

job.deinitScheduler()

job.Completed = true
Expand All @@ -508,7 +652,7 @@ func (r *PipelineRunner) JobCompleted(id uuid.UUID, err error) {
pipeline := job.Pipeline
log.
WithField("component", "runner").
WithField("jobID", id).
WithField("jobID", job.ID).
WithField("pipeline", pipeline).
Debug("Job completed")

Expand Down Expand Up @@ -710,9 +854,6 @@ func (r *PipelineRunner) initialLoadFromStore() error {
}

func (r *PipelineRunner) SaveToStore() {
r.wg.Add(1)
defer r.wg.Done()

log.
WithField("component", "runner").
Debugf("Saving job state to data store")
Expand Down Expand Up @@ -807,15 +948,21 @@ func (r *PipelineRunner) Shutdown(ctx context.Context) error {
log.
WithField("component", "runner").
Debugf("Shutting down, waiting for pending operations...")

// Wait for all running jobs to have called JobCompleted
r.wg.Wait()

// Wait until the persist loop is done
<-r.persistLoopDone

// Do a final save to include the state of recently completed jobs
r.SaveToStore()
}()

r.mx.Lock()
r.isShuttingDown = true
r.shutdownCancel()

// Cancel all jobs on wait list
for pipelineName, jobs := range r.waitListByPipeline {
for _, job := range jobs {
Expand Down
Loading