Skip to content

Commit d8cc8ca

Browse files
committed
fix: addres buggy transactions
Signed-off-by: isubasinghe <[email protected]>
1 parent 9cda977 commit d8cc8ca

File tree

3 files changed

+13
-11
lines changed

3 files changed

+13
-11
lines changed

persist/sqldb/workflow_archive.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,16 @@ func (r *workflowArchive) ArchiveWorkflow(ctx context.Context, wf *wfv1.Workflow
112112
if r.dbType == sqldb.Postgres {
113113
workflow = bytes.ReplaceAll(workflow, []byte("\\u0000"), []byte(postgresNullReplacement))
114114
}
115-
return r.sessionProxy.TxWith(ctx, func(s db.Session) error {
116-
_, err := s.SQL().
115+
return r.sessionProxy.TxWith(ctx, func(sp *sqldb.SessionProxy) error {
116+
_, err := sp.Session().SQL().
117117
DeleteFrom(archiveTableName).
118118
Where(r.clusterManagedNamespaceAndInstanceID()).
119119
And(db.Cond{"uid": wf.UID}).
120120
Exec()
121121
if err != nil {
122122
return err
123123
}
124-
_, err = s.Collection(archiveTableName).
124+
_, err = sp.Session().Collection(archiveTableName).
125125
Insert(&archivedWorkflowRecord{
126126
archivedWorkflowMetadata: archivedWorkflowMetadata{
127127
ClusterName: r.clusterName,
@@ -140,7 +140,7 @@ func (r *workflowArchive) ArchiveWorkflow(ctx context.Context, wf *wfv1.Workflow
140140
return err
141141
}
142142

143-
_, err = s.SQL().
143+
_, err = sp.Session().SQL().
144144
DeleteFrom(archiveLabelsTableName).
145145
Where(db.Cond{"clustername": r.clusterName}).
146146
And(db.Cond{"uid": wf.UID}).
@@ -150,7 +150,7 @@ func (r *workflowArchive) ArchiveWorkflow(ctx context.Context, wf *wfv1.Workflow
150150
}
151151
// insert the labels
152152
for key, value := range wf.GetLabels() {
153-
_, err := s.Collection(archiveLabelsTableName).
153+
_, err := sp.Session().Collection(archiveLabelsTableName).
154154
Insert(&archivedWorkflowLabelRecord{
155155
ClusterName: r.clusterName,
156156
UID: string(wf.UID),

util/sqldb/session.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,12 @@ func (sp *SessionProxy) Tx() *SessionProxy {
105105
}
106106

107107
// TxWith runs a With transaction
108-
func (sp *SessionProxy) TxWith(ctx context.Context, fn func(db.Session) error, opts *sql.TxOptions) error {
109-
return sp.Tx().With(ctx, func(s db.Session) error {
110-
return s.TxContext(ctx, fn, opts)
108+
func (sp *SessionProxy) TxWith(ctx context.Context, fn func(*SessionProxy) error, opts *sql.TxOptions) error {
109+
return sp.With(ctx, func(s db.Session) error {
110+
return s.TxContext(ctx, func(sess db.Session) error {
111+
newSp := SessionProxy{sp.kubectlConfig, sp.namespace, sp.dbConfig, sp.username, sp.password, sess, sync.RWMutex{}, sp.closed, sp.maxRetries, sp.baseDelay, sp.maxDelay, sp.retryMultiple, sp.insideTransaction}
112+
return fn(newSp.Tx())
113+
}, opts)
111114
})
112115
}
113116

workflow/sync/sync_manager.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,11 +317,10 @@ func (sm *Manager) TryAcquire(ctx context.Context, wf *wfv1.Workflow, nodeName s
317317
var already bool
318318
var msg string
319319
var failedLockName string
320-
err := sm.dbInfo.SessionProxy.TxWith(ctx, func(sess db.Session) error {
320+
err := sm.dbInfo.SessionProxy.TxWith(ctx, func(sp *sqldb.SessionProxy) error {
321321
sm.log.WithField("holderKey", holderKey).Info(ctx, "TryAcquire - starting transaction")
322322
var err error
323-
s := sqldb.NewSessionProxyFromSession(sess, nil, "", "")
324-
tx := &transaction{s}
323+
tx := &transaction{sp}
325324
already, updated, msg, failedLockName, err = sm.tryAcquireImpl(ctx, wf, tx, holderKey, failedLockName, syncItems, lockKeys)
326325
sm.log.WithField("holderKey", holderKey).Info(ctx, "TryAcquire - transaction completed")
327326
return err

0 commit comments

Comments
 (0)