Skip to content

Commit 1456586

Browse files
authored
refactor: syncer service improvements (#1745)
Refactoring of Sync Service to address audit issues. #1484 and #1485 are resolved together because `SyncService` is using generics now. Resolves #1483 Resolves #1484 Resolves #1485 <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **Refactor** - Improved initialization and starting processes for synchronization services. - Introduced helper methods for P2P setup and syncer initialization. - **Enhancements** - Updated `Start` and `Stop` methods for synchronization services to include context parameters for more controlled operation. - Adjusted initialization logic to remove redundant context parameters for a cleaner API. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 196c419 commit 1456586

File tree

3 files changed

+99
-60
lines changed

3 files changed

+99
-60
lines changed

block/sync_service.go

Lines changed: 86 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ type SyncService[H header.Header[H]] struct {
5050
syncerStatus *SyncerStatus
5151

5252
logger log.Logger
53-
ctx context.Context
5453
}
5554

5655
// BlockSyncService is the P2P Sync Service for blocks.
@@ -60,16 +59,16 @@ type BlockSyncService = SyncService[*types.Block]
6059
type HeaderSyncService = SyncService[*types.SignedHeader]
6160

6261
// NewBlockSyncService returns a new BlockSyncService.
63-
func NewBlockSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockSyncService, error) {
64-
return newSyncService[*types.Block](ctx, store, blockSync, conf, genesis, p2p, logger)
62+
func NewBlockSyncService(store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*BlockSyncService, error) {
63+
return newSyncService[*types.Block](store, blockSync, conf, genesis, p2p, logger)
6564
}
6665

6766
// NewHeaderSyncService returns a new HeaderSyncService.
68-
func NewHeaderSyncService(ctx context.Context, store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderSyncService, error) {
69-
return newSyncService[*types.SignedHeader](ctx, store, headerSync, conf, genesis, p2p, logger)
67+
func NewHeaderSyncService(store ds.TxnDatastore, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*HeaderSyncService, error) {
68+
return newSyncService[*types.SignedHeader](store, headerSync, conf, genesis, p2p, logger)
7069
}
7170

72-
func newSyncService[H header.Header[H]](ctx context.Context, store ds.TxnDatastore, syncType syncType, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*SyncService[H], error) {
71+
func newSyncService[H header.Header[H]](store ds.TxnDatastore, syncType syncType, conf config.NodeConfig, genesis *cmtypes.GenesisDoc, p2p *p2p.Client, logger log.Logger) (*SyncService[H], error) {
7372
if genesis == nil {
7473
return nil, errors.New("genesis doc cannot be nil")
7574
}
@@ -95,7 +94,6 @@ func newSyncService[H header.Header[H]](ctx context.Context, store ds.TxnDatasto
9594
conf: conf,
9695
genesis: genesis,
9796
p2p: p2p,
98-
ctx: ctx,
9997
store: ss,
10098
syncType: syncType,
10199
logger: logger,
@@ -115,7 +113,7 @@ func (syncService *SyncService[H]) initStoreAndStartSyncer(ctx context.Context,
115113
if err := syncService.store.Init(ctx, initial); err != nil {
116114
return err
117115
}
118-
if err := syncService.StartSyncer(); err != nil {
116+
if err := syncService.StartSyncer(ctx); err != nil {
119117
return err
120118
}
121119
return nil
@@ -134,7 +132,7 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
134132
return fmt.Errorf("failed to initialize the store")
135133
}
136134

137-
if err := syncService.StartSyncer(); err != nil {
135+
if err := syncService.StartSyncer(ctx); err != nil {
138136
return fmt.Errorf("failed to start syncer after initializing the store")
139137
}
140138
}
@@ -157,73 +155,96 @@ func (syncService *SyncService[H]) isInitialized() bool {
157155
}
158156

159157
// Start is a part of Service interface.
160-
func (syncService *SyncService[H]) Start() error {
161-
// have to do the initializations here to utilize the p2p node which is created on start
162-
ps := syncService.p2p.PubSub()
163-
chainID := syncService.genesis.ChainID + "-" + string(syncService.syncType)
158+
func (syncService *SyncService[H]) Start(ctx context.Context) error {
159+
peerIDs, err := syncService.setupP2P(ctx)
160+
if err != nil {
161+
return err
162+
}
163+
164+
if err := syncService.prepareSyncer(ctx); err != nil {
165+
return err
166+
}
167+
168+
return syncService.setFirstAndStart(ctx, peerIDs)
169+
}
164170

171+
// setupP2P sets up the P2P configuration for the SyncService and starts the necessary components.
172+
// it returns IDs of peers in configuration (seeds) and available in the network.
173+
func (syncService *SyncService[H]) setupP2P(ctx context.Context) ([]peer.ID, error) {
174+
ps := syncService.p2p.PubSub()
165175
var err error
166176
syncService.sub, err = goheaderp2p.NewSubscriber[H](
167177
ps,
168178
pubsub.DefaultMsgIdFn,
169-
goheaderp2p.WithSubscriberNetworkID(chainID),
179+
goheaderp2p.WithSubscriberNetworkID(syncService.getChainID()),
170180
goheaderp2p.WithSubscriberMetrics(),
171181
)
172182
if err != nil {
173-
return err
183+
return nil, err
174184
}
175185

176-
if err := syncService.sub.Start(syncService.ctx); err != nil {
177-
return fmt.Errorf("error while starting subscriber: %w", err)
186+
if err := syncService.sub.Start(ctx); err != nil {
187+
return nil, fmt.Errorf("error while starting subscriber: %w", err)
178188
}
179189
if _, err := syncService.sub.Subscribe(); err != nil {
180-
return fmt.Errorf("error while subscribing: %w", err)
190+
return nil, fmt.Errorf("error while subscribing: %w", err)
181191
}
182-
183-
if err := syncService.store.Start(syncService.ctx); err != nil {
184-
return fmt.Errorf("error while starting store: %w", err)
192+
if err := syncService.store.Start(ctx); err != nil {
193+
return nil, fmt.Errorf("error while starting store: %w", err)
185194
}
186195

187196
_, _, network, err := syncService.p2p.Info()
188197
if err != nil {
189-
return fmt.Errorf("error while fetching the network: %w", err)
198+
return nil, fmt.Errorf("error while fetching the network: %w", err)
190199
}
191-
networkID := network + "-" + string(syncService.syncType)
200+
networkID := syncService.getNetworkID(network)
192201

193202
if syncService.p2pServer, err = newP2PServer(syncService.p2p.Host(), syncService.store, networkID); err != nil {
194-
return fmt.Errorf("error while creating p2p server: %w", err)
203+
return nil, fmt.Errorf("error while creating p2p server: %w", err)
195204
}
196-
if err := syncService.p2pServer.Start(syncService.ctx); err != nil {
197-
return fmt.Errorf("error while starting p2p server: %w", err)
205+
if err := syncService.p2pServer.Start(ctx); err != nil {
206+
return nil, fmt.Errorf("error while starting p2p server: %w", err)
198207
}
199208

200-
peerIDs := syncService.p2p.PeerIDs()
201-
if !syncService.conf.Aggregator {
202-
peerIDs = append(peerIDs, getSeedNodes(syncService.conf.P2P.Seeds, syncService.logger)...)
203-
}
209+
peerIDs := syncService.getPeerIDs()
204210
if syncService.ex, err = newP2PExchange[H](syncService.p2p.Host(), peerIDs, networkID, syncService.genesis.ChainID, syncService.p2p.ConnectionGater()); err != nil {
205-
return fmt.Errorf("error while creating exchange: %w", err)
211+
return nil, fmt.Errorf("error while creating exchange: %w", err)
206212
}
207-
if err := syncService.ex.Start(syncService.ctx); err != nil {
208-
return fmt.Errorf("error while starting exchange: %w", err)
213+
if err := syncService.ex.Start(ctx); err != nil {
214+
return nil, fmt.Errorf("error while starting exchange: %w", err)
209215
}
216+
return peerIDs, nil
217+
}
210218

219+
// prepareSyncer initializes the syncer for the SyncService with the provided options.
220+
// If the initialization is successful and the SyncService is already initialized,
221+
// it starts the syncer by calling StartSyncer.
222+
// Returns error if initialization or starting of syncer fails.
223+
func (syncService *SyncService[H]) prepareSyncer(ctx context.Context) error {
224+
var err error
211225
if syncService.syncer, err = newSyncer[H](
212226
syncService.ex,
213227
syncService.store,
214228
syncService.sub,
215229
[]goheadersync.Option{goheadersync.WithBlockTime(syncService.conf.BlockTime)},
216230
); err != nil {
217-
return fmt.Errorf("error while creating syncer: %w", err)
231+
return nil
218232
}
219233

220234
if syncService.isInitialized() {
221-
if err := syncService.StartSyncer(); err != nil {
222-
return fmt.Errorf("error while starting the syncer: %w", err)
235+
if err := syncService.StartSyncer(ctx); err != nil {
236+
return nil
223237
}
224238
return nil
225239
}
240+
return err
241+
}
226242

243+
// setFirstAndStart looks up for the trusted hash or the genesis header/block.
244+
// If trusted hash is available, it fetches the trusted header/block (by hash) from peers.
245+
// Otherwise, it tries to fetch the genesis header/block by height.
246+
// If trusted header/block is available, syncer is started.
247+
func (syncService *SyncService[H]) setFirstAndStart(ctx context.Context, peerIDs []peer.ID) error {
227248
// Look to see if trusted hash is passed, if not get the genesis header/block
228249
var trusted H
229250
// Try fetching the trusted header/block from peers if exists
@@ -234,35 +255,37 @@ func (syncService *SyncService[H]) Start() error {
234255
return fmt.Errorf("failed to parse the trusted hash for initializing the store: %w", err)
235256
}
236257

237-
if trusted, err = syncService.ex.Get(syncService.ctx, header.Hash(trustedHashBytes)); err != nil {
258+
if trusted, err = syncService.ex.Get(ctx, trustedHashBytes); err != nil {
238259
return fmt.Errorf("failed to fetch the trusted header/block for initializing the store: %w", err)
239260
}
240261
} else {
241262
// Try fetching the genesis header/block if available, otherwise fallback to block
242-
if trusted, err = syncService.ex.GetByHeight(syncService.ctx, uint64(syncService.genesis.InitialHeight)); err != nil {
263+
var err error
264+
if trusted, err = syncService.ex.GetByHeight(ctx, uint64(syncService.genesis.InitialHeight)); err != nil {
243265
// Full/light nodes have to wait for aggregator to publish the genesis block
244266
// proposing aggregator can init the store and start the syncer when the first block is published
245267
return fmt.Errorf("failed to fetch the genesis: %w", err)
246268
}
247269
}
248-
return syncService.initStoreAndStartSyncer(syncService.ctx, trusted)
270+
271+
return syncService.initStoreAndStartSyncer(ctx, trusted)
249272
}
250273
return nil
251274
}
252275

253276
// Stop is a part of Service interface.
254277
//
255278
// `store` is closed last because it's used by other services.
256-
func (syncService *SyncService[H]) Stop() error {
279+
func (syncService *SyncService[H]) Stop(ctx context.Context) error {
257280
err := errors.Join(
258-
syncService.p2pServer.Stop(syncService.ctx),
259-
syncService.ex.Stop(syncService.ctx),
260-
syncService.sub.Stop(syncService.ctx),
281+
syncService.p2pServer.Stop(ctx),
282+
syncService.ex.Stop(ctx),
283+
syncService.sub.Stop(ctx),
261284
)
262285
if syncService.syncerStatus.isStarted() {
263-
err = errors.Join(err, syncService.syncer.Stop(syncService.ctx))
286+
err = errors.Join(err, syncService.syncer.Stop(ctx))
264287
}
265-
err = errors.Join(err, syncService.store.Stop(syncService.ctx))
288+
err = errors.Join(err, syncService.store.Stop(ctx))
266289
return err
267290
}
268291

@@ -309,18 +332,34 @@ func newSyncer[H header.Header[H]](
309332
}
310333

311334
// StartSyncer starts the SyncService's syncer
312-
func (syncService *SyncService[H]) StartSyncer() error {
335+
func (syncService *SyncService[H]) StartSyncer(ctx context.Context) error {
313336
if syncService.syncerStatus.isStarted() {
314337
return nil
315338
}
316-
err := syncService.syncer.Start(syncService.ctx)
339+
err := syncService.syncer.Start(ctx)
317340
if err != nil {
318341
return err
319342
}
320343
syncService.syncerStatus.started.Store(true)
321344
return nil
322345
}
323346

347+
func (syncService *SyncService[H]) getNetworkID(network string) string {
348+
return network + "-" + string(syncService.syncType)
349+
}
350+
351+
func (syncService *SyncService[H]) getChainID() string {
352+
return syncService.genesis.ChainID + "-" + string(syncService.syncType)
353+
}
354+
355+
func (syncService *SyncService[H]) getPeerIDs() []peer.ID {
356+
peerIDs := syncService.p2p.PeerIDs()
357+
if !syncService.conf.Aggregator {
358+
peerIDs = append(peerIDs, getSeedNodes(syncService.conf.P2P.Seeds, syncService.logger)...)
359+
}
360+
return peerIDs
361+
}
362+
324363
func getSeedNodes(seeds string, logger log.Logger) []peer.ID {
325364
var peerIDs []peer.ID
326365
for _, seed := range strings.Split(seeds, ",") {

node/full.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,12 @@ func newFullNode(
142142
}
143143

144144
mainKV := newPrefixKV(baseKV, mainPrefix)
145-
headerSyncService, err := initHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger)
145+
headerSyncService, err := initHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger)
146146
if err != nil {
147147
return nil, err
148148
}
149149

150-
blockSyncService, err := initBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger)
150+
blockSyncService, err := initBlockSyncService(mainKV, nodeConfig, genesis, p2pClient, logger)
151151
if err != nil {
152152
return nil, err
153153
}
@@ -247,16 +247,16 @@ func initMempool(logger log.Logger, proxyApp proxy.AppConns, memplMetrics *mempo
247247
return mempool
248248
}
249249

250-
func initHeaderSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderSyncService, error) {
251-
headerSyncService, err := block.NewHeaderSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderSyncService"))
250+
func initHeaderSyncService(mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.HeaderSyncService, error) {
251+
headerSyncService, err := block.NewHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "HeaderSyncService"))
252252
if err != nil {
253253
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
254254
}
255255
return headerSyncService, nil
256256
}
257257

258-
func initBlockSyncService(ctx context.Context, mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.BlockSyncService, error) {
259-
blockSyncService, err := block.NewBlockSyncService(ctx, mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "BlockSyncService"))
258+
func initBlockSyncService(mainKV ds.TxnDatastore, nodeConfig config.NodeConfig, genesis *cmtypes.GenesisDoc, p2pClient *p2p.Client, logger log.Logger) (*block.BlockSyncService, error) {
259+
blockSyncService, err := block.NewBlockSyncService(mainKV, nodeConfig, genesis, p2pClient, logger.With("module", "BlockSyncService"))
260260
if err != nil {
261261
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
262262
}
@@ -376,11 +376,11 @@ func (n *FullNode) OnStart() error {
376376
return fmt.Errorf("error while starting P2P client: %w", err)
377377
}
378378

379-
if err = n.hSyncService.Start(); err != nil {
379+
if err = n.hSyncService.Start(n.ctx); err != nil {
380380
return fmt.Errorf("error while starting header sync service: %w", err)
381381
}
382382

383-
if err = n.bSyncService.Start(); err != nil {
383+
if err = n.bSyncService.Start(n.ctx); err != nil {
384384
return fmt.Errorf("error while starting block sync service: %w", err)
385385
}
386386

@@ -422,8 +422,8 @@ func (n *FullNode) OnStop() {
422422
n.Logger.Info("shutting down full node sub services...")
423423
err := errors.Join(
424424
n.p2pClient.Close(),
425-
n.hSyncService.Stop(),
426-
n.bSyncService.Stop(),
425+
n.hSyncService.Stop(n.ctx),
426+
n.bSyncService.Stop(n.ctx),
427427
n.IndexerService.Stop(),
428428
)
429429
if n.prometheusSrv != nil {

node/light.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func newLightNode(
7979
return nil, err
8080
}
8181

82-
headerSyncService, err := block.NewHeaderSyncService(ctx, datastore, conf, genesis, client, logger.With("module", "HeaderSyncService"))
82+
headerSyncService, err := block.NewHeaderSyncService(datastore, conf, genesis, client, logger.With("module", "HeaderSyncService"))
8383
if err != nil {
8484
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
8585
}
@@ -120,7 +120,7 @@ func (ln *LightNode) OnStart() error {
120120
return err
121121
}
122122

123-
if err := ln.hSyncService.Start(); err != nil {
123+
if err := ln.hSyncService.Start(ln.ctx); err != nil {
124124
return fmt.Errorf("error while starting header sync service: %w", err)
125125
}
126126

@@ -132,7 +132,7 @@ func (ln *LightNode) OnStop() {
132132
ln.Logger.Info("halting light node...")
133133
ln.cancel()
134134
err := ln.P2P.Close()
135-
err = errors.Join(err, ln.hSyncService.Stop())
135+
err = errors.Join(err, ln.hSyncService.Stop(ln.ctx))
136136
ln.Logger.Error("errors while stopping node:", "errors", err)
137137
}
138138

0 commit comments

Comments
 (0)