Merged
Changes from 2 commits
4 changes: 0 additions & 4 deletions config/config.go
@@ -62,9 +62,6 @@ type StateStoreConfig struct {
// default to empty
DBDirectory string `mapstructure:"db-directory"`

// DedicatedChangelog defines if we should use a separate changelog for SS store other than sharing with SC
DedicatedChangelog bool `mapstructure:"dedicated-changelog"`

// Backend defines the backend database used for state-store
// Supported backends: pebbledb, rocksdb
// defaults to pebbledb
@@ -101,7 +98,6 @@ func DefaultStateCommitConfig() StateCommitConfig {
return StateCommitConfig{
Enable: true,
AsyncCommitBuffer: DefaultAsyncCommitBuffer,
CacheSize: DefaultCacheSize,
SnapshotInterval: DefaultSnapshotInterval,
SnapshotKeepRecent: DefaultSnapshotKeepRecent,
}
41 changes: 21 additions & 20 deletions ss/pebbledb/db.go
@@ -152,34 +152,35 @@
}
database.lastRangeHashedCache = lastHashed

if config.DedicatedChangelog {
if config.KeepRecent < 0 {
return nil, errors.New("KeepRecent must be non-negative")
}
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: 300 * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()
}
if config.KeepRecent < 0 {
return nil, errors.New("KeepRecent must be non-negative")
}
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()

Check notice (Code scanning / CodeQL): Spawning a Go routine
Spawning a Go routine may be a possible source of non-determinism

return database, nil
}

func (db *Database) Close() error {
// First, stop accepting new pending changes and drain the worker
close(db.pendingChanges)
@masih (Collaborator) commented on Oct 9, 2025:
A small note that if Close is called twice for whatever reason, close will panic because the channel is already closed. Ditto for rocksdb.

The PR author (Collaborator) replied:
Makes sense, will add a check to make sure it's not called twice.
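A minimal sketch of the kind of guard discussed above, assuming a sync.Once field (hypothetical name closeOnce) is added to Database; this illustrates the idea and is not the PR's actual fix:

import "sync" // assumed to be added to the existing imports

type Database struct {
	// ... existing fields from the diff ...
	closeOnce sync.Once // hypothetical field guarding against double-close
}

func (db *Database) Close() error {
	var err error
	db.closeOnce.Do(func() {
		// Stop accepting new pending changes and drain the worker
		close(db.pendingChanges)
		// Wait for the async writes to finish
		db.asyncWriteWG.Wait()
		// Now close the WAL stream
		if db.streamHandler != nil {
			_ = db.streamHandler.Close()
			db.streamHandler = nil
		}
		err = db.storage.Close()
		db.storage = nil
	})
	// Second and later calls skip the closure and return nil
	return err
}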

// Wait for the async writes to finish
db.asyncWriteWG.Wait()
// Now close the WAL stream
if db.streamHandler != nil {
_ = db.streamHandler.Close()
db.streamHandler = nil
close(db.pendingChanges)
}
// Wait for the async writes to finish
db.asyncWriteWG.Wait()
err := db.storage.Close()
db.storage = nil
return err
11 changes: 5 additions & 6 deletions ss/pebbledb/hash_test.go
@@ -22,12 +22,11 @@ func setupTestDB(t *testing.T) (*Database, string) {

// Set up config with hash range enabled
cfg := config.StateStoreConfig{
HashRange: 10, // 10 blocks per hash range
AsyncWriteBuffer: 100,
KeepRecent: 100,
KeepLastVersion: true,
ImportNumWorkers: 4,
DedicatedChangelog: false,
HashRange: 10, // 10 blocks per hash range
AsyncWriteBuffer: 100,
KeepRecent: 100,
KeepLastVersion: true,
ImportNumWorkers: 4,
}

db, err := New(tempDir, cfg)
35 changes: 16 additions & 19 deletions ss/rocksdb/db.go
@@ -110,20 +110,18 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}

if config.DedicatedChangelog {
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: 300 * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()
}
streamHandler, _ := changelog.NewStream(
logger.NewNopLogger(),
utils.GetChangelogPath(dataDir),
changelog.Config{
DisableFsync: true,
ZeroCopy: true,
KeepRecent: uint64(config.KeepRecent),
PruneInterval: time.Duration(config.PruneIntervalSeconds) * time.Second,
},
)
database.streamHandler = streamHandler
go database.writeAsyncInBackground()

return database, nil
}
@@ -494,17 +492,16 @@ func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlo
}

func (db *Database) Close() error {
// Close the pending changes channel to signal the background goroutine to stop
close(db.pendingChanges)
// Wait for the async writes to finish processing all buffered items
db.asyncWriteWG.Wait()
if db.streamHandler != nil {
// Close the changelog stream first
db.streamHandler.Close()
// Close the pending changes channel to signal the background goroutine to stop
close(db.pendingChanges)
// Wait for the async writes to finish processing all buffered items
db.asyncWriteWG.Wait()
// Only set to nil after background goroutine has finished
db.streamHandler = nil
}

db.storage.Close()
db.storage = nil
db.cfHandle = nil
39 changes: 20 additions & 19 deletions ss/store.go
@@ -41,17 +41,16 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
if err != nil {
return nil, err
}

// Handle auto recovery for DB running with async mode
if ssConfig.DedicatedChangelog {
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
if ssConfig.DBDirectory != "" {
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
}
err := RecoverStateStore(logger, changelogPath, stateStore)
if err != nil {
return nil, err
}
changelogPath := utils.GetChangelogPath(utils.GetStateStorePath(homeDir, ssConfig.Backend))
if ssConfig.DBDirectory != "" {
changelogPath = utils.GetChangelogPath(ssConfig.DBDirectory)
}
if err := RecoverStateStore(logger, changelogPath, stateStore); err != nil {
return nil, err
}

// Start the pruning manager for DB
pruningManager := pruning.NewPruningManager(logger, stateStore, int64(ssConfig.KeepRecent), int64(ssConfig.PruneIntervalSeconds))
pruningManager.Start()
@@ -62,9 +61,6 @@ func NewStateStore(logger logger.Logger, homeDir string, ssConfig config.StateSt
func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore types.StateStore) error {
ssLatestVersion := stateStore.GetLatestVersion()
logger.Info(fmt.Sprintf("Recovering from changelog %s with latest SS version %d", changelogPath, ssLatestVersion))
if ssLatestVersion <= 0 {
return nil
}
streamHandler, err := changelog.NewStream(logger, changelogPath, changelog.Config{})
if err != nil {
return err
@@ -84,15 +80,20 @@ func RecoverStateStore(logger logger.Logger, changelogPath string, stateStore ty
// Look backward to find where we should start replay from
curVersion := lastEntry.Version
curOffset := lastOffset
for curVersion > ssLatestVersion && curOffset > firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
if ssLatestVersion > 0 {
for curVersion > ssLatestVersion && curOffset > firstOffset {
curOffset--
curEntry, errRead := streamHandler.ReadAt(curOffset)
if errRead != nil {
return err
}
curVersion = curEntry.Version
}
curVersion = curEntry.Version
} else {
// Fresh store (or no applied versions) – start from the first offset
curOffset = firstOffset
}
// Replay from the offset where the offset where the version is larger than SS store latest version
// Replay from the offset where the version is larger than SS store latest version
targetStartOffset := curOffset
logger.Info(fmt.Sprintf("Start replaying changelog to recover StateStore from offset %d to %d", targetStartOffset, lastOffset))
if targetStartOffset < lastOffset {
13 changes: 6 additions & 7 deletions ss/store_test.go
@@ -15,16 +15,15 @@ import (

func TestNewStateStore(t *testing.T) {
tempDir := os.TempDir()
homeDir := filepath.Join(tempDir, "seidb")
homeDir := filepath.Join(tempDir, "pebbledb")
ssConfig := config.StateStoreConfig{
DedicatedChangelog: true,
Backend: string(PebbleDBBackend),
AsyncWriteBuffer: 50,
KeepRecent: 500,
Backend: string(PebbleDBBackend),
AsyncWriteBuffer: 100,
KeepRecent: 500,
}
stateStore, err := NewStateStore(logger.NewNopLogger(), homeDir, ssConfig)
require.NoError(t, err)
for i := 1; i < 20; i++ {
for i := 1; i < 50; i++ {
var changesets []*proto.NamedChangeSet
kvPair := &iavl.KVPair{
Delete: false,
Expand All @@ -51,7 +50,7 @@ func TestNewStateStore(t *testing.T) {
require.NoError(t, err)

// Make sure key and values can be found
for i := 1; i < 20; i++ {
for i := 1; i < 50; i++ {
value, err := stateStore.Get("storeA", int64(i), []byte(fmt.Sprintf("key%d", i)))
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("value%d", i), string(value))