Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ linters:
- linters:
- gosec
text: "Use of weak random number generator"
- linters:
- govet
path: x/mint/client/rest/grpc_query_test.go
text: "buildtag"

formatters:
enable:
Expand Down
209 changes: 194 additions & 15 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
aclutils "github.com/sei-protocol/sei-chain/aclmapping/utils"
"github.com/sei-protocol/sei-chain/app/antedecorators"
appparams "github.com/sei-protocol/sei-chain/app/params"
pipelinetypes "github.com/sei-protocol/sei-chain/app/pipeline/types"
"github.com/sei-protocol/sei-chain/app/upgrades"
v0upgrade "github.com/sei-protocol/sei-chain/app/upgrades/v0"
"github.com/sei-protocol/sei-chain/evmrpc"
Expand All @@ -123,6 +124,7 @@
"github.com/sei-protocol/sei-chain/x/evm"
evmante "github.com/sei-protocol/sei-chain/x/evm/ante"
"github.com/sei-protocol/sei-chain/x/evm/blocktest"
evmconfig "github.com/sei-protocol/sei-chain/x/evm/config"
evmkeeper "github.com/sei-protocol/sei-chain/x/evm/keeper"
"github.com/sei-protocol/sei-chain/x/evm/querier"
"github.com/sei-protocol/sei-chain/x/evm/replay"
Expand Down Expand Up @@ -260,8 +262,13 @@
// EmptyAclmOpts defines a type alias for a list of wasm options.
EmptyACLOpts []aclkeeper.Option
// EnableOCC allows tests to override default OCC enablement behavior
EnableOCC = true
EmptyAppOptions []AppOption
EnableOCC = true
// EnablePipelineProcessing enables the new pipeline-based block processing path
EnablePipelineProcessing = false
EmptyAppOptions []AppOption

// EnableBenchmarkMode enables benchmark mode using sei-load generator
EnableBenchmarkMode = false
)

var (
Expand Down Expand Up @@ -391,6 +398,13 @@
wsServerStartSignalSent bool

txPrioritizer sdk.TxPrioritizer

// Pipeline processing (if enabled)
blockPipeline *BlockPipeline

// Benchmark mode - generator channel for load testing
benchmarkGeneratorCh <-chan *abci.ResponsePrepareProposal
enableBenchmarkMode bool
}

type AppOption func(*App)
Expand Down Expand Up @@ -460,6 +474,7 @@
stateStore: stateStore,
httpServerStartSignal: make(chan struct{}, 1),
wsServerStartSignal: make(chan struct{}, 1),
enableBenchmarkMode: EnableBenchmarkMode,
}

for _, option := range appOptions {
Expand Down Expand Up @@ -915,7 +930,11 @@
app.SetAnteDepGenerator(anteDepGenerator)
app.SetMidBlocker(app.MidBlocker)
app.SetEndBlocker(app.EndBlocker)
app.SetPrepareProposalHandler(app.PrepareProposalHandler)
if app.enableBenchmarkMode {
app.SetPrepareProposalHandler(app.PrepareProposalGeneratorHandler)
} else {
app.SetPrepareProposalHandler(app.PrepareProposalHandler)
}
app.SetProcessProposalHandler(app.ProcessProposalHandler)
app.SetFinalizeBlocker(app.FinalizeBlocker)
app.SetInplaceTestnetInitializer(app.inplacetestnetInitializer)
Expand Down Expand Up @@ -965,6 +984,57 @@

app.txPrioritizer = NewSeiTxPrioritizer(logger, &app.EvmKeeper, &app.UpgradeKeeper, &app.ParamsKeeper).GetTxPriorityHint
app.SetTxPrioritizer(app.txPrioritizer)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Compile with Mock Balances

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker) (typecheck)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Run ETH Blocktests 3

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Run ETH Blocktests 4

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Run ETH Blocktests 0

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Run ETH Blocktests 2

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Go / Lint

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Run ETH Blocktests 1

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)

Check failure on line 987 in app/app.go

View workflow job for this annotation

GitHub Actions / Test sei-chain

