Skip to content

Commit 0aecf84

Browse files
fridrik01 and calbera authored
Remove orphaned blobs on startup after incomplete block finalization (#2988)
Co-authored-by: Cal Bera <[email protected]>
1 parent 7ed55d1 commit 0aecf84

File tree

9 files changed

+201
-0
lines changed

9 files changed

+201
-0
lines changed

beacon/blockchain/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ type BlockchainI interface {
163163
sdk.Context,
164164
*ctypes.BeaconBlock,
165165
) error
166+
PruneOrphanedBlobs(lastBlockHeight int64) error
166167
}
167168

168169
// BlobProcessor is the interface for the blobs processor.

beacon/blockchain/service.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package blockchain
2222

2323
import (
2424
"context"
25+
"fmt"
2526
"sync"
2627
"sync/atomic"
2728

@@ -127,3 +128,34 @@ func (s *Service) Stop() error {
127128
func (s *Service) StorageBackend() StorageBackend {
128129
return s.storageBackend
129130
}
131+
132+
// PruneOrphanedBlobs removes any orphaned blob sidecars that may exist from incomplete block finalization.
133+
func (s *Service) PruneOrphanedBlobs(lastBlockHeight int64) error {
134+
orphanedSlot := math.Slot(lastBlockHeight + 1) // #nosec G115
135+
136+
// Check if any blob sidecars exist at the potentially orphaned slot
137+
sidecars, err := s.storageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot)
138+
if err != nil {
139+
return fmt.Errorf("failed to read blob sidecars at slot %d: %w", orphanedSlot, err)
140+
}
141+
142+
// If no sidecars exist at this slot, nothing to clean up
143+
if len(sidecars) == 0 {
144+
return nil
145+
}
146+
147+
// Sidecars exist at this slot - they are orphaned, so delete them
148+
s.logger.Warn("Found orphaned blob sidecars from incomplete block finalization, removing",
149+
"slot", orphanedSlot.Base10(),
150+
"num_sidecars", len(sidecars),
151+
)
152+
153+
err = s.storageBackend.AvailabilityStore().DeleteBlobSidecars(orphanedSlot)
154+
if err != nil {
155+
return fmt.Errorf("failed to delete orphaned sidecars at slot %d: %w", orphanedSlot, err)
156+
}
157+
158+
s.logger.Info("Successfully removed orphaned blob sidecars", "slot", orphanedSlot.Base10())
159+
160+
return nil
161+
}

consensus/cometbft/service/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,11 @@ func NewService(
182182
}
183183
}
184184

185+
// Clean up any orphaned blob sidecars from incomplete block finalization.
186+
if err = s.Blockchain.PruneOrphanedBlobs(lastBlockHeight); err != nil {
187+
panic(fmt.Errorf("failed pruning orphaned blobs: %w", err))
188+
}
189+
185190
return s
186191
}
187192

da/store/interfaces.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,7 @@ type IndexDB interface {
3434
// exist in the DB for any reason (pruned, invalid index), an empty list is
3535
// returned with no error.
3636
GetByIndex(index uint64) ([][]byte, error)
37+
38+
// DeleteByIndex removes all entries at the specified index
39+
DeleteByIndex(index uint64) error
3740
}

da/store/store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,8 @@ func (s *Store) Persist(sidecars types.BlobSidecars) error {
114114
)
115115
return nil
116116
}
117+
118+
// DeleteBlobSidecars removes all blob sidecars for the specified slot.
119+
func (s *Store) DeleteBlobSidecars(slot math.Slot) error {
120+
return s.IndexDB.DeleteByIndex(slot.Unwrap())
121+
}

storage/filedb/range_db.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,20 @@ func (db *RangeDB) Prune(start, end uint64) error {
153153
return err
154154
}
155155

156+
// DeleteByIndex removes all entries at the specified index.
157+
func (db *RangeDB) DeleteByIndex(index uint64) error {
158+
db.rwMu.Lock()
159+
defer db.rwMu.Unlock()
160+
161+
path := fmt.Sprintf(pathFormat, index)
162+
if err := db.coreDB.fs.RemoveAll(path); err != nil && !os.IsNotExist(err) {
163+
return fmt.Errorf("RangeDB DeleteByIndex failed to remove index %d: %w", index, err)
164+
}
165+
166+
// Note: We intentionally do not update lowerBoundIndex here.
167+
return nil
168+
}
169+
156170
// GetByIndex takes the database index and returns all associated entries,
157171
// expecting database keys to follow the prefix() format. If index does not
158172
// exist in the DB for any reason (pruned, invalid index), an empty list is

