Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions server/store/datastore/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package datastore

import (
"fmt"
"sync"
"time"

"xorm.io/builder"
"xorm.io/xorm"
"xorm.io/xorm/schemas"

"go.woodpecker-ci.org/woodpecker/v3/server/model"
)
Expand Down Expand Up @@ -127,7 +130,16 @@ func (s storage) GetPipelineCount() (int64, error) {
return s.engine.Count(new(model.Pipeline))
}

// createPipelineSQLiteLock is needed for CreatePipeline if sqlite is used
// as sqlite don't support to upgrade the transaction lock to be exclusive
var createPipelineSQLiteLock = &sync.Mutex{}

func (s storage) CreatePipeline(pipeline *model.Pipeline, stepList ...*model.Step) error {
// TODO: xorm do not allow us to start an `EXCLUSIVE` transaction so we have to do it on our own
if s.engine.Dialect().URI().DBType == schemas.SQLITE {
createPipelineSQLiteLock.Lock()
defer createPipelineSQLiteLock.Unlock()
}
sess := s.engine.NewSession()
defer sess.Close()
if err := sess.Begin(); err != nil {
Expand All @@ -143,15 +155,39 @@ func (s storage) CreatePipeline(pipeline *model.Pipeline, stepList ...*model.Ste
return ErrorRepoNotExist{RepoID: pipeline.RepoID}
}

// get write lock
// TODO: upstream that to xorm
switch s.engine.Dialect().URI().DBType {
case schemas.SQLITE:
// we have an exclusive lock via createPipelineSQLiteLock already

case schemas.MYSQL:
if _, err := sess.SQL("LOCK TABLE `pipelines` WRITE").Exec(); err != nil {
return err
}
// session end does not unlock so we have to
defer func() {
sess.SQL("UNLOCK TABLES").Exec()
}()

case schemas.POSTGRES:
if _, err := sess.SQL("LOCK TABLE pipelines IN EXCLUSIVE MODE").Exec(); err != nil {
return err
}

default:
return fmt.Errorf("unsupported schema %s detected", s.engine.Dialect().URI().DBType)
}

// calc pipeline number
var number int64
var currMaxNumber int64
if _, err := sess.Select("MAX(number)").
Table(new(model.Pipeline)).
Where("repo_id = ?", pipeline.RepoID).
Get(&number); err != nil {
Get(&currMaxNumber); err != nil {
return err
}
pipeline.Number = number + 1
pipeline.Number = currMaxNumber + 1

pipeline.Created = time.Now().UTC().Unix()
// only Insert set auto created ID back to object
Expand Down