Skip to content

Commit 9915d21

Browse files
committed
fix some lock contention during event handling and add diagnostic logs
1 parent be39dd7 commit 9915d21

File tree

3 files changed

+129
-56
lines changed

3 files changed

+129
-56
lines changed

kite-service/internal/core/engine/app.go

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,6 @@ func NewApp(
4242
}
4343

4444
func (a *App) AddCommand(cmd *model.Command) {
45-
a.Lock()
46-
defer a.Unlock()
47-
4845
command, err := NewCommand(
4946
cmd,
5047
a.stores,
@@ -54,6 +51,9 @@ func (a *App) AddCommand(cmd *model.Command) {
5451
return
5552
}
5653

54+
a.Lock()
55+
defer a.Unlock()
56+
5757
a.commands[cmd.ID] = command
5858

5959
if !cmd.LastDeployedAt.Valid || cmd.LastDeployedAt.Time.Before(cmd.UpdatedAt) {
@@ -62,14 +62,14 @@ func (a *App) AddCommand(cmd *model.Command) {
6262
}
6363

6464
func (a *App) RemoveDanglingCommands(commandIDs []string) {
65-
a.Lock()
66-
defer a.Unlock()
67-
6865
commandIDMap := make(map[string]struct{}, len(commandIDs))
6966
for _, commandID := range commandIDs {
7067
commandIDMap[commandID] = struct{}{}
7168
}
7269

70+
a.Lock()
71+
defer a.Unlock()
72+
7373
for cmdID := range a.commands {
7474
if _, ok := commandIDMap[cmdID]; !ok {
7575
delete(a.commands, cmdID)
@@ -79,9 +79,6 @@ func (a *App) RemoveDanglingCommands(commandIDs []string) {
7979
}
8080

8181
func (a *App) AddEventListener(listener *model.EventListener) {
82-
a.Lock()
83-
defer a.Unlock()
84-
8582
eventListener, err := NewEventListener(
8683
listener,
8784
a.stores,
@@ -91,18 +88,21 @@ func (a *App) AddEventListener(listener *model.EventListener) {
9188
return
9289
}
9390

91+
a.Lock()
92+
defer a.Unlock()
93+
9494
a.listeners[listener.ID] = eventListener
9595
}
9696

9797
func (a *App) RemoveDanglingEventListeners(listenerIDs []string) {
98-
a.Lock()
99-
defer a.Unlock()
100-
10198
listenerIDMap := make(map[string]struct{}, len(listenerIDs))
10299
for _, listenerID := range listenerIDs {
103100
listenerIDMap[listenerID] = struct{}{}
104101
}
105102

103+
a.Lock()
104+
defer a.Unlock()
105+
106106
for listenerID := range a.listeners {
107107
if _, ok := listenerIDMap[listenerID]; !ok {
108108
delete(a.listeners, listenerID)
@@ -128,17 +128,38 @@ func (a *App) createLogEntry(level model.LogLevel, message string) {
128128
}
129129

130130
func (a *App) HandleEvent(appID string, session *state.State, event gateway.Event) {
131-
a.RLock()
132-
defer a.RUnlock()
133-
134131
switch e := event.(type) {
135132
case *gateway.InteractionCreateEvent:
133+
timeDiff := time.Since(e.ID.Time())
134+
if timeDiff > 500*time.Millisecond {
135+
slog.Warn(
136+
"Received interaction event late",
137+
slog.String("app_id", appID),
138+
slog.String("interaction_id", e.ID.String()),
139+
slog.String("time_diff", timeDiff.String()),
140+
)
141+
}
142+
136143
switch d := e.Data.(type) {
137144
case *discord.CommandInteraction:
138145
fullName := getFullCommandName(d)
146+
147+
lockStart := time.Now()
148+
a.RLock()
149+
defer a.RUnlock()
150+
lockDiff := time.Since(lockStart)
151+
if lockDiff > 100*time.Millisecond {
152+
slog.Warn(
153+
"Locking app took too long",
154+
slog.String("app_id", appID),
155+
slog.String("lock_duration", lockDiff.String()),
156+
)
157+
}
158+
139159
for _, command := range a.commands {
140160
if command.cmd.Name == fullName {
141161
go command.HandleEvent(appID, session, event)
162+
break
142163
}
143164
}
144165
case *discord.ButtonInteraction:
@@ -186,14 +207,17 @@ func (a *App) HandleEvent(appID string, session *state.State, event gateway.Even
186207
}
187208

188209
if resumePoint.CommandID.Valid {
210+
a.RLock()
211+
defer a.RUnlock()
212+
189213
command, ok := a.commands[resumePoint.CommandID.String]
190214
if !ok {
191215
return
192216
}
193217

194218
node := command.flow.FindChildWithID(resumePoint.FlowNodeID)
195219

196-
a.stores.executeFlowEvent(
220+
go a.stores.executeFlowEvent(
197221
context.Background(),
198222
a.id,
199223
node,
@@ -254,8 +278,7 @@ func (a *App) HandleEvent(appID string, session *state.State, event gateway.Even
254278
}
255279

256280
node := targetFlow.FindChildWithID(resumePoint.FlowNodeID)
257-
258-
a.stores.executeFlowEvent(
281+
go a.stores.executeFlowEvent(
259282
context.Background(),
260283
a.id,
261284
node,
@@ -268,6 +291,10 @@ func (a *App) HandleEvent(appID string, session *state.State, event gateway.Even
268291
}
269292
default:
270293
eventType := model.EventTypeFromDiscordEventType(e.EventType())
294+
295+
a.RLock()
296+
defer a.RUnlock()
297+
271298
for _, listener := range a.listeners {
272299
if listener.listener.Source != model.EventSourceDiscord {
273300
continue
@@ -277,7 +304,7 @@ func (a *App) HandleEvent(appID string, session *state.State, event gateway.Even
277304
continue
278305
}
279306

280-
listener.HandleEvent(appID, session, event)
307+
go listener.HandleEvent(appID, session, event)
281308
}
282309
}
283310
}

kite-service/internal/core/engine/engine.go

Lines changed: 82 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ func NewEngine(
2929
}
3030
}
3131

32-
func (m *Engine) Run(ctx context.Context) {
32+
func (e *Engine) Run(ctx context.Context) {
3333
updateTicker := time.NewTicker(1 * time.Second)
34+
removeTicker := time.NewTicker(60 * time.Second)
3435
deployTicker := time.NewTicker(60 * time.Second)
3536

3637
go func() {
@@ -40,101 +41,136 @@ func (m *Engine) Run(ctx context.Context) {
4041
updateTicker.Stop()
4142
return
4243
case <-updateTicker.C:
43-
lastUpdate := m.lastUpdate
44-
m.lastUpdate = time.Now().UTC()
44+
lastUpdate := e.lastUpdate
45+
e.lastUpdate = time.Now().UTC()
4546

46-
if err := m.populateCommands(ctx, lastUpdate); err != nil {
47+
if err := e.populateCommands(ctx, lastUpdate); err != nil {
4748
slog.Error(
4849
"Failed to populate commands in engine",
4950
slog.String("error", err.Error()),
5051
)
5152
}
52-
if err := m.populateEventListeners(ctx, lastUpdate); err != nil {
53+
if err := e.populateEventListeners(ctx, lastUpdate); err != nil {
5354
slog.Error(
5455
"Failed to populate event listeners in engine",
5556
slog.String("error", err.Error()),
5657
)
5758
}
59+
case <-removeTicker.C:
60+
if err := e.removeDanglingCommands(ctx); err != nil {
61+
slog.Error(
62+
"Failed to remove dangling commands in engine",
63+
slog.String("error", err.Error()),
64+
)
65+
}
66+
if err := e.removeDanglingEventListeners(ctx); err != nil {
67+
slog.Error(
68+
"Failed to remove dangling event listeners in engine",
69+
slog.String("error", err.Error()),
70+
)
71+
}
5872
case <-deployTicker.C:
59-
m.deployCommands(ctx)
73+
e.deployCommands(ctx)
6074
}
6175
}
6276
}()
6377
}
6478

65-
func (m *Engine) populateCommands(ctx context.Context, lastUpdate time.Time) error {
66-
commandIDs, err := m.stores.CommandStore.EnabledCommandIDs(ctx)
67-
if err != nil {
68-
return fmt.Errorf("failed to get enabled command IDs: %w", err)
69-
}
70-
71-
commands, err := m.stores.CommandStore.EnabledCommandsUpdatedSince(ctx, lastUpdate)
79+
func (e *Engine) populateCommands(ctx context.Context, lastUpdate time.Time) error {
80+
commands, err := e.stores.CommandStore.EnabledCommandsUpdatedSince(ctx, lastUpdate)
7281
if err != nil {
7382
return fmt.Errorf("failed to get commands: %w", err)
7483
}
7584

76-
m.Lock()
77-
defer m.Unlock()
85+
lockStart := time.Now()
86+
e.Lock()
87+
defer e.Unlock()
88+
lockDiff := time.Since(lockStart)
89+
if lockDiff > 250*time.Millisecond {
90+
slog.Warn(
91+
"Locking engine for commands took too long",
92+
slog.String("lock_duration", lockDiff.String()),
93+
)
94+
}
7895

7996
for _, command := range commands {
80-
app, ok := m.apps[command.AppID]
97+
app, ok := e.apps[command.AppID]
8198
if !ok {
8299
app = NewApp(
83100
command.AppID,
84-
m.stores,
101+
e.stores,
85102
)
86-
m.apps[command.AppID] = app
103+
e.apps[command.AppID] = app
87104
}
88105

89106
app.AddCommand(command)
90107
}
91108

92-
for _, app := range m.apps {
93-
app.RemoveDanglingCommands(commandIDs)
94-
}
95-
96109
return nil
97110
}
98111

99-
func (m *Engine) populateEventListeners(ctx context.Context, lastUpdate time.Time) error {
100-
listenerIDs, err := m.stores.EventListenerStore.EnabledEventListenerIDs(ctx)
112+
func (e *Engine) removeDanglingCommands(ctx context.Context) error {
113+
commandIDs, err := e.stores.CommandStore.EnabledCommandIDs(ctx)
101114
if err != nil {
102-
return fmt.Errorf("failed to get enabled event listener IDs: %w", err)
115+
return fmt.Errorf("failed to get enabled command IDs: %w", err)
103116
}
104117

105-
listeners, err := m.stores.EventListenerStore.EnabledEventListenersUpdatedSince(ctx, lastUpdate)
118+
e.RLock()
119+
defer e.RUnlock()
120+
121+
for _, app := range e.apps {
122+
app.RemoveDanglingCommands(commandIDs)
123+
}
124+
125+
return nil
126+
}
127+
128+
func (e *Engine) populateEventListeners(ctx context.Context, lastUpdate time.Time) error {
129+
listeners, err := e.stores.EventListenerStore.EnabledEventListenersUpdatedSince(ctx, lastUpdate)
106130
if err != nil {
107131
return fmt.Errorf("failed to get event listeners: %w", err)
108132
}
109133

110-
m.Lock()
111-
defer m.Unlock()
134+
e.Lock()
135+
defer e.Unlock()
112136

113137
for _, listener := range listeners {
114-
app, ok := m.apps[listener.AppID]
138+
app, ok := e.apps[listener.AppID]
115139
if !ok {
116140
app = NewApp(
117141
listener.AppID,
118-
m.stores,
142+
e.stores,
119143
)
120-
m.apps[listener.AppID] = app
144+
e.apps[listener.AppID] = app
121145
}
122146

123147
app.AddEventListener(listener)
124148
}
125149

126-
for _, app := range m.apps {
150+
return nil
151+
}
152+
153+
func (e *Engine) removeDanglingEventListeners(ctx context.Context) error {
154+
listenerIDs, err := e.stores.EventListenerStore.EnabledEventListenerIDs(ctx)
155+
if err != nil {
156+
return fmt.Errorf("failed to get enabled event listener IDs: %w", err)
157+
}
158+
159+
e.RLock()
160+
defer e.RUnlock()
161+
162+
for _, app := range e.apps {
127163
app.RemoveDanglingEventListeners(listenerIDs)
128164
}
129165

130166
return nil
131167
}
132168

133-
func (m *Engine) deployCommands(ctx context.Context) {
134-
m.Lock()
135-
defer m.Unlock()
169+
func (e *Engine) deployCommands(ctx context.Context) {
170+
e.Lock()
171+
defer e.Unlock()
136172

137-
for _, app := range m.apps {
173+
for _, app := range e.apps {
138174
if app.hasUndeployedChanges {
139175
go func() {
140176
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
@@ -156,10 +192,20 @@ func (m *Engine) deployCommands(ctx context.Context) {
156192
}
157193
}
158194

195+
// HandleEvent blocks until the event is handled by the corresponding app.
159196
func (e *Engine) HandleEvent(appID string, session *state.State, event gateway.Event) {
197+
lockStart := time.Now()
160198
e.RLock()
161199
app := e.apps[appID]
162200
e.RUnlock()
201+
lockDiff := time.Since(lockStart)
202+
if lockDiff > 100*time.Millisecond {
203+
slog.Warn(
204+
"Locking engine for event took too long",
205+
slog.String("app_id", appID),
206+
slog.String("lock_duration", lockDiff.String()),
207+
)
208+
}
163209

164210
if app != nil {
165211
app.HandleEvent(appID, session, event)

kite-service/internal/core/gateway/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (m *GatewayManager) addGateway(ctx context.Context, app *model.App) error {
145145
return m.addGateway(ctx, app)
146146
}
147147

148-
g.Update(ctx, app)
148+
go g.Update(ctx, app)
149149
} else {
150150
g := NewGateway(app, m.logStore, m.appStore, m.planManager, m.eventHandler)
151151
m.gateways[app.ID] = g

0 commit comments

Comments
 (0)