storage/filedb/range_db_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,40 @@ func TestRangeDB_Invariants(t *testing.T) {
354354
}
355355
}
356356

357+
// =========================== DELETE BY INDEX ================================
358+
359+
// TestRangeDB_DeleteByIndex_DoesNotAffectLowerBound verifies the critical
360+
// invariant that DeleteByIndex does not modify lowerBoundIndex, unlike Prune.
361+
func TestRangeDB_DeleteByIndex_DoesNotAffectLowerBound(t *testing.T) {
362+
t.Parallel()
363+
rdb := file.NewRangeDB(newTestFDB("/tmp/testdb-deletebyindex"))
364+
365+
// Populate indexes 1-10
366+
require.NoError(t, populateTestDB(rdb, 1, 10))
367+
368+
// Prune indexes 1-5, which sets lowerBoundIndex to 5
369+
require.NoError(t, rdb.Prune(1, 5))
370+
lowerBoundBefore := getFirstNonNilIndex(rdb)
371+
require.Equal(t, uint64(5), lowerBoundBefore, "lowerBoundIndex should be 5 after pruning")
372+
373+
// Delete index 7 using DeleteByIndex
374+
require.NoError(t, rdb.DeleteByIndex(7))
375+
exists, err := rdb.Has(7, []byte("key"))
376+
require.NoError(t, err)
377+
require.False(t, exists, "index 7 should be deleted")
378+
379+
// Verify lowerBoundIndex was NOT changed
380+
lowerBoundAfter := getFirstNonNilIndex(rdb)
381+
require.Equal(t, lowerBoundBefore, lowerBoundAfter, "DeleteByIndex must not modify lowerBoundIndex")
382+
383+
// Prune again to verify DeleteByIndex didn't break the pruning mechanism
384+
require.NoError(t, rdb.Prune(5, 8))
385+
386+
// Verify the second prune worked correctly
387+
lowerBoundAfter = getFirstNonNilIndex(rdb)
388+
require.Equal(t, uint64(8), lowerBoundAfter, "second Prune should update lowerBoundIndex to 8")
389+
}
390+
357391
// =============================== HELPERS ==================================
358392

