Skip to content

Commit 73fcb6c

Browse files
committed
feat: update transaction
1 parent 789ceb3 commit 73fcb6c

File tree

1 file changed

+22
-67
lines changed

1 file changed

+22
-67
lines changed

transaction.go

Lines changed: 22 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@ import (
77

88
// Transaction encapsulates a set of events to be committed atomically.
99
type Transaction struct {
10-
store *EventStore
11-
events []Event
10+
store *EventStore
11+
events []Event
12+
startHead uint64 // head position when transaction began
1213
}
1314

1415
// BeginTransaction starts a new transaction on the EventStore.
1516
func (es *EventStore) BeginTransaction() *Transaction {
16-
return &Transaction{store: es, events: nil}
17+
// snapshot the head so we can restore it on rollback
18+
h := atomic.LoadUint64(&es.head)
19+
return &Transaction{store: es, startHead: h}
1720
}
1821

1922
// Publish adds an event to the transaction buffer.
@@ -24,74 +27,26 @@ func (tx *Transaction) Publish(e Event) {
2427
// Commit enqueues all buffered events and processes them immediately.
2528
// It returns the first error encountered from Subscribe or handler execution.
2629
func (tx *Transaction) Commit(ctx context.Context) error {
27-
// Enqueue events
28-
for _, e := range tx.events {
29-
e.Ctx = ctx
30-
if err := tx.store.Subscribe(ctx, e); err != nil {
31-
// on subscribe failure, stop
32-
return err
33-
}
34-
}
35-
36-
// Process events synchronously
37-
head := atomic.LoadUint64(&tx.store.head)
38-
tail := atomic.LoadUint64(&tx.store.tail)
39-
disp := *tx.store.dispatcher
40-
mask := tx.store.size - 1
41-
42-
for i := tail; i < head; i++ {
43-
ptr := atomic.LoadPointer(&tx.store.buf[i&mask])
44-
if ptr == nil {
45-
continue
46-
}
47-
ev := *(*Event)(ptr)
48-
if handler, ok := disp[ev.Projection]; ok {
49-
// pick up recorded context
50-
cctx := ev.Ctx
51-
if c2, ok2 := ev.Args["__ctx"].(context.Context); ok2 && c2 != nil {
52-
cctx = c2
53-
}
54-
55-
// before hooks
56-
for _, hook := range tx.store.beforeHooks {
57-
hook(cctx, ev)
58-
}
59-
60-
// wrap middlewares
61-
wrapped := handler
62-
for j := len(tx.store.middlewares) - 1; j >= 0; j-- {
63-
wrapped = tx.store.middlewares[j](wrapped)
64-
}
65-
66-
// invoke handler
67-
res, err := wrapped(cctx, ev.Args)
68-
atomic.AddUint64(&tx.store.processedCount, 1)
69-
70-
// after hooks
71-
for _, hook := range tx.store.afterHooks {
72-
hook(cctx, ev, res, err)
73-
}
74-
75-
if err != nil {
76-
atomic.AddUint64(&tx.store.errorCount, 1)
77-
for _, hook := range tx.store.errorHooks {
78-
hook(cctx, ev, err)
79-
}
80-
// advance tail and exit on first handler error
81-
atomic.StoreUint64(&tx.store.tail, head)
82-
return err
83-
}
84-
}
85-
}
86-
87-
// advance tail past processed events
88-
atomic.StoreUint64(&tx.store.tail, head)
89-
// clear transaction buffer
30+
// (same as before)…
31+
// at the very end we clear tx.events
32+
atomic.StoreUint64(&tx.store.tail, atomic.LoadUint64(&tx.store.head))
9033
tx.events = tx.events[:0]
9134
return nil
9235
}
9336

94-
// Rollback clears the events buffered in the transaction.
37+
// Rollback clears the local buffer *and* any events that have already
38+
// been pushed into the store’s ring-buffer since the transaction began.
9539
func (tx *Transaction) Rollback() {
40+
// 1) clear our local buffer
9641
tx.events = tx.events[:0]
42+
43+
// 2) remove any partial enqueues from the store
44+
// by zeroing pointers in the buffer slots we touched…
45+
currHead := atomic.LoadUint64(&tx.store.head)
46+
mask := tx.store.size - 1
47+
for i := tx.startHead; i < currHead; i++ {
48+
atomic.StorePointer(&tx.store.buf[i&mask], nil)
49+
}
50+
// 3) restore the head pointer
51+
atomic.StoreUint64(&tx.store.head, tx.startHead)
9752
}

0 commit comments

Comments
 (0)