app.BeginBlocker undefined (type *App has no field or method BeginBlocker)
// Initialize pipeline processing if enabled
if EnablePipelineProcessing {
logger.Info("Initializing pipeline-based block processing", "mode", "pipeline")

// Create helpers
preprocessorHelper := newAppExecutionHelper(app) // ExecutionHelper implements PreprocessorHelper
executionHelper := newAppExecutionHelper(app)
finalizerHelper := newAppFinalizerHelper(app)

// Initialize pipeline
app.blockPipeline = NewBlockPipeline(
context.Background(),
preprocessorHelper,
executionHelper,
finalizerHelper,
app.txDecoder,
app.BeginBlocker,
app.MidBlocker,
app.EndBlocker,
func() { app.WriteState() },
func() []byte { return app.GetWorkingHash() },
PreprocessBlock,
ExecutePreprocessedEVMTransaction,
ExecuteCosmosTransaction,
25, // preprocessorWorkers
1000, // bufferSize
1, // startSequence
)

// Start the pipeline
app.blockPipeline.Start(context.Background())
logger.Info("Pipeline processing initialized and started")
} else {
logger.Info("Using legacy block processing", "mode", "legacy")
}

// Initialize benchmark generator if enabled
if app.enableBenchmarkMode {
logger.Info("Initializing benchmark mode generator", "mode", "benchmark")
genCtx := context.Background()
// Get EVM chain ID from config mapping (not Cosmos chain ID)
evmChainID := evmconfig.GetEVMChainID(app.ChainID).Int64()
// Use a reasonable default for MaxTxBytes (20MB - conservative estimate)
// The generator will filter transactions to fit within this limit
// Tendermint will still validate the final proposal doesn't exceed req.MaxTxBytes
defaultMaxTxBytes := int64(20 * 1024 * 1024) // 20MB
app.benchmarkGeneratorCh = NewGeneratorCh(genCtx, app.encodingConfig.TxConfig, evmChainID, defaultMaxTxBytes, logger)
logger.Info("Benchmark generator initialized and started", "maxTxBytes", defaultMaxTxBytes)
}

return app
}

Expand Down Expand Up @@ -1104,6 +1174,25 @@
}, nil
}

func (app *App) PrepareProposalGeneratorHandler(_ sdk.Context, req *abci.RequestPrepareProposal) (*abci.ResponsePrepareProposal, error) {
// Pull from generator channel - the generator has already filtered transactions by size
select {
case proposal, ok := <-app.benchmarkGeneratorCh:
if proposal == nil || !ok {
// Channel closed or no proposal available, return empty (req.Txs will remain in mempool)
return &abci.ResponsePrepareProposal{
TxRecords: []*abci.TxRecord{},
}, nil
}
return proposal, nil
default:
// No proposal ready yet, return empty (req.Txs will remain in mempool)
return &abci.ResponsePrepareProposal{
TxRecords: []*abci.TxRecord{},
}, nil
}
}

func (app *App) GetOptimisticProcessingInfo() OptimisticProcessingInfo {
app.optimisticProcessingInfoMutex.RLock()
defer app.optimisticProcessingInfoMutex.RUnlock()
Expand Down Expand Up @@ -1194,6 +1283,68 @@
ctx.Logger().Info(fmt.Sprintf("FinalizeBlock took %dms", duration/time.Millisecond))
}()

