Skip to content

Commit 9cda977

Browse files
committed
fix: cleaner transactions
Signed-off-by: isubasinghe <[email protected]>
1 parent ace7bc6 commit 9cda977

File tree

3 files changed

+59
-64
lines changed

3 files changed

+59
-64
lines changed

persist/sqldb/workflow_archive.go

Lines changed: 47 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -112,59 +112,57 @@ func (r *workflowArchive) ArchiveWorkflow(ctx context.Context, wf *wfv1.Workflow
112112
if r.dbType == sqldb.Postgres {
113113
workflow = bytes.ReplaceAll(workflow, []byte("\\u0000"), []byte(postgresNullReplacement))
114114
}
115-
return r.sessionProxy.With(ctx, func(s db.Session) error {
116-
return s.Tx(func(sess db.Session) error {
117-
_, err := sess.SQL().
118-
DeleteFrom(archiveTableName).
119-
Where(r.clusterManagedNamespaceAndInstanceID()).
120-
And(db.Cond{"uid": wf.UID}).
121-
Exec()
122-
if err != nil {
123-
return err
124-
}
125-
_, err = sess.Collection(archiveTableName).
126-
Insert(&archivedWorkflowRecord{
127-
archivedWorkflowMetadata: archivedWorkflowMetadata{
128-
ClusterName: r.clusterName,
129-
InstanceID: r.instanceIDService.InstanceID(),
130-
UID: string(wf.UID),
131-
Name: wf.Name,
132-
Namespace: wf.Namespace,
133-
Phase: wf.Status.Phase,
134-
StartedAt: wf.Status.StartedAt.Time,
135-
FinishedAt: wf.Status.FinishedAt.Time,
136-
CreationTimestamp: wf.CreationTimestamp.Time,
137-
},
138-
Workflow: string(workflow),
139-
})
140-
if err != nil {
141-
return err
142-
}
115+
return r.sessionProxy.TxWith(ctx, func(s db.Session) error {
116+
_, err := s.SQL().
117+
DeleteFrom(archiveTableName).
118+
Where(r.clusterManagedNamespaceAndInstanceID()).
119+
And(db.Cond{"uid": wf.UID}).
120+
Exec()
121+
if err != nil {
122+
return err
123+
}
124+
_, err = s.Collection(archiveTableName).
125+
Insert(&archivedWorkflowRecord{
126+
archivedWorkflowMetadata: archivedWorkflowMetadata{
127+
ClusterName: r.clusterName,
128+
InstanceID: r.instanceIDService.InstanceID(),
129+
UID: string(wf.UID),
130+
Name: wf.Name,
131+
Namespace: wf.Namespace,
132+
Phase: wf.Status.Phase,
133+
StartedAt: wf.Status.StartedAt.Time,
134+
FinishedAt: wf.Status.FinishedAt.Time,
135+
CreationTimestamp: wf.CreationTimestamp.Time,
136+
},
137+
Workflow: string(workflow),
138+
})
139+
if err != nil {
140+
return err
141+
}
143142

144-
_, err = sess.SQL().
145-
DeleteFrom(archiveLabelsTableName).
146-
Where(db.Cond{"clustername": r.clusterName}).
147-
And(db.Cond{"uid": wf.UID}).
148-
Exec()
143+
_, err = s.SQL().
144+
DeleteFrom(archiveLabelsTableName).
145+
Where(db.Cond{"clustername": r.clusterName}).
146+
And(db.Cond{"uid": wf.UID}).
147+
Exec()
148+
if err != nil {
149+
return err
150+
}
151+
// insert the labels
152+
for key, value := range wf.GetLabels() {
153+
_, err := s.Collection(archiveLabelsTableName).
154+
Insert(&archivedWorkflowLabelRecord{
155+
ClusterName: r.clusterName,
156+
UID: string(wf.UID),
157+
Key: key,
158+
Value: value,
159+
})
149160
if err != nil {
150161
return err
151162
}
152-
// insert the labels
153-
for key, value := range wf.GetLabels() {
154-
_, err := sess.Collection(archiveLabelsTableName).
155-
Insert(&archivedWorkflowLabelRecord{
156-
ClusterName: r.clusterName,
157-
UID: string(wf.UID),
158-
Key: key,
159-
Value: value,
160-
})
161-
if err != nil {
162-
return err
163-
}
164-
}
165-
return nil
166-
})
167-
})
163+
}
164+
return nil
165+
}, nil)
168166
}
169167

170168
func (r *workflowArchive) ListWorkflows(ctx context.Context, options sutils.ListOptions) (wfv1.Workflows, error) {

util/sqldb/session.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,10 @@ func (sp *SessionProxy) Tx() *SessionProxy {
105105
}
106106

107107
// TxWith runs a With transaction
108-
func (sp *SessionProxy) TxWith(ctx context.Context, fn func(db.Session) error) error {
109-
return sp.Tx().With(ctx, fn)
108+
func (sp *SessionProxy) TxWith(ctx context.Context, fn func(db.Session) error, opts *sql.TxOptions) error {
109+
return sp.Tx().With(ctx, func(s db.Session) error {
110+
return s.TxContext(ctx, fn, opts)
111+
})
110112
}
111113

112114
func (sp *SessionProxy) connect(ctx context.Context) error {

workflow/sync/sync_manager.go

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -318,19 +318,14 @@ func (sm *Manager) TryAcquire(ctx context.Context, wf *wfv1.Workflow, nodeName s
318318
var msg string
319319
var failedLockName string
320320
err := sm.dbInfo.SessionProxy.TxWith(ctx, func(sess db.Session) error {
321-
return sess.TxContext(ctx, func(txSess db.Session) error {
322-
sm.log.WithField("holderKey", holderKey).Info(ctx, "TryAcquire - starting transaction")
323-
var err error
324-
s := sqldb.NewSessionProxyFromSession(txSess, nil, "", "")
325-
tx := &transaction{s}
326-
already, updated, msg, failedLockName, err = sm.tryAcquireImpl(ctx, wf, tx, holderKey, failedLockName, syncItems, lockKeys)
327-
sm.log.WithField("holderKey", holderKey).Info(ctx, "TryAcquire - transaction completed")
328-
return err
329-
}, &sql.TxOptions{
330-
Isolation: sql.LevelSerializable,
331-
ReadOnly: false,
332-
})
333-
})
321+
sm.log.WithField("holderKey", holderKey).Info(ctx, "TryAcquire - starting transaction")
322+
var err error
323+
s := sqldb.NewSessionProxyFromSession(sess, nil, "", "")
324+
tx := &transaction{s}
325+
already, updated, msg, failedLockName, err = sm.tryAcquireImpl(ctx, wf, tx, holderKey, failedLockName, syncItems, lockKeys)
326+
sm.log.WithField("holderKey", holderKey).Info(ctx, "TryAcquire - transaction completed")
327+
return err
328+
}, &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false})
334329
if err != nil {
335330
return false, false, "", failedLockName, err
336331
}

0 commit comments

Comments
 (0)