Skip to content

Commit 8d76997

Browse files
committed
replay blocks to sync app and rollup
1 parent 24a9fe9 commit 8d76997

File tree

5 files changed

+197
-3
lines changed

5 files changed

+197
-3
lines changed

block/manager.go

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func NewManager(
157157
genesis *cmtypes.GenesisDoc,
158158
store store.Store,
159159
mempool mempool.Mempool,
160-
proxyApp proxy.AppConnConsensus,
160+
proxyApp proxy.AppConns,
161161
dalc *da.DAClient,
162162
eventBus *cmtypes.EventBus,
163163
logger log.Logger,
@@ -217,7 +217,7 @@ func NewManager(
217217
// allow buffer for the block header and protocol encoding
218218
maxBlobSize -= blockProtocolOverhead
219219

220-
exec := state.NewBlockExecutor(proposerAddress, genesis.ChainID, mempool, proxyApp, eventBus, maxBlobSize, logger, execMetrics, valSet.Hash())
220+
exec := state.NewBlockExecutor(proposerAddress, genesis.ChainID, mempool, proxyApp.Consensus(), eventBus, maxBlobSize, logger, execMetrics, valSet.Hash())
221221
if s.LastBlockHeight+1 == uint64(genesis.InitialHeight) {
222222
res, err := exec.InitChain(genesis)
223223
if err != nil {
@@ -268,6 +268,14 @@ func NewManager(
268268
metrics: seqMetrics,
269269
isProposer: isProposer,
270270
}
271+
272+
if agg.conf.Replay {
273+
// Handshake to ensure that app and rollup are in sync
274+
if err := agg.Handshake(context.Background(), proxyApp); err != nil {
275+
return nil, err
276+
}
277+
}
278+
271279
return agg, nil
272280
}
273281

@@ -1166,3 +1174,124 @@ func updateState(s *types.State, res *abci.ResponseInitChain) {
11661174
s.LastResultsHash = merkle.HashFromByteSlices(nil)
11671175

11681176
}
1177+
1178+
// Handshake performs the ABCI handshake with the application.
1179+
func (m *Manager) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {
1180+
// Handshake is done via ABCI Info on the query conn.
1181+
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
1182+
if err != nil {
1183+
return fmt.Errorf("error calling Info: %v", err)
1184+
}
1185+
1186+
blockHeight := res.LastBlockHeight
1187+
if blockHeight < 0 {
1188+
return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
1189+
}
1190+
appHash := res.LastBlockAppHash
1191+
1192+
m.logger.Info("ABCI Handshake App Info",
1193+
"height", blockHeight,
1194+
"hash", fmt.Sprintf("%X", appHash),
1195+
"software-version", res.Version,
1196+
"protocol-version", res.AppVersion,
1197+
)
1198+
1199+
// Replay blocks up to the latest in the blockstore.
1200+
appHash, err = m.ReplayBlocks(ctx, appHash, blockHeight, proxyApp)
1201+
if err != nil {
1202+
return fmt.Errorf("error on replay: %v", err)
1203+
}
1204+
1205+
m.logger.Info("Completed ABCI Handshake - CometBFT and App are synced",
1206+
"appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
1207+
1208+
// TODO: (on restart) replay mempool
1209+
1210+
return nil
1211+
}
1212+
1213+
// ReplayBlocks replays blocks from the last state to the app's last block height.
1214+
func (m *Manager) ReplayBlocks(
1215+
ctx context.Context,
1216+
appHash []byte,
1217+
appBlockHeight int64,
1218+
proxyApp proxy.AppConns,
1219+
) ([]byte, error) {
1220+
state := m.lastState
1221+
stateBlockHeight := m.lastState.LastBlockHeight
1222+
m.logger.Info(
1223+
"ABCI Replay Blocks",
1224+
"appHeight",
1225+
appBlockHeight,
1226+
"stateHeight",
1227+
stateBlockHeight)
1228+
1229+
if appBlockHeight < int64(stateBlockHeight) {
1230+
// the app is behind, so replay blocks
1231+
return m.replayBlocks(ctx, state, proxyApp, uint64(appBlockHeight), stateBlockHeight)
1232+
} else if appBlockHeight == int64(stateBlockHeight) {
1233+
// We're good!
1234+
assertAppHashEqualsOneFromState(appHash, state)
1235+
return appHash, nil
1236+
}
1237+
1238+
panic(fmt.Sprintf("uncovered case! app height higher than state height, possibly need app rollback; appHeight: %d, stateHeight: %d", appBlockHeight, stateBlockHeight))
1239+
}
1240+
1241+
func (m *Manager) replayBlocks(
1242+
ctx context.Context,
1243+
state types.State,
1244+
proxyApp proxy.AppConns,
1245+
appBlockHeight,
1246+
stateBlockHeight uint64,
1247+
) ([]byte, error) {
1248+
var appHash []byte
1249+
finalBlock := stateBlockHeight
1250+
firstBlock := appBlockHeight + 1
1251+
if firstBlock == 1 {
1252+
firstBlock = state.InitialHeight
1253+
}
1254+
for i := firstBlock; i <= finalBlock; i++ {
1255+
select {
1256+
case <-ctx.Done():
1257+
return nil, ctx.Err()
1258+
default:
1259+
}
1260+
1261+
m.logger.Info("Applying block", "height", i)
1262+
block, err := m.store.GetBlock(ctx, i)
1263+
if err != nil {
1264+
return nil, fmt.Errorf("failed to get block data for height %d: %w", i, err)
1265+
}
1266+
appHash, err = m.executor.ExecCommitBlock(proxyApp.Consensus(), block, m.logger, state, m.store)
1267+
if err != nil {
1268+
return nil, err
1269+
}
1270+
// Extra check to ensure the app was not changed in a way it shouldn't have.
1271+
if len(appHash) > 0 {
1272+
assertAppHashEqualsOneFromBlock(appHash, block)
1273+
}
1274+
}
1275+
1276+
assertAppHashEqualsOneFromState(appHash, state)
1277+
return appHash, nil
1278+
}
1279+
1280+
func assertAppHashEqualsOneFromBlock(appHash []byte, block *types.Block) {
1281+
if !bytes.Equal(appHash, block.SignedHeader.AppHash) {
1282+
panic(fmt.Sprintf(`block.AppHash does not match AppHash after replay. Got %X, expected %X.
1283+
Block: %v
1284+
`,
1285+
appHash, block.SignedHeader.AppHash, block))
1286+
}
1287+
}
1288+
1289+
func assertAppHashEqualsOneFromState(appHash []byte, state types.State) {
1290+
if !bytes.Equal(appHash, state.AppHash) {
1291+
panic(fmt.Sprintf(`state.AppHash does not match AppHash after replay. Got
1292+
%X, expected %X.
1293+
State: %v
1294+
Did you reset CometBFT without resetting your application's data?`,
1295+
appHash, state.AppHash, state))
1296+
}
1297+
}

config/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const (
3636
FlagLazyAggregator = "rollkit.lazy_aggregator"
3737
// FlagMaxPendingBlocks is a flag to pause aggregator in case of large number of blocks pending DA submission
3838
FlagMaxPendingBlocks = "rollkit.max_pending_blocks"
39+
// FlagReplay is a flag for replaying blocks
40+
FlagReplay = "rollkit.replay"
3941
)
4042

4143
// NodeConfig stores Rollkit node configuration.
@@ -82,6 +84,8 @@ type BlockManagerConfig struct {
8284
// LazyBlockTime defines how often new blocks are produced in lazy mode
8385
// even if there are no transactions
8486
LazyBlockTime time.Duration `mapstructure:"lazy_block_time"`
87+
// Replay defines whether to replay blocks
88+
Replay bool `mapstructure:"replay"`
8589
}
8690

8791
// GetNodeConfig translates Tendermint's configuration into Rollkit configuration.
@@ -129,6 +133,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
129133
nc.TrustedHash = v.GetString(FlagTrustedHash)
130134
nc.TrustedHash = v.GetString(FlagTrustedHash)
131135
nc.MaxPendingBlocks = v.GetUint64(FlagMaxPendingBlocks)
136+
nc.Replay = v.GetBool(FlagReplay)
132137
return nil
133138
}
134139

@@ -150,4 +155,5 @@ func AddFlags(cmd *cobra.Command) {
150155
cmd.Flags().Bool(FlagLight, def.Light, "run light client")
151156
cmd.Flags().String(FlagTrustedHash, def.TrustedHash, "initial trusted hash to start the header exchange service")
152157
cmd.Flags().Uint64(FlagMaxPendingBlocks, def.MaxPendingBlocks, "limit of blocks pending DA submission (0 for no limit)")
158+
cmd.Flags().Bool(FlagReplay, def.Replay, "replay blocks")
153159
}

config/defaults.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ var DefaultNodeConfig = NodeConfig{
2626
BlockTime: 1 * time.Second,
2727
DABlockTime: 15 * time.Second,
2828
LazyBlockTime: 60 * time.Second,
29+
Replay: false,
2930
},
3031
DAAddress: "http://localhost:26658",
3132
DAGasPrice: -1,

node/full.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func initBlockSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfi
264264
}
265265

266266
func initBlockManager(signingKey crypto.PrivKey, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, store store.Store, mempool mempool.Mempool, proxyApp proxy.AppConns, dalc *da.DAClient, eventBus *cmtypes.EventBus, logger log.Logger, blockSyncService *block.BlockSyncService, seqMetrics *block.Metrics, execMetrics *state.Metrics) (*block.Manager, error) {
267-
blockManager, err := block.NewManager(signingKey, nodeConfig.BlockManagerConfig, genesis, store, mempool, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"), blockSyncService.BlockStore(), seqMetrics, execMetrics)
267+
blockManager, err := block.NewManager(signingKey, nodeConfig.BlockManagerConfig, genesis, store, mempool, proxyApp, dalc, eventBus, logger.With("module", "BlockManager"), blockSyncService.BlockStore(), seqMetrics, execMetrics)
268268
if err != nil {
269269
return nil, fmt.Errorf("error while initializing BlockManager: %w", err)
270270
}

state/executor.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
cmtypes "github.com/cometbft/cometbft/types"
1515

1616
"github.com/rollkit/rollkit/mempool"
17+
"github.com/rollkit/rollkit/store"
1718
"github.com/rollkit/rollkit/third_party/log"
1819
"github.com/rollkit/rollkit/types"
1920
abciconv "github.com/rollkit/rollkit/types/abci"
@@ -505,3 +506,60 @@ func fromRollkitTxs(rollkitTxs types.Txs) cmtypes.Txs {
505506
}
506507
return txs
507508
}
509+
510+
// ----------------------------------------------------------------------------------------------------
511+
// Execute block without state. TODO: eliminate
512+
513+
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
514+
// It returns the application root hash (result of abci.Commit).
515+
func (e *BlockExecutor) ExecCommitBlock(
516+
appConnConsensus proxy.AppConnConsensus,
517+
block *types.Block,
518+
logger log.Logger,
519+
state types.State,
520+
store store.Store,
521+
) ([]byte, error) {
522+
abciHeader, err := abciconv.ToABCIHeaderPB(&block.SignedHeader.Header)
523+
if err != nil {
524+
return nil, err
525+
}
526+
abciHeader.ChainID = e.chainID
527+
abciBlock, err := abciconv.ToABCIBlock(block)
528+
if err != nil {
529+
return nil, err
530+
}
531+
resp, err := e.proxyApp.FinalizeBlock(context.TODO(), &abci.RequestFinalizeBlock{
532+
Hash: block.Hash(),
533+
NextValidatorsHash: e.valsetHash,
534+
ProposerAddress: abciHeader.ProposerAddress,
535+
Height: abciHeader.Height,
536+
Time: abciHeader.Time,
537+
DecidedLastCommit: abci.CommitInfo{
538+
Round: 0,
539+
Votes: nil,
540+
},
541+
Misbehavior: abciBlock.Evidence.Evidence.ToABCI(),
542+
Txs: abciBlock.Txs.ToSliceOfBytes(),
543+
})
544+
if err != nil {
545+
logger.Error("Error in proxyAppConn.FinalizeBlock", "err", err)
546+
return nil, err
547+
}
548+
549+
// Assert that the application correctly returned tx results for each of the transactions provided in the block
550+
if len(block.Data.Txs) != len(resp.TxResults) {
551+
return nil, fmt.Errorf("expected tx results length to match size of transactions in block. Expected %d, got %d", len(block.Data.Txs), len(resp.TxResults))
552+
}
553+
554+
logger.Info("Executed block", "height", block.Height, "app_hash", fmt.Sprintf("%X", resp.AppHash))
555+
556+
// Commit block
557+
_, err = e.proxyApp.Commit(context.TODO())
558+
if err != nil {
559+
logger.Error("Client error during proxyAppConn.Commit", "err", err)
560+
return nil, err
561+
}
562+
563+
// ResponseCommit has no error or log
564+
return resp.AppHash, nil
565+
}

0 commit comments

Comments
 (0)