-
Notifications
You must be signed in to change notification settings - Fork 856
Try pipeline shape #2530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Try pipeline shape #2530
Conversation
|
The latest Buf updates on your PR. Results from workflow Buf / buf (pull_request).
|
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #2530 +/- ##
==========================================
+ Coverage 43.21% 48.39% +5.17%
==========================================
Files 1579 389 -1190
Lines 138492 41686 -96806
==========================================
- Hits 59843 20172 -39671
+ Misses 73211 19474 -53737
+ Partials 5438 2040 -3398
Flags with carried forward coverage won't be shown. Click here to find out more. 🚀 New features to boost your workflow:
|
| go func() { | ||
| defer e.wg.Done() | ||
| for { | ||
| select { | ||
| case <-e.ctx.Done(): | ||
| return | ||
| case blockWithCtx, ok := <-e.in: | ||
| if !ok { | ||
| return | ||
| } | ||
| executed, err := e.executeBlock(blockWithCtx) | ||
| if err != nil { | ||
| // Log error and continue - component should handle errors | ||
| blockWithCtx.Ctx.Logger().Error("execution failed", "error", err, "height", blockWithCtx.Block.Height) | ||
| continue | ||
| } | ||
| select { | ||
| case <-e.ctx.Done(): | ||
| return | ||
| case e.out <- executed: | ||
| } | ||
| } | ||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
| go func() { | ||
| defer f.wg.Done() | ||
| for { | ||
| select { | ||
| case <-f.ctx.Done(): | ||
| return | ||
| case executed, ok := <-f.in: | ||
| if !ok { | ||
| return | ||
| } | ||
| processed, err := f.finalizeBlock(executed) | ||
| if err != nil { | ||
| // Log error and continue - component should handle errors | ||
| // Note: we don't have direct access to logger here, so we'll need to handle this differently | ||
| // For now, we'll just continue | ||
| continue | ||
| } | ||
| select { | ||
| case <-f.ctx.Done(): | ||
| return | ||
| case f.out <- processed: | ||
| } | ||
| } | ||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
| go func() { | ||
| defer o.wg.Done() | ||
| for { | ||
| select { | ||
| case <-o.ctx.Done(): | ||
| return | ||
| case block, ok := <-o.in: | ||
| if !ok { | ||
| return | ||
| } | ||
| o.processBlock(block) | ||
| } | ||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
| go func() { | ||
| defer p.wg.Done() | ||
| for { | ||
| select { | ||
| case <-p.ctx.Done(): | ||
| return | ||
| case blockReq, ok := <-p.in: | ||
| if !ok { | ||
| return | ||
| } | ||
| preprocessed, err := p.preprocessBlock( | ||
| blockReq.Ctx, | ||
| blockReq.Req, | ||
| abci.LastCommitInfo(blockReq.LastCommit), | ||
| blockReq.Txs, | ||
| p.txDecoder, | ||
| p.helper, | ||
| ) | ||
| if err != nil { | ||
| // Log error and continue - component should handle errors | ||
| blockReq.Ctx.Logger().Error("preprocessing failed", "error", err) | ||
| continue | ||
| } | ||
| select { | ||
| case <-p.ctx.Done(): | ||
| return | ||
| case p.out <- preprocessed: | ||
| } | ||
| } | ||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
| go func() { | ||
| defer bp.wg.Done() | ||
| for { | ||
| select { | ||
| case <-bp.ctx.Done(): | ||
| close(bp.executorIn) | ||
| return | ||
| case block, ok := <-bp.orderedOut: | ||
| if !ok { | ||
| close(bp.executorIn) | ||
| return | ||
| } | ||
| // Convert PreprocessedBlock to PreprocessedBlockWithContext | ||
| blockWithCtx := &pipelinetypes.PreprocessedBlockWithContext{ | ||
| Block: block, | ||
| Ctx: block.Ctx, | ||
| Txs: block.Txs, | ||
| } | ||
| select { | ||
| case <-bp.ctx.Done(): | ||
| return | ||
| case bp.executorIn <- blockWithCtx: | ||
| } | ||
| } | ||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
| go func() { | ||
| defer close(ch) | ||
| var height int64 | ||
| for { | ||
| // bail on ctx err | ||
| if ctx.Err() != nil { | ||
| return | ||
| } | ||
| // generate txs like: txs := gen.GenerateN(1000) | ||
| loadTxs := gen.GenerateN(1000) | ||
| if len(loadTxs) == 0 { | ||
| continue | ||
| } | ||
|
|
||
| // Convert LoadTx to Cosmos SDK transaction bytes and filter by size | ||
| var totalBytes int64 | ||
| txRecords := make([]*abci.TxRecord, 0, len(loadTxs)) | ||
| for _, loadTx := range loadTxs { | ||
| if loadTx.EthTx == nil { | ||
| continue | ||
| } | ||
|
|
||
| // Convert Ethereum transaction to Cosmos SDK format | ||
| txData, err := ethtx.NewTxDataFromTx(loadTx.EthTx) | ||
| if err != nil { | ||
| logger.Error("failed to convert eth tx to tx data", "error", err) | ||
| continue | ||
| } | ||
|
|
||
| msg, err := evmtypes.NewMsgEVMTransaction(txData) | ||
| if err != nil { | ||
| logger.Error("failed to create msg evm transaction", "error", err) | ||
| continue | ||
| } | ||
|
|
||
| gasUsedEstimate := loadTx.EthTx.Gas() // Use gas limit from transaction | ||
|
|
||
| txBuilder := txConfig.NewTxBuilder() | ||
| if err = txBuilder.SetMsgs(msg); err != nil { | ||
| logger.Error("failed to set msgs", "error", err) | ||
| continue | ||
| } | ||
| txBuilder.SetGasEstimate(gasUsedEstimate) | ||
|
|
||
| txbz, encodeErr := txConfig.TxEncoder()(txBuilder.GetTx()) | ||
| if encodeErr != nil { | ||
| logger.Error("failed to encode tx", "error", encodeErr) | ||
| continue | ||
| } | ||
|
|
||
| // Filter by MaxTxBytes - stop adding transactions if we exceed the limit | ||
| txSize := int64(len(txbz)) | ||
| if totalBytes+txSize > maxTxBytes { | ||
| break | ||
| } | ||
| totalBytes += txSize | ||
|
|
||
| txRecords = append(txRecords, &abci.TxRecord{ | ||
| Action: abci.TxRecord_GENERATED, | ||
| Tx: txbz, | ||
| }) | ||
| } | ||
|
|
||
| if len(txRecords) == 0 { | ||
| continue | ||
| } | ||
|
|
||
| proposal := &abci.ResponsePrepareProposal{ | ||
| TxRecords: txRecords, | ||
| } | ||
|
|
||
| height++ | ||
| fmt.Println("Generating Block ", height) | ||
| select { | ||
| case ch <- proposal: | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| }() |
Check notice
Code scanning / CodeQL
Spawning a Go routine Note
Describe your changes and provide context