// Route to pipeline if enabled
if EnablePipelineProcessing && app.blockPipeline != nil && app.blockPipeline.IsRunning() {
ctx.Logger().Info("Processing block via pipeline", "mode", "pipeline", "height", req.Height)

// Create block request
blockReq := &pipelinetypes.BlockRequest{
Ctx: ctx,
Req: req,
LastCommit: req.DecidedLastCommit,
Txs: req.Txs,
}

// Send to pipeline
select {
case app.blockPipeline.Input() <- blockReq:
case <-time.After(10 * time.Second):
ctx.Logger().Error("Timeout sending block to pipeline")
return nil, fmt.Errorf("timeout sending block to pipeline")
}

// Wait for result (pipeline processes sequentially, so we'll get the result for this block)
var processed *pipelinetypes.ProcessedBlock
select {
case processed = <-app.blockPipeline.Output():
case <-time.After(30 * time.Second):
ctx.Logger().Error("Timeout waiting for pipeline result")
return nil, fmt.Errorf("timeout waiting for pipeline result")
}

// Check if preprocessing failed
if processed.ExecutedBlock.PreprocessedBlock.PreprocessError != nil {
ctx.Logger().Error("Pipeline preprocessing failed", "error", processed.ExecutedBlock.PreprocessedBlock.PreprocessError, "height", req.Height)
return nil, fmt.Errorf("pipeline preprocessing failed: %w", processed.ExecutedBlock.PreprocessedBlock.PreprocessError)
}

// Verify we got the correct block
if processed.ExecutedBlock.PreprocessedBlock.Height != req.Height {
ctx.Logger().Error("Pipeline returned wrong block", "expected", req.Height, "got", processed.ExecutedBlock.PreprocessedBlock.Height)
return nil, fmt.Errorf("pipeline returned wrong block height")
}

// Convert pipeline results to expected format
executed := processed.ExecutedBlock
txResults := make([]*abci.ExecTxResult, len(executed.TxResults))
for i, txResult := range executed.TxResults {
txResults[i] = convertTransactionResultToExecTxResult(txResult)
}

app.SetDeliverStateToCommit()
if app.EvmKeeper.EthReplayConfig.Enabled || app.EvmKeeper.EthBlockTestConfig.Enabled {
return &abci.ResponseFinalizeBlock{}, nil
}
cms := app.WriteState()
app.LightInvarianceChecks(cms, app.lightInvarianceConfig)
appHash := app.GetWorkingHash()
resp := app.getFinalizeBlockResponse(appHash, executed.Events, txResults, executed.EndBlockResp)
return &resp, nil
}

// Legacy processing path
ctx.Logger().Info("Processing block via legacy path", "mode", "legacy")

// Get all optimistic processing info atomically
app.optimisticProcessingInfoMutex.RLock()
completion := app.optimisticProcessingInfo.Completion
Expand Down Expand Up @@ -1243,6 +1394,28 @@
return &resp, nil
}

// convertTransactionResultToExecTxResult converts a TransactionResult to ExecTxResult
func convertTransactionResultToExecTxResult(txResult *pipelinetypes.TransactionResult) *abci.ExecTxResult {
result := &abci.ExecTxResult{
Code: txResult.Code,
Data: txResult.ReturnData,
Log: txResult.Log,
GasUsed: txResult.GasUsed,
GasWanted: txResult.GasUsed, // Use GasUsed as GasWanted for now
Events: txResult.Events,
Codespace: txResult.Codespace,
}

// Add EVM-specific info if there's a VM error
if txResult.VmError != "" {
result.EvmTxInfo = &abci.EvmTxInfo{
VmError: txResult.VmError,
}
}

return result
}

func (app *App) DeliverTxWithResult(ctx sdk.Context, tx []byte, typedTx sdk.Tx) *abci.ExecTxResult {
deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{
Tx: tx,
Expand Down Expand Up @@ -1776,16 +1949,10 @@
if app.EvmKeeper.EthReplayConfig.Enabled || app.EvmKeeper.EthBlockTestConfig.Enabled {
return abci.ResponseFinalizeBlock{}
}
return abci.ResponseFinalizeBlock{
Events: events,
TxResults: txResults,
ValidatorUpdates: utils.Map(endBlockResp.ValidatorUpdates, func(v abci.ValidatorUpdate) abci.ValidatorUpdate {
return abci.ValidatorUpdate{
PubKey: v.PubKey,
Power: v.Power,
}
}),
ConsensusParamUpdates: &tmproto.ConsensusParams{

var consensusParamUpdates *tmproto.ConsensusParams
if endBlockResp.ConsensusParamUpdates != nil {
consensusParamUpdates = &tmproto.ConsensusParams{
Block: &tmproto.BlockParams{
MaxBytes: endBlockResp.ConsensusParamUpdates.Block.MaxBytes,
MaxGas: endBlockResp.ConsensusParamUpdates.Block.MaxGas,
Expand All @@ -1803,8 +1970,20 @@
Version: &tmproto.VersionParams{
AppVersion: endBlockResp.ConsensusParamUpdates.Version.AppVersion,
},
},
AppHash: appHash,
}
}

return abci.ResponseFinalizeBlock{
Events: events,
TxResults: txResults,
ValidatorUpdates: utils.Map(endBlockResp.ValidatorUpdates, func(v abci.ValidatorUpdate) abci.ValidatorUpdate {
return abci.ValidatorUpdate{
PubKey: v.PubKey,
Power: v.Power,
}
}),
ConsensusParamUpdates: consensusParamUpdates,
AppHash: appHash,
}
}

Expand Down
Loading
Loading