Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ A distributed system for processing Ethereum execution layer data with support f
- **Dual Processing Modes**: Forwards (real-time) and backwards (backfill) processing
- **State Management**: Track processing progress with ClickHouse storage
- **Resource Management**: Memory-optimized chunked processing with leak prevention
- **Queue Prioritization**: Separate queues for forwards/backwards processing and verification
- **Queue Prioritization**: Separate queues for forwards/backwards processing

## Quick Start

Expand Down Expand Up @@ -43,9 +43,8 @@ A distributed system for processing Ethereum execution layer data with support f

### Processing Modes

- **Forwards**: Process new blocks as they arrive (priority 100)
- **Backwards**: Backfill historical blocks (priority 50)
- **Verification**: Validate processed data (priority 10)
- **Forwards**: Process new blocks as they arrive (priority 10)
- **Backwards**: Backfill historical blocks (priority 5)

### Queue Architecture

Expand All @@ -55,8 +54,6 @@ A distributed system for processing Ethereum execution layer data with support f
├─────────────────────────────────────────┤
│ 1. Forwards Processing (Priority 10)│
│ 2. Backwards Processing (Priority 5) │
│ 3. Forwards Verification (Priority 1) │
│ 4. Backwards Verification (Priority 1) │
└─────────────────────────────────────────┘
```

Expand Down Expand Up @@ -147,7 +144,6 @@ curl -X POST http://localhost:8080/api/v1/queue/blocks/transaction_structlog \
- Graceful handling of chain head scenarios
- Retry logic for transient failures
- Comprehensive logging and metrics
- Verification system with mismatch detection

## Monitoring

Expand Down
5 changes: 0 additions & 5 deletions pkg/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,4 @@ var (
Name: "execution_processor_retry_count_total",
Help: "Total number of retry attempts",
}, []string{"network", "processor", "reason"})

VerificationMismatchRate = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "execution_processor_verification_mismatch_total",
Help: "Total number of verification mismatches",
}, []string{"network", "processor", "transaction_hash"})
)
35 changes: 0 additions & 35 deletions pkg/processor/common/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ func ProcessQueue(processorName string) string {
return fmt.Sprintf("%s:process", processorName)
}

// VerifyQueue returns the verify queue name for a processor (deprecated - use mode-specific queues).
func VerifyQueue(processorName string) string {
return fmt.Sprintf("%s:verify", processorName)
}

// ProcessForwardsQueue returns the forwards process queue name for a processor.
func ProcessForwardsQueue(processorName string) string {
return fmt.Sprintf("%s:process:forwards", processorName)
Expand All @@ -22,16 +17,6 @@ func ProcessBackwardsQueue(processorName string) string {
return fmt.Sprintf("%s:process:backwards", processorName)
}

// VerifyForwardsQueue returns the forwards verify queue name for a processor.
func VerifyForwardsQueue(processorName string) string {
return fmt.Sprintf("%s:verify:forwards", processorName)
}

// VerifyBackwardsQueue returns the backwards verify queue name for a processor.
func VerifyBackwardsQueue(processorName string) string {
return fmt.Sprintf("%s:verify:backwards", processorName)
}

// PrefixedProcessForwardsQueue returns the forwards process queue name with prefix.
func PrefixedProcessForwardsQueue(processorName, prefix string) string {
queue := ProcessForwardsQueue(processorName)
Expand All @@ -51,23 +36,3 @@ func PrefixedProcessBackwardsQueue(processorName, prefix string) string {

return fmt.Sprintf("%s:%s", prefix, queue)
}

// PrefixedVerifyForwardsQueue returns the forwards verify queue name with prefix.
func PrefixedVerifyForwardsQueue(processorName, prefix string) string {
queue := VerifyForwardsQueue(processorName)
if prefix == "" {
return queue
}

return fmt.Sprintf("%s:%s", prefix, queue)
}

// PrefixedVerifyBackwardsQueue returns the backwards verify queue name with prefix.
func PrefixedVerifyBackwardsQueue(processorName, prefix string) string {
queue := VerifyBackwardsQueue(processorName)
if prefix == "" {
return queue
}

return fmt.Sprintf("%s:%s", prefix, queue)
}
2 changes: 0 additions & 2 deletions pkg/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,12 +881,10 @@ func (m *Manager) shouldSkipBlockProcessing(ctx context.Context) (bool, string)
if m.config.Mode == c.FORWARDS_MODE {
queuesToCheck = []string{
c.PrefixedProcessForwardsQueue(name, m.redisPrefix),
c.PrefixedVerifyForwardsQueue(name, m.redisPrefix),
}
} else {
queuesToCheck = []string{
c.PrefixedProcessBackwardsQueue(name, m.redisPrefix),
c.PrefixedVerifyBackwardsQueue(name, m.redisPrefix),
}
}

Expand Down
35 changes: 0 additions & 35 deletions pkg/processor/transaction/simple/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc {
return map[string]asynq.HandlerFunc{
ProcessForwardsTaskType: p.handleProcessTask,
ProcessBackwardsTaskType: p.handleProcessTask,
VerifyForwardsTaskType: p.handleVerifyTask,
VerifyBackwardsTaskType: p.handleVerifyTask,
}
}

Expand Down Expand Up @@ -161,39 +159,6 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err
"success",
).Inc()

// Enqueue verify task
verifyPayload := &VerifyPayload{
BlockNumber: payload.BlockNumber,
NetworkID: payload.NetworkID,
NetworkName: payload.NetworkName,
InsertedCount: len(transactions),
}

var verifyTask *asynq.Task

var queue string

if payload.ProcessingMode == c.BACKWARDS_MODE {
verifyTask, err = NewVerifyBackwardsTask(verifyPayload)
queue = p.getVerifyBackwardsQueue()
} else {
verifyTask, err = NewVerifyForwardsTask(verifyPayload)
queue = p.getVerifyForwardsQueue()
}

if err != nil {
return fmt.Errorf("failed to create verify task: %w", err)
}

if err := p.EnqueueTask(ctx, verifyTask,
asynq.Queue(queue),
asynq.ProcessIn(10*time.Second),
); err != nil {
p.log.WithError(err).Warn("Failed to enqueue verify task")
}

common.TasksEnqueued.WithLabelValues(p.network.Name, ProcessorName, queue, verifyTask.Type()).Inc()

p.log.WithFields(logrus.Fields{
"block_number": blockNumber.Uint64(),
"tx_count": len(transactions),
Expand Down
18 changes: 0 additions & 18 deletions pkg/processor/transaction/simple/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,6 @@ func (p *Processor) GetQueues() []c.QueueInfo {
Name: c.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix),
Priority: 5,
},
{
Name: c.PrefixedVerifyForwardsQueue(ProcessorName, p.redisPrefix),
Priority: 1,
},
{
Name: c.PrefixedVerifyBackwardsQueue(ProcessorName, p.redisPrefix),
Priority: 1,
},
}
}

Expand All @@ -140,13 +132,3 @@ func (p *Processor) getProcessForwardsQueue() string {
func (p *Processor) getProcessBackwardsQueue() string {
return c.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix)
}

// getVerifyForwardsQueue returns the prefixed verify forwards queue name.
func (p *Processor) getVerifyForwardsQueue() string {
return c.PrefixedVerifyForwardsQueue(ProcessorName, p.redisPrefix)
}

// getVerifyBackwardsQueue returns the prefixed verify backwards queue name.
func (p *Processor) getVerifyBackwardsQueue() string {
return c.PrefixedVerifyBackwardsQueue(ProcessorName, p.redisPrefix)
}
44 changes: 0 additions & 44 deletions pkg/processor/transaction/simple/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ const (
ProcessForwardsTaskType = "transaction_simple_process_forwards"
// ProcessBackwardsTaskType is the task type for backwards processing.
ProcessBackwardsTaskType = "transaction_simple_process_backwards"
// VerifyForwardsTaskType is the task type for forwards verification.
VerifyForwardsTaskType = "transaction_simple_verify_forwards"
// VerifyBackwardsTaskType is the task type for backwards verification.
VerifyBackwardsTaskType = "transaction_simple_verify_backwards"
)

// ProcessPayload represents the payload for processing a block.
Expand All @@ -40,26 +36,6 @@ func (p *ProcessPayload) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, p)
}

// VerifyPayload represents the payload for verifying a block.
//
//nolint:tagliatelle // snake_case required for backwards compatibility with queued tasks
type VerifyPayload struct {
BlockNumber big.Int `json:"block_number"`
NetworkID int32 `json:"network_id"`
NetworkName string `json:"network_name"`
InsertedCount int `json:"inserted_count"`
}

// MarshalBinary implements encoding.BinaryMarshaler.
func (v *VerifyPayload) MarshalBinary() ([]byte, error) {
return json.Marshal(v)
}

// UnmarshalBinary implements encoding.BinaryUnmarshaler.
func (v *VerifyPayload) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, v)
}

// NewProcessForwardsTask creates a new forwards process task.
func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) {
payload.ProcessingMode = c.FORWARDS_MODE
Expand All @@ -83,23 +59,3 @@ func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error) {

return asynq.NewTask(ProcessBackwardsTaskType, data), nil
}

// NewVerifyForwardsTask creates a new forwards verify task.
func NewVerifyForwardsTask(payload *VerifyPayload) (*asynq.Task, error) {
data, err := payload.MarshalBinary()
if err != nil {
return nil, err
}

return asynq.NewTask(VerifyForwardsTaskType, data), nil
}

// NewVerifyBackwardsTask creates a new backwards verify task.
func NewVerifyBackwardsTask(payload *VerifyPayload) (*asynq.Task, error) {
data, err := payload.MarshalBinary()
if err != nil {
return nil, err
}

return asynq.NewTask(VerifyBackwardsTaskType, data), nil
}
Loading