Skip to content

Commit a93cc9f

Browse files
authored
fix: use batchtime from sequencer for header time, upgrade to latest go-da and go-sequencing (#1871)
Fixes #1842 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Enhanced transaction and batch management, including improved timestamp handling. - Improved handling of chain IDs in tests for better accuracy across multiple chain contexts. - **Tests** - Added new test cases for mempool behavior across nodes and block processing time constraints. - Updated existing tests to improve assertions and ensure comprehensive coverage of functionalities. - Increased timeout duration in tests to accommodate longer processing times. - **Chores** - Updated dependency versions for better compatibility and performance, including `github.com/rollkit/go-sequencing` to version `v0.2.1-0.20241010053131-3134457dc4e5`. - Retraction notice added for version `v0.12.0` due to accidental publication. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 0335555 commit a93cc9f

27 files changed

+381
-250
lines changed

block/cache_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
)
1010

1111
func TestBlockCache(t *testing.T) {
12+
chainID := "TestBlockCache"
1213
require := require.New(t)
1314
// Create new HeaderCache and DataCache and verify not nil
1415
hc := NewHeaderCache()
@@ -19,7 +20,7 @@ func TestBlockCache(t *testing.T) {
1920

2021
// Test setBlock and getBlock
2122
height, nTxs := uint64(1), 2
22-
header, data := types.GetRandomBlock(height, nTxs)
23+
header, data := types.GetRandomBlock(height, nTxs, chainID)
2324
hc.setHeader(height, header)
2425
gotHeader := hc.getHeader(height)
2526
require.NotNil(gotHeader, "getHeader should return non-nil after setHeader")
@@ -30,7 +31,7 @@ func TestBlockCache(t *testing.T) {
3031
require.Equal(data, gotData)
3132

3233
// Test overwriting a block
33-
header1, data1 := types.GetRandomBlock(height, nTxs)
34+
header1, data1 := types.GetRandomBlock(height, nTxs, chainID)
3435
hc.setHeader(height, header1)
3536
gotHeader1 := hc.getHeader(height)
3637
require.NotNil(gotHeader1, "getHeader should return non-nil after overwriting a header")

block/manager.go

Lines changed: 54 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package block
33
import (
44
"bytes"
55
"context"
6-
"crypto/sha256"
76
"encoding/binary"
87
"encoding/hex"
98
"errors"
@@ -52,9 +51,6 @@ const defaultBlockTime = 1 * time.Second
5251
// defaultLazyBlockTime is used only if LazyBlockTime is not configured for manager
5352
const defaultLazyBlockTime = 60 * time.Second
5453

55-
// defaultBatchRetrievalInterval is the interval at which the sequencer retrieves batches
56-
const defaultBatchRetrievalInterval = 1 * time.Second
57-
5854
// defaultMempoolTTL is the number of blocks until transaction is dropped from mempool
5955
const defaultMempoolTTL = 25
6056

@@ -81,6 +77,9 @@ const DAIncludedHeightKey = "da included height"
8177
// dataHashForEmptyTxs to be used while only syncing headers from DA and no p2p to get the Data for no txs scenarios, the syncing can proceed without getting stuck forever.
8278
var dataHashForEmptyTxs = []byte{110, 52, 11, 156, 255, 179, 122, 152, 156, 165, 68, 230, 187, 120, 10, 44, 120, 144, 29, 63, 179, 55, 56, 118, 133, 17, 163, 6, 23, 175, 160, 29}
8379

80+
// ErrNoBatch indicate no batch is available for creating block
81+
var ErrNoBatch = errors.New("no batch to process")
82+
8483
// NewHeaderEvent is used to pass header and DA height to headerInCh
8584
type NewHeaderEvent struct {
8685
Header *types.SignedHeader
@@ -405,37 +404,53 @@ func (m *Manager) getRemainingSleep(start time.Time) time.Duration {
405404

406405
// BatchRetrieveLoop is responsible for retrieving batches from the sequencer.
407406
func (m *Manager) BatchRetrieveLoop(ctx context.Context) {
408-
// batchTimer is used to signal when to retrieve batch from the sequencer
407+
// Initialize batchTimer to fire immediately on start
409408
batchTimer := time.NewTimer(0)
410409
defer batchTimer.Stop()
410+
411411
for {
412412
select {
413413
case <-ctx.Done():
414414
return
415415
case <-batchTimer.C:
416-
// Define the start time for the block production period
417416
start := time.Now()
418-
batch, batchTime, err := m.seqClient.GetNextBatch(ctx, m.lastBatchHash)
419-
if err != nil && ctx.Err() == nil {
417+
418+
// Skip batch retrieval if context is already done
419+
if ctx.Err() != nil {
420+
return
421+
}
422+
423+
res, err := m.seqClient.GetNextBatch(ctx, sequencing.GetNextBatchRequest{
424+
RollupId: []byte(m.genesis.ChainID),
425+
LastBatchHash: m.lastBatchHash,
426+
})
427+
428+
if err != nil {
420429
m.logger.Error("error while retrieving batch", "error", err)
421430
}
422-
// Add the batch to the batch queue
423-
if batch != nil && batch.Transactions != nil {
424-
m.bq.AddBatch(BatchWithTime{batch, batchTime})
425-
// Calculate the hash of the batch and store it for the next batch retrieval
426-
batchBytes, err := batch.Marshal()
427-
if err != nil {
428-
m.logger.Error("error while marshaling batch", "error", err)
431+
432+
if res != nil && res.Batch != nil {
433+
batch := res.Batch
434+
batchTime := res.Timestamp
435+
436+
// Calculate and store batch hash only if hashing succeeds
437+
if h, err := batch.Hash(); err == nil {
438+
m.bq.AddBatch(BatchWithTime{Batch: batch, Time: batchTime})
439+
440+
// Update lastBatchHash only if the batch contains transactions
441+
if batch.Transactions != nil {
442+
m.lastBatchHash = h
443+
}
444+
} else {
445+
m.logger.Error("error while hashing batch", "error", err)
429446
}
430-
h := sha256.Sum256(batchBytes)
431-
m.lastBatchHash = h[:]
432447
}
433-
// Reset the batchTimer to signal the next batch production
434-
// period based on the batch retrieval time.
435-
remainingSleep := time.Duration(0)
448+
449+
// Determine remaining time for the next batch and reset timer
436450
elapsed := time.Since(start)
437-
if elapsed < defaultBatchRetrievalInterval {
438-
remainingSleep = defaultBatchRetrievalInterval - elapsed
451+
remainingSleep := m.conf.BlockTime - elapsed
452+
if remainingSleep < 0 {
453+
remainingSleep = 0
439454
}
440455
batchTimer.Reset(remainingSleep)
441456
}
@@ -984,16 +999,17 @@ func (m *Manager) getSignature(header types.Header) (*types.Signature, error) {
984999
return &signature, nil
9851000
}
9861001

987-
func (m *Manager) getTxsFromBatch() cmtypes.Txs {
1002+
func (m *Manager) getTxsFromBatch() (cmtypes.Txs, *time.Time, error) {
9881003
batch := m.bq.Next()
9891004
if batch == nil {
990-
return make(cmtypes.Txs, 0)
1005+
// batch is nil when there is nothing to process
1006+
return nil, nil, ErrNoBatch
9911007
}
9921008
txs := make(cmtypes.Txs, 0, len(batch.Transactions))
9931009
for _, tx := range batch.Transactions {
9941010
txs = append(txs, tx)
9951011
}
996-
return txs
1012+
return txs, &batch.Time, nil
9971013
}
9981014

9991015
func (m *Manager) publishBlock(ctx context.Context) error {
@@ -1016,6 +1032,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
10161032
lastSignature *types.Signature
10171033
lastHeaderHash types.Hash
10181034
lastDataHash types.Hash
1035+
lastHeaderTime time.Time
10191036
err error
10201037
)
10211038
height := m.store.Height()
@@ -1034,6 +1051,7 @@ func (m *Manager) publishBlock(ctx context.Context) error {
10341051
}
10351052
lastHeaderHash = lastHeader.Hash()
10361053
lastDataHash = lastData.Hash()
1054+
lastHeaderTime = lastHeader.Time()
10371055
}
10381056

10391057
var (
@@ -1056,7 +1074,15 @@ func (m *Manager) publishBlock(ctx context.Context) error {
10561074
return fmt.Errorf("failed to load extended commit for height %d: %w", height, err)
10571075
}
10581076

1059-
header, data, err = m.createBlock(newHeight, lastSignature, lastHeaderHash, extendedCommit, m.getTxsFromBatch())
1077+
txs, timestamp, err := m.getTxsFromBatch()
1078+
if err != nil {
1079+
return fmt.Errorf("failed to get transactions from batch: %w", err)
1080+
}
1081+
// sanity check timestamp for monotonically increasing
1082+
if timestamp.Before(lastHeaderTime) {
1083+
return fmt.Errorf("timestamp is not monotonically increasing: %s < %s", timestamp, m.getLastBlockTime())
1084+
}
1085+
header, data, err = m.createBlock(newHeight, lastSignature, lastHeaderHash, extendedCommit, txs, *timestamp)
10601086
if err != nil {
10611087
return err
10621088
}
@@ -1397,10 +1423,10 @@ func (m *Manager) getLastBlockTime() time.Time {
13971423
return m.lastState.LastBlockTime
13981424
}
13991425

1400-
func (m *Manager) createBlock(height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, extendedCommit abci.ExtendedCommitInfo, txs cmtypes.Txs) (*types.SignedHeader, *types.Data, error) {
1426+
func (m *Manager) createBlock(height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, extendedCommit abci.ExtendedCommitInfo, txs cmtypes.Txs, timestamp time.Time) (*types.SignedHeader, *types.Data, error) {
14011427
m.lastStateMtx.RLock()
14021428
defer m.lastStateMtx.RUnlock()
1403-
return m.executor.CreateBlock(height, lastSignature, extendedCommit, lastHeaderHash, m.lastState, txs)
1429+
return m.executor.CreateBlock(height, lastSignature, extendedCommit, lastHeaderHash, m.lastState, txs, timestamp)
14041430
}
14051431

14061432
func (m *Manager) applyBlock(ctx context.Context, header *types.SignedHeader, data *types.Data) (types.State, *abci.ResponseFinalizeBlock, error) {

0 commit comments

Comments
 (0)