@@ -15,6 +15,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-service/retry"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/google/uuid"
 )
 
 const (
@@ -37,6 +38,7 @@ func NewArchiver(l log.Logger, cfg flags.ArchiverConfig, dataStoreClient storage
 		metrics:      m,
 		beaconClient: client,
 		stopCh:       make(chan struct{}),
+		id:           uuid.New().String(),
 	}, nil
 }
 
@@ -47,6 +49,7 @@ type Archiver struct {
 	beaconClient BeaconClient
 	metrics      metrics.Metricer
 	stopCh       chan struct{}
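+	// id uniquely identifies this archiver instance when competing for the storage lock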
+	id           string
 }
 
 // Start starts archiving blobs. It begins polling the beacon node for the latest blocks and persisting blobs for
@@ -63,6 +66,8 @@ func (a *Archiver) Start(ctx context.Context) error {
 		return err
 	}
 
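+	// Block until this instance holds the storage lock, so only one archiver
+	// writes to the data store at a time.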
+	a.waitObtainStorageLock(ctx)
+
 	go a.backfillBlobs(ctx, currentBlock)
 
 	return a.trackLatestBlocks(ctx)
@@ -119,7 +124,7 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
 	}
 
 	// The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
-	err = a.dataStoreClient.Write(ctx, blobData)
+	err = a.dataStoreClient.WriteBlob(ctx, blobData)
 
 	if err != nil {
 		a.log.Error("failed to write blob", "err", err)
@@ -131,36 +136,123 @@ func (a *Archiver) persistBlobsForBlockToS3(ctx context.Context, blockIdentifier
 	return currentHeader.Data, exists, nil
 }
 
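+// LockUpdateInterval is kept well below LockTimeout so the active archiver
+// refreshes its lease before other instances consider it expired.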
+const LockUpdateInterval = 10 * time.Second
+const LockTimeout = int64(20) // 20 seconds
+var ObtainLockRetryInterval = 10 * time.Second
+
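+// waitObtainStorageLock blocks until this archiver either finds no lockfile in
+// storage or the existing lock's timestamp has expired, then writes its own id
+// and keeps the lease fresh from a background goroutine.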
+func (a *Archiver) waitObtainStorageLock(ctx context.Context) {
+	lockfile, err := a.dataStoreClient.ReadLockfile(ctx)
+	if err != nil {
+		a.log.Crit("failed to read lockfile", "err", err)
+	}
+
+	currentTime := time.Now().Unix()
+	emptyLockfile := storage.Lockfile{}
+	if lockfile != emptyLockfile {
+		for lockfile.ArchiverId != a.id && lockfile.Timestamp+LockTimeout > currentTime {
+			// Loop until the timestamp read from storage has expired
+			a.log.Info("waiting for storage lock timestamp to expire",
+				"timestamp", strconv.FormatInt(lockfile.Timestamp, 10),
+				"currentTime", strconv.FormatInt(currentTime, 10),
+			)
+			time.Sleep(ObtainLockRetryInterval)
+			lockfile, err = a.dataStoreClient.ReadLockfile(ctx)
+			if err != nil {
+				a.log.Crit("failed to read lockfile", "err", err)
+			}
+			currentTime = time.Now().Unix()
+		}
+	}
+
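+	// Take the lock by writing our id alongside the current timestamp.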
+	err = a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
+	if err != nil {
+		a.log.Crit("failed to write lockfile", "err", err)
+	}
+	a.log.Info("obtained storage lock")
+
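+	// The refresh goroutine exits when ctx is canceled; the lease then expires
+	// after LockTimeout, allowing another instance to take over.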
+	go func() {
+		// Retain the storage lock by continually updating the stored timestamp
+		ticker := time.NewTicker(LockUpdateInterval)
+		for {
+			select {
+			case <-ticker.C:
+				currentTime := time.Now().Unix()
+				err := a.dataStoreClient.WriteLockfile(ctx, storage.Lockfile{ArchiverId: a.id, Timestamp: currentTime})
+				if err != nil {
+					a.log.Error("failed to update lockfile timestamp", "err", err)
+				}
+			case <-ctx.Done():
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+}
+
 // backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
 // to the archivers storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
 // If an error is encountered persisting a block, it will retry after waiting for a period of time.
 func (a *Archiver) backfillBlobs(ctx context.Context, latest *v1.BeaconBlockHeader) {
-	current, alreadyExists, err := latest, false, error(nil)
-
-	defer func() {
-		a.log.Info("backfill complete", "endHash", current.Root.String(), "startHash", latest.Root.String())
-	}()
+	// Register a backfill process that starts at the latest slot, then loop
+	// through all recorded backfill processes.
+	backfillProcesses, err := a.dataStoreClient.ReadBackfillProcesses(ctx)
+	if err != nil {
+		a.log.Crit("failed to read backfill_processes", "err", err)
+	}
+	backfillProcesses[common.Hash(latest.Root)] = storage.BackfillProcess{Start: *latest, Current: *latest}
+	a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+
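+	// backfillLoop walks the chain backwards from `current` until it reaches a
+	// blob that already exists in storage or the configured origin block,
+	// checkpointing progress so an interrupted backfill can be resumed.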
+	backfillLoop := func(start *v1.BeaconBlockHeader, current *v1.BeaconBlockHeader) {
+		curr, alreadyExists, err := current, false, error(nil)
+		count := 0
+		a.log.Info("backfill process initiated",
+			"currHash", curr.Root.String(),
+			"currSlot", curr.Header.Message.Slot,
+			"startHash", start.Root.String(),
+			"startSlot", start.Header.Message.Slot,
+		)
+
+		defer func() {
+			a.log.Info("backfill process complete",
+				"endHash", curr.Root.String(),
+				"endSlot", curr.Header.Message.Slot,
+				"startHash", start.Root.String(),
+				"startSlot", start.Header.Message.Slot,
+			)
+			delete(backfillProcesses, common.Hash(start.Root))
+			a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+		}()
+
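+		// Walk the parent chain until we reach a blob that is already stored
+		// or the origin block.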
+		for !alreadyExists {
+			previous := curr
+
+			if common.Hash(curr.Root) == a.cfg.OriginBlock {
+				a.log.Info("reached origin block", "hash", curr.Root.String())
+				return
+			}
 
-	for !alreadyExists {
-		previous := current
+			curr, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
+			if err != nil {
+				a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
+				// Revert back to block we failed to fetch
+				curr = previous
+				time.Sleep(backfillErrorRetryInterval)
+				continue
+			}
 
-		if common.Hash(current.Root) == a.cfg.OriginBlock {
-			a.log.Info("reached origin block", "hash", current.Root.String())
-			return
-		}
+			if !alreadyExists {
+				a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
+			}
 
-		current, alreadyExists, err = a.persistBlobsForBlockToS3(ctx, previous.Header.Message.ParentRoot.String(), false)
-		if err != nil {
-			a.log.Error("failed to persist blobs for block, will retry", "err", err, "hash", previous.Header.Message.ParentRoot.String())
-			// Revert back to block we failed to fetch
-			current = previous
-			time.Sleep(backfillErrorRetryInterval)
-			continue
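+			// Checkpoint progress every 10 blocks so a restart can resume
+			// close to where this backfill stopped.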
+			count++
+			if count%10 == 0 {
+				backfillProcesses[common.Hash(start.Root)] = storage.BackfillProcess{Start: *start, Current: *curr}
+				a.dataStoreClient.WriteBackfillProcesses(ctx, backfillProcesses)
+			}
 		}
+	}
 
-		if !alreadyExists {
-			a.metrics.RecordProcessedBlock(metrics.BlockSourceBackfill)
-		}
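+	// Resume every unfinished backfill recorded in storage, including the one
+	// registered above for the latest head.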
+	for _, process := range backfillProcesses {
+		backfillLoop(&process.Start, &process.Current)
 	}
 }
 