Skip to content

Commit 013820b

Browse files
committed
refactor(services): avoid specific service methods shadow their impl. from the service framework
1 parent b4de908 commit 013820b

12 files changed

Lines changed: 215 additions & 269 deletions

File tree

internal/advancer/advancer_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func newMockAdvancerServiceWithBatchSize(
4848
repository: repo,
4949
}
5050
serviceArgs := &service.CreateInfo{Name: "advancer", Impl: s, EnableReschedule: true}
51-
err := service.Create(context.Background(), serviceArgs, &s.Service)
51+
err := service.NewTickService(serviceArgs, &s.TickService)
5252
if err != nil {
5353
return nil, err
5454
}
@@ -97,12 +97,12 @@ func (s *AdvancerSuite) TestServiceInterface() {
9797
repository.GetEpochsReturn = map[common.Address][]*Epoch{
9898
machineManager.Map[1].application.IApplicationAddress: {},
9999
}
100-
tickErrors := advancer.Tick()
100+
tickErrors := advancer.Tick(context.Background())
101101
require.Empty(tickErrors)
102102

103103
// Test Tick with error
104104
repository.GetEpochsError = errors.New("list epochs error")
105-
tickErrors = advancer.Tick()
105+
tickErrors = advancer.Tick(context.Background())
106106
require.NotEmpty(tickErrors)
107107
require.Contains(tickErrors[0].Error(), "list epochs error")
108108

@@ -1547,7 +1547,7 @@ func (s *AdvancerSuite) TestSelfWakeOnSuccess() {
15471547
require.NoError(err)
15481548

15491549
// Call Tick() which internally calls Step() and signals reschedule.
1550-
svc.Tick()
1550+
svc.Tick(context.Background())
15511551

15521552
// The reschedule channel should have a pending signal.
15531553
require.True(svc.DrainReschedule(),
@@ -1571,7 +1571,7 @@ func (s *AdvancerSuite) TestNoSelfWakeWhenIdle() {
15711571
svc, err := newMockAdvancerService(mm, repo)
15721572
require.NoError(err)
15731573

1574-
svc.Tick()
1574+
svc.Tick(context.Background())
15751575

15761576
require.False(svc.DrainReschedule(),
15771577
"reschedule channel should be empty when no work exists")
@@ -1590,7 +1590,7 @@ func (s *AdvancerSuite) TestNoSelfWakeOnError() {
15901590
svc, err := newMockAdvancerService(mm, repo)
15911591
require.NoError(err)
15921592

1593-
errs := svc.Tick()
1593+
errs := svc.Tick(context.Background())
15941594
require.NotEmpty(errs)
15951595

15961596
require.False(svc.DrainReschedule(),
@@ -1631,7 +1631,7 @@ func (s *AdvancerSuite) TestPartialSuccessStillReschedules() {
16311631

16321632
// Call Tick — app1 fails, app2 succeeds with more work remaining (batch limit hit).
16331633
// Tick should surface the error AND signal reschedule for app2's pending work.
1634-
errs := svc.Tick()
1634+
errs := svc.Tick(context.Background())
16351635
require.NotEmpty(errs, "Tick should surface app1's error")
16361636

16371637
// Reschedule SHOULD fire: app2 had work, and one failing app must not

internal/advancer/service.go

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const httpShutdownTimeout = 10 * time.Second //nolint: mnd
2424

2525
// Service is the main advancer service that processes inputs through Cartesi machines
2626
type Service struct {
27-
service.Service
27+
service.TickService
2828
inputBatchSize uint64
2929
snapshotsDir string
3030
repository AdvancerRepository
@@ -54,10 +54,11 @@ func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
5454
}
5555

5656
s := &Service{}
57+
s.TickImpl = s
5758
c.Impl = s
5859
c.EnableReschedule = true
5960

60-
err = service.Create(ctx, &c.CreateInfo, &s.Service)
61+
err = service.NewTickService(&c.CreateInfo, &s.TickService)
6162
if err != nil {
6263
return nil, err
6364
}
@@ -96,15 +97,12 @@ func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
9697

9798
s.LogConfig(c.Config)
9899

99-
return &s.Service, nil
100+
return s, nil
100101
}
101102

102103
// Service interface implementation
103-
func (s *Service) Alive() bool { return true }
104-
func (s *Service) Ready() bool { return true }
105-
func (s *Service) Reload() []error { return nil }
106-
func (s *Service) Tick() []error {
107-
hadWork, err := s.Step(s.Context)
104+
func (s *Service) Tick(ctx context.Context) []error {
105+
hadWork, err := s.Step(ctx)
108106

109107
// Signal reschedule whenever work was done, even if some apps errored.
110108
// Failed apps are marked Failed and removed by the machine manager,
@@ -119,25 +117,14 @@ func (s *Service) Tick() []error {
119117
}
120118
// During shutdown, the machine manager is closed and GetMachine() may
121119
// return ErrNoApp. Suppress this to avoid spurious ERR log entries.
122-
if errors.Is(err, ErrNoApp) && s.IsStopping() {
120+
if errors.Is(err, ErrNoApp) && ctx.Err() != nil {
123121
s.Logger.Warn("Tick interrupted by shutdown", "error", err)
124122
return nil
125123
}
126124
return []error{err}
127125
}
128126

129-
func (s *Service) Stop(b bool) []error {
130-
// CAS achieves once-semantics: the second caller returns immediately
131-
// (fire-and-forget) rather than blocking like sync.Once. This is safe
132-
// because the orchestrator calls Cancel() after Stop() and waits for
133-
// the Serve goroutine to exit.
134-
if !s.cleanedUp.CompareAndSwap(false, true) {
135-
return nil // already stopped
136-
}
137-
// This method shadows service.Service.Stop(), so set the stopping flag
138-
// explicitly. Without this, a concurrent Tick that observes closed
139-
// resources would not see IsStopping() == true.
140-
s.SetStopping()
127+
func (s *Service) fonStop(b bool) []error {
141128
var errs []error
142129
if s.HTTPServer != nil {
143130
s.Logger.Info("Shutting down inspect HTTP server")
@@ -155,17 +142,15 @@ func (s *Service) Stop(b bool) []error {
155142
}
156143
return errs
157144
}
158-
func (s *Service) Serve() error {
145+
146+
func (s *Service) OnServe(ctx context.Context) error {
159147
if s.inspector != nil && s.HTTPServerFunc != nil {
160148
go func() {
161149
if err := s.HTTPServerFunc(); err != nil && !errors.Is(err, http.ErrServerClosed) {
162150
s.Logger.Error("Inspect HTTP server failed — shutting down", "error", err)
163-
s.Cancel()
151+
s.Stop(true)
164152
}
165153
}()
166154
}
167-
return s.Service.Serve()
168-
}
169-
func (s *Service) String() string {
170-
return s.Name
155+
return s.TickService.OnServe(ctx)
171156
}

internal/claimer/claimer_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,10 @@ func newServiceMock() (*Service, *claimerRepositoryMock, *claimerBlockchainMock)
188188
blockchain := &claimerBlockchainMock{}
189189

190190
claimer := &Service{
191-
Service: service.Service{
192-
Logger: slog.New(handler),
191+
TickService: service.TickService{
192+
Service: service.Service{
193+
Logger: slog.New(handler),
194+
},
193195
},
194196
submissionEnabled: true,
195197
claimsInFlight: map[int64]common.Hash{},

internal/claimer/service.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type CreateInfo struct {
3030
}
3131

3232
type Service struct {
33-
service.Service
33+
service.TickService
3434

3535
repository iclaimerRepository
3636
blockchain iclaimerBlockchain
@@ -117,37 +117,20 @@ func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
117117
return &s.Service, nil
118118
}
119119

120-
func (s *Service) Alive() bool {
121-
return true
122-
}
123-
124-
func (s *Service) Ready() bool {
125-
return true
126-
}
127-
128-
func (s *Service) Reload() []error {
129-
return nil
130-
}
131-
132-
func (s *Service) Stop(bool) []error {
133-
s.SetStopping()
134-
return nil
135-
}
136-
137120
// NOTE: tick is not re-entrant!
138-
func (s *Service) Tick() []error {
121+
func (s *Service) Tick(ctx context.Context) []error {
139122
errs := []error{}
140123

141124
// gather epochs pairs with open claims, either:
142125
// - computed but not yet submitted
143-
acceptedOrSubmittedEpochs, computedEpochs, computedApps, errSubmitted := s.repository.SelectSubmittedClaimPairsPerApp(s.Context)
126+
acceptedOrSubmittedEpochs, computedEpochs, computedApps, errSubmitted := s.repository.SelectSubmittedClaimPairsPerApp(ctx)
144127
if errSubmitted != nil {
145128
errs = append(errs, errSubmitted)
146129
return errs
147130
}
148131

149132
// - submitted but not yet accepted.
150-
acceptedEpochs, submittedEpochs, submittedApps, errAccepted := s.repository.SelectAcceptedClaimPairsPerApp(s.Context)
133+
acceptedEpochs, submittedEpochs, submittedApps, errAccepted := s.repository.SelectAcceptedClaimPairsPerApp(ctx)
151134
if errAccepted != nil {
152135
errs = append(errs, errAccepted)
153136
return errs
@@ -164,7 +147,7 @@ func (s *Service) Tick() []error {
164147
}
165148

166149
// we have claims to check. Get the latest/safe/finalized, etc. block
167-
defaultBlockNumber, err := s.blockchain.getDefaultBlockNumber(s.Context)
150+
defaultBlockNumber, err := s.blockchain.getDefaultBlockNumber(ctx)
168151
if err != nil {
169152
errs = append(errs, err)
170153
return errs

internal/evmreader/service.go

Lines changed: 6 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -140,43 +140,20 @@ func (s *Service) Ready() bool {
140140
return s.ready.Load()
141141
}
142142

143-
func (s *Service) Reload() []error {
144-
return nil
145-
}
146-
147-
func (s *Service) Stop(bool) []error {
148-
s.SetStopping()
149-
return nil
150-
}
151-
152-
func (s *Service) Tick() []error {
153-
return []error{}
154-
}
155-
156-
func (s *Service) Serve() error {
143+
func (s *Service) OnServe(ctx context.Context) error {
157144
s.alive.Store(true)
158-
ready := make(chan struct{}, 1)
159-
go func() {
160-
defer s.alive.Store(false)
161-
defer s.ready.Store(false)
162-
err := s.Run(s.Context, ready)
163-
if err != nil && s.Context.Err() == nil {
164-
s.Logger.Error("Run exited with error", "error", err)
165-
}
166-
s.Cancel()
167-
}()
145+
var ready chan struct{}
168146
go func() {
169147
select {
170148
case <-ready:
171149
s.ready.Store(true)
172-
case <-s.Context.Done():
150+
case <-ctx.Done():
173151
}
174152
}()
175-
return s.Service.Serve()
176-
}
177153

178-
func (s *Service) String() string {
179-
return s.Name
154+
defer s.alive.Store(false)
155+
defer s.ready.Store(false)
156+
return s.Run(ctx, ready)
180157
}
181158

182159
func (s *Service) setupPersistentConfig(

internal/jsonrpc/service.go

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -92,25 +92,7 @@ func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
9292
return &s.Service, nil
9393
}
9494

95-
func (s *Service) Alive() bool {
96-
return true
97-
}
98-
99-
func (s *Service) Ready() bool {
100-
return true
101-
}
102-
103-
func (s *Service) Reload() []error {
104-
return nil
105-
}
106-
107-
func (s *Service) Tick() []error {
108-
// No periodic tasks.
109-
return nil
110-
}
111-
112-
func (s *Service) Stop(_ bool) []error {
113-
s.SetStopping()
95+
func (s *Service) OnStop(_ bool) []error {
11496
var errs []error
11597
s.Logger.Info("Shutting down JSON-RPC HTTP server", "addr", s.server.Addr)
11698
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) //nolint: mnd
@@ -121,11 +103,7 @@ func (s *Service) Stop(_ bool) []error {
121103
return errs
122104
}
123105

124-
func (s *Service) String() string {
125-
return s.Name
126-
}
127-
128-
func (s *Service) Serve() error {
106+
func (s *Service) OnServe(ctx context.Context) error {
129107
listener, err := s.listen("tcp", s.server.Addr)
130108
if err != nil {
131109
return err

internal/node/node.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,15 @@ func (me *Service) Ready() bool {
142142
return allReady
143143
}
144144

145-
func (s *Service) Reload() []error { return nil }
146-
func (s *Service) Tick() []error { return nil }
147-
func (me *Service) Stop(force bool) []error {
148-
me.SetStopping()
145+
func (me *Service) OnStop(force bool) []error {
149146
errs := []error{}
150147
for _, s := range me.Children {
151148
errs = append(errs, s.Stop(force)...)
152149
}
153150
return errs
154151
}
155152

156-
func (me *Service) Serve() error {
153+
func (me *Service) OnServe(ctx context.Context) error {
157154
for _, s := range me.Children {
158155
go s.Serve()
159156
}

internal/prt/service.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type CreateInfo struct {
2929
}
3030

3131
type Service struct {
32-
service.Service
32+
service.TickService
3333
repository prtRepository
3434
client EthClientInterface
3535
adapterFactory AdapterFactory
@@ -123,10 +123,6 @@ func Create(ctx context.Context, c *CreateInfo) (service.IService, error) {
123123
return &s.Service, nil
124124
}
125125

126-
func (s *Service) Alive() bool { return true }
127-
func (s *Service) Ready() bool { return true }
128-
func (s *Service) Reload() []error { return nil }
129-
130126
// Tick executes the Validator main logic of producing claims and/or proofs
131127
// for processed epochs of all running applications.
132128
func (s *Service) Tick() []error {

0 commit comments

Comments
 (0)