Skip to content

Commit bd558f9

Browse files
committed
Update
1 parent 73fcb6c commit bd558f9

File tree

2 files changed

+69
-7
lines changed

2 files changed

+69
-7
lines changed

README.MD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ type Event struct {
185185
type Transaction struct {
186186
store *EventStore
187187
events []Event
188+
startHead uint64 // head position when transaction began
189+
188190
}
189191
```
190192

transaction.go

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,26 +27,86 @@ func (tx *Transaction) Publish(e Event) {
2727
// Commit enqueues all buffered events and processes them immediately.
2828
// It returns the first error encountered from Subscribe or handler execution.
2929
func (tx *Transaction) Commit(ctx context.Context) error {
30-
// (same as before)…
31-
// at the very end we clear tx.events
32-
atomic.StoreUint64(&tx.store.tail, atomic.LoadUint64(&tx.store.head))
30+
// 1) Enqueue
31+
for _, e := range tx.events {
32+
e.Ctx = ctx
33+
if err := tx.store.Subscribe(ctx, e); err != nil {
34+
return err
35+
}
36+
}
37+
38+
// 2) Process synchronously
39+
head := atomic.LoadUint64(&tx.store.head)
40+
tail := atomic.LoadUint64(&tx.store.tail)
41+
disp := *tx.store.dispatcher
42+
mask := tx.store.size - 1
43+
44+
for i := tail; i < head; i++ {
45+
ptr := atomic.LoadPointer(&tx.store.buf[i&mask])
46+
if ptr == nil {
47+
continue
48+
}
49+
ev := *(*Event)(ptr)
50+
if handler, ok := disp[ev.Projection]; ok {
51+
// pick up recorded context
52+
cctx := ev.Ctx
53+
if c2, ok2 := ev.Args["__ctx"].(context.Context); ok2 && c2 != nil {
54+
cctx = c2
55+
}
56+
57+
// before hooks
58+
for _, hook := range tx.store.beforeHooks {
59+
hook(cctx, ev)
60+
}
61+
62+
// wrap middlewares
63+
wrapped := handler
64+
for j := len(tx.store.middlewares) - 1; j >= 0; j-- {
65+
wrapped = tx.store.middlewares[j](wrapped)
66+
}
67+
68+
// invoke handler
69+
res, err := wrapped(cctx, ev.Args)
70+
atomic.AddUint64(&tx.store.processedCount, 1)
71+
72+
// after hooks
73+
for _, hook := range tx.store.afterHooks {
74+
hook(cctx, ev, res, err)
75+
}
76+
77+
if err != nil {
78+
atomic.AddUint64(&tx.store.errorCount, 1)
79+
for _, hook := range tx.store.errorHooks {
80+
hook(cctx, ev, err)
81+
}
82+
// advance tail and exit on first handler error
83+
atomic.StoreUint64(&tx.store.tail, head)
84+
return err
85+
}
86+
}
87+
}
88+
89+
// 3) Advance tail past processed events
90+
atomic.StoreUint64(&tx.store.tail, head)
91+
92+
// 4) Clear transaction buffer
3393
tx.events = tx.events[:0]
3494
return nil
3595
}
3696

3797
// Rollback clears the local buffer *and* any events that have already
3898
// been pushed into the store’s ring-buffer since the transaction began.
3999
func (tx *Transaction) Rollback() {
40-
// 1) clear our local buffer
100+
// clear our local buffer
41101
tx.events = tx.events[:0]
42102

43-
// 2) remove any partial enqueues from the store
44-
// by zeroing pointers in the buffer slots we touched…
103+
// remove any partial enqueues from the store
45104
currHead := atomic.LoadUint64(&tx.store.head)
46105
mask := tx.store.size - 1
47106
for i := tx.startHead; i < currHead; i++ {
48107
atomic.StorePointer(&tx.store.buf[i&mask], nil)
49108
}
50-
// 3) restore the head pointer
109+
110+
// restore the head pointer
51111
atomic.StoreUint64(&tx.store.head, tx.startHead)
52112
}

0 commit comments

Comments
 (0)