359393
// newTestFDB returns a new file DB instance with an in-memory filesystem.
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
//go:build simulated
2+
3+
// SPDX-License-Identifier: BUSL-1.1
4+
//
5+
// Copyright (C) 2025, Berachain Foundation. All rights reserved.
6+
// Use of this software is governed by the Business Source License included
7+
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
8+
//
9+
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
10+
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
11+
// VERSIONS OF THE LICENSED WORK.
12+
//
13+
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
14+
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
15+
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
16+
//
17+
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
18+
// AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
19+
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
20+
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
21+
// TITLE.
22+
23+
package simulated_test
24+
25+
import (
26+
"time"
27+
28+
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
29+
"github.com/berachain/beacon-kit/da/kzg"
30+
datypes "github.com/berachain/beacon-kit/da/types"
31+
"github.com/berachain/beacon-kit/primitives/common"
32+
"github.com/berachain/beacon-kit/primitives/crypto"
33+
"github.com/berachain/beacon-kit/primitives/eip4844"
34+
"github.com/berachain/beacon-kit/primitives/math"
35+
"github.com/berachain/beacon-kit/testing/simulated"
36+
"github.com/stretchr/testify/require"
37+
)
38+
39+
// TestOrphanedBlobCleanup tests that orphaned blob sidecars are properly cleaned up on node restart.
40+
// This simulates the scenario where sidecars are saved to disk but the block finalization fails.
41+
func (s *SimulatedSuite) TestOrphanedBlobCleanup() {
42+
// Initialize chain and move forward two blocks.
43+
s.InitializeChain(s.T())
44+
nodeAddress, err := s.SimComet.GetNodeAddress()
45+
s.Require().NoError(err)
46+
s.SimComet.Comet.SetNodeAddress(nodeAddress)
47+
48+
_, _, proposalTime := s.MoveChainToHeight(s.T(), 1, 2, nodeAddress, time.Now())
49+
50+
// Get the last committed block height.
51+
lastBlockHeight := s.SimComet.Comet.CommitMultiStore().LastCommitID().Version
52+
orphanedSlot := math.Slot(lastBlockHeight + 1)
53+
54+
// Create and persist orphaned blob sidecars.
55+
// This simulates FinalizeSidecars succeeding but finalizeBeaconBlock failing.
56+
orphanedSidecars := createOrphanedSidecars(s.T(), orphanedSlot, s.TestNode.KZGVerifier)
57+
err = s.TestNode.StorageBackend.AvailabilityStore().Persist(orphanedSidecars)
58+
s.Require().NoError(err)
59+
60+
// Verify orphaned blobs exist.
61+
sidecars, err := s.TestNode.StorageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot)
62+
s.Require().NoError(err)
63+
s.Require().Len(sidecars, 1)
64+
65+
// Simulate node restart by calling PruneOrphanedBlobs.
66+
err = s.TestNode.Blockchain.PruneOrphanedBlobs(lastBlockHeight)
67+
s.Require().NoError(err)
68+
69+
// Verify orphaned blobs were cleaned up.
70+
sidecars, err = s.TestNode.StorageBackend.AvailabilityStore().GetBlobSidecars(orphanedSlot)
71+
s.Require().NoError(err)
72+
s.Require().Empty(sidecars)
73+
74+
// Verify chain continues normally.
75+
proposals, _, _ := s.MoveChainToHeight(s.T(), 3, 1, nodeAddress, proposalTime)
76+
s.Require().Len(proposals, 1)
77+
}
78+
79+
// createOrphanedSidecars creates fake blob sidecars for testing orphaned blob cleanup.
80+
func createOrphanedSidecars(
81+
t require.TestingT,
82+
slot math.Slot,
83+
verifier kzg.BlobProofVerifier,
84+
) datypes.BlobSidecars {
85+
blobs := []*eip4844.Blob{{1, 2, 3}}
86+
proofs, commitments := simulated.GetProofAndCommitmentsForBlobs(require.New(t), blobs, verifier)
87+
88+
sidecars := make(datypes.BlobSidecars, len(blobs))
89+
for i := range blobs {
90+
sidecars[i] = datypes.BuildBlobSidecar(
91+
math.U64(i),
92+
&ctypes.SignedBeaconBlockHeader{
93+
Header: &ctypes.BeaconBlockHeader{Slot: slot},
94+
Signature: crypto.BLSSignature{},
95+
},
96+
blobs[i],
97+
commitments[i],
98+
proofs[i],
99+
make([]common.Root, ctypes.KZGInclusionProofDepth),
100+
)
101+
}
102+
return sidecars
103+
}

testing/simulated/testnode.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ type ValidatorAPI interface {
7070
type TestNode struct {
7171
nodetypes.Node
7272
StorageBackend blockchain.StorageBackend
73+
Blockchain *blockchain.Service
7374
ChainSpec chain.Spec
7475
APIBackend ValidatorAPI
7576
SimComet *SimComet
@@ -128,6 +129,7 @@ func buildNode(
128129
simComet *SimComet
129130
config *config.Config
130131
storageBackend blockchain.StorageBackend
132+
blockchain *blockchain.Service
131133
chainSpec chain.Spec
132134
engineClient *client.EngineClient
133135
stateProcessor *core.StateProcessor
@@ -153,6 +155,7 @@ func buildNode(
153155
&simComet,
154156
&config,
155157
&storageBackend,
158+
&blockchain,
156159
&chainSpec,
157160
&engineClient,
158161
&stateProcessor,
@@ -172,6 +175,7 @@ func buildNode(
172175
return TestNode{
173176
Node: beaconNode,
174177
StorageBackend: storageBackend,
178+
Blockchain: blockchain,
175179
ChainSpec: chainSpec,
176180
APIBackend: apiServer.GetBeaconHandler(),
177181
SimComet: simComet,

0 commit comments

Comments
 (0)