diff --git a/.gitignore b/.gitignore index 66fd13c..d4c13f3 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +.CLAUDE/ +CLAUDE.md diff --git a/brokers/in-memory/broker.go b/brokers/in-memory/broker.go index 312760f..9b8dc00 100644 --- a/brokers/in-memory/broker.go +++ b/brokers/in-memory/broker.go @@ -23,6 +23,9 @@ func New() *Broker { } func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) { + // Ensure work channel is closed when producer (broker) is done + defer close(work) + r.mu.RLock() ch, ok := r.queues[queue] r.mu.RUnlock() diff --git a/brokers/nats-js/broker.go b/brokers/nats-js/broker.go index deee68e..88c7de6 100644 --- a/brokers/nats-js/broker.go +++ b/brokers/nats-js/broker.go @@ -96,6 +96,23 @@ func (b *Broker) Enqueue(_ context.Context, msg []byte, queue string) error { return nil } +// TODO: CRITICAL - This implementation has goroutine leak issues that prevent graceful shutdown: +// +// 1. SUBSCRIPTION NOT CLEANED UP: The NATS subscription is never unsubscribed when context +// is cancelled, meaning the subscription callback continues running indefinitely. +// +// 2. CALLBACK GOROUTINE HANGS: The callback function may block forever on "work <- msg.Data" +// if the work channel is full or no longer being read, preventing graceful shutdown. +// +// 3. NO DRAIN/UNSUBSCRIBE: Should call subscription.Drain() or subscription.Unsubscribe() +// when context is cancelled to properly clean up NATS resources. +// +// This causes Server.Start() WaitGroup to hang indefinitely because s.consume() never exits. +// A proper fix would store the subscription and clean it up: +// +// sub, err := b.conn.Subscribe(queue, callback, opts...) +// defer sub.Drain() // or sub.Unsubscribe() +// <-ctx.Done() func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) { _, err := b.conn.Subscribe(queue, func(msg *nats.Msg) { work <- msg.Data diff --git a/brokers/redis/broker.go b/brokers/redis/broker.go index b7c4d99..df8e295 100644 --- a/brokers/redis/broker.go +++ b/brokers/redis/broker.go @@ -123,6 +123,8 @@ func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, } func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) { + // Ensure work channel is closed when producer (broker) is done + defer close(work) go b.consumeScheduled(ctx, queue) for { diff --git a/chains.go b/chains.go index 9748ad9..1c188d7 100644 --- a/chains.go +++ b/chains.go @@ -95,7 +95,7 @@ func (s *Server) GetChain(ctx context.Context, id string) (ChainMessage, error) // Fetch the current job, to check its status currJob, err := s.GetJob(ctx, c.JobID) if err != nil { - return ChainMessage{}, nil + return ChainMessage{}, fmt.Errorf("failed to get current job %s in chain %s: %w", c.JobID, id, err) } checkJobs: @@ -117,16 +117,17 @@ checkJobs: if len(currJob.OnSuccessIDs) == 0 { c.Status = StatusDone } else { - currJob, err = s.GetJob(ctx, currJob.OnSuccessIDs[0]) + nextJobID := currJob.OnSuccessIDs[0] + currJob, err = s.GetJob(ctx, nextJobID) if err != nil { - return ChainMessage{}, nil + return ChainMessage{}, fmt.Errorf("failed to get next job %s in chain %s: %w", nextJobID, id, err) } goto checkJobs } } if err = s.setChainMessage(ctx, c); err != nil { - return ChainMessage{}, nil + return ChainMessage{}, fmt.Errorf("failed to save chain message for chain %s: %w", id, err) } return c, nil diff --git a/server.go b/server.go index 2c1f4d2..f463c18 100644 --- a/server.go +++ b/server.go @@ -208,13 +208,14 @@ func (s *Server) GetSuccess(ctx context.Context) ([]string, error) { // Start() starts the job consumer and processor. It is a blocking function. func (s *Server) Start(ctx context.Context) { - go s.cron.Start() // Loop over each registered queue. s.q.RLock() queues := s.queues s.q.RUnlock() var wg sync.WaitGroup + + s.startCronScheduler(ctx, &wg) for q, conc := range queues { q := q // Hack to fix the loop variable capture issue. if s.traceProv != nil { @@ -258,41 +259,43 @@ func (s *Server) process(ctx context.Context, w chan []byte) { defer span.End() } - select { - case <-ctx.Done(): - s.log.Info("shutting down processor..") + // Channel-only shutdown: wait for work or channel closure + work, ok := <-w + if !ok { + // Channel closed by broker - clean shutdown + s.log.Info("work channel closed, shutting down processor..") return - case work := <-w: - var ( - msg JobMessage - err error - ) - // Decode the bytes into a job message - if err = msgpack.Unmarshal(work, &msg); err != nil { - s.spanError(span, err) - s.log.Error("error unmarshalling task", "error", err) - break - } + } - // Fetch the registered task handler. - task, err := s.getHandler(msg.Job.Task) - if err != nil { - s.spanError(span, err) - s.log.Error("handler not found", "error", err) - break - } + var ( + msg JobMessage + err error + ) + // Decode the bytes into a job message + if err = msgpack.Unmarshal(work, &msg); err != nil { + s.spanError(span, err) + s.log.Error("error unmarshalling task", "error", err) + continue + } - // Set the job status as being "processed" - if err := s.statusProcessing(ctx, msg); err != nil { - s.spanError(span, err) - s.log.Error("error setting the status to processing", "error", err) - break - } + // Fetch the registered task handler. + task, err := s.getHandler(msg.Job.Task) + if err != nil { + s.spanError(span, err) + s.log.Error("handler not found", "error", err) + continue + } - if err := s.execJob(ctx, msg, task); err != nil { - s.spanError(span, err) - s.log.Error("could not execute job", "error", err) - } + // Set the job status as being "processed" + if err := s.statusProcessing(ctx, msg); err != nil { + s.spanError(span, err) + s.log.Error("error setting the status to processing", "error", err) + continue + } + + if err := s.execJob(ctx, msg, task); err != nil { + s.spanError(span, err) + s.log.Error("could not execute job", "error", err) } } } @@ -570,3 +573,38 @@ func (s *Server) spanError(sp spans.Span, err error) { sp.SetStatus(codes.Error, err.Error()) } } + +// startCronScheduler starts the cron scheduler and manages its lifecycle. +// It handles graceful shutdown when the context is cancelled. +func (s *Server) startCronScheduler(ctx context.Context, wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + + // Check if context is already cancelled before starting cron + select { + case <-ctx.Done(): + s.log.Debug("context cancelled before cron start") + return + default: + } + + s.cron.Start() + s.log.Debug("cron scheduler started") + + // Wait for shutdown signal + <-ctx.Done() + s.log.Info("shutting down cron scheduler...") + + // Stop cron with timeout protection + cronCtx := s.cron.Stop() + shutdownTimeout := 5 * time.Second + + select { + case <-cronCtx.Done(): + s.log.Debug("cron scheduler stopped gracefully") + case <-time.After(shutdownTimeout): + s.log.Warn("cron scheduler stop timeout - forcing exit") + } + }() +} diff --git a/server_test.go b/server_test.go index dfa9f86..b7e3ed7 100644 --- a/server_test.go +++ b/server_test.go @@ -1,6 +1,7 @@ package tasqueue import ( + "context" "encoding/json" "fmt" "log/slog" @@ -58,3 +59,61 @@ func MockHandlerWithSleep(msg []byte, _ JobCtx) error { return nil } + +// TestCronLifecycleManagement verifies that cron scheduler starts and stops properly +func TestCronLifecycleManagement(t *testing.T) { + // Create server with in-memory broker/results + srv := newServer(t, "test", func(b []byte, ctx JobCtx) error { + return nil + }) + + // Create a context that we can cancel + ctx, cancel := context.WithCancel(context.Background()) + + // Start server in a goroutine + done := make(chan bool) + go func() { + srv.Start(ctx) + done <- true + }() + + // Give the server a moment to start + time.Sleep(100 * time.Millisecond) + + // Cancel the context to trigger shutdown + cancel() + + // Wait for server to shut down with timeout + select { + case <-done: + t.Log("Server shut down gracefully") + case <-time.After(10 * time.Second): + t.Fatal("Server failed to shut down within timeout - cron leak likely exists") + } +} + +// TestCronContextCancellationBeforeStart verifies that cron handles early cancellation +func TestCronContextCancellationBeforeStart(t *testing.T) { + srv := newServer(t, "test", func(b []byte, ctx JobCtx) error { + return nil + }) + + // Create a context that's already cancelled + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + // Start server in a goroutine + done := make(chan bool) + go func() { + srv.Start(ctx) + done <- true + }() + + // Wait for server to shut down with timeout + select { + case <-done: + t.Log("Server handled pre-cancelled context gracefully") + case <-time.After(5 * time.Second): + t.Fatal("Server failed to handle pre-cancelled context") + } +}