Skip to content

Commit 1d6cdf0

Browse files
pouloknetopyr
andauthored
feat: Add final Consistency functionality to Otter App (#21334)
Signed-off-by: Michael Heinrichs <[email protected]> Signed-off-by: Kelly Greco <[email protected]> Co-authored-by: Michael Heinrichs <[email protected]>
1 parent 9961fa1 commit 1d6cdf0

File tree

12 files changed

+629
-59
lines changed

12 files changed

+629
-59
lines changed

platform-sdk/consensus-otter-tests/src/testFixtures/java/org/hiero/otter/fixtures/TransactionGenerator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
* Interface representing a transaction generator.
66
*
77
* <p>A {@link TransactionGenerator} generates a steady flow of transaction to all nodes in the
8-
* network. The generator sends 100 transactions to each node per second, which ensures there is
9-
* always at least one transaction waiting to be processed by the event creator.
8+
* network. The generator sends a constant number of transactions to each node per second, which ensures there is
9+
* always at least one transaction waiting to be processed by the event creator. The number of transactions per
10+
* second is defined by the {@link #TPS} constant.
1011
*/
1112
public interface TransactionGenerator {
1213

14+
/** The number of transactions to generate per second, per node. */
1315
int TPS = 100;
1416

1517
/**

platform-sdk/consensus-otter-tests/src/testFixtures/java/org/hiero/otter/fixtures/app/OtterApp.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.hedera.hapi.node.base.SemanticVersion;
77
import com.hedera.hapi.platform.event.StateSignatureTransaction;
88
import com.swirlds.common.context.PlatformContext;
9+
import com.swirlds.config.api.Configuration;
910
import com.swirlds.platform.state.ConsensusStateEventHandler;
1011
import com.swirlds.platform.system.InitTrigger;
1112
import com.swirlds.platform.system.Platform;
@@ -99,8 +100,8 @@ public void onPreHandle(
99100
for (final OtterService service : allServices) {
100101
service.preHandleEvent(event);
101102
}
102-
for (final Iterator<Transaction> transactionIterator = event.transactionIterator();
103-
transactionIterator.hasNext(); ) {
103+
final Iterator<Transaction> transactionIterator = event.transactionIterator();
104+
while (transactionIterator.hasNext()) {
104105
try {
105106
final OtterTransaction transaction = OtterTransaction.parseFrom(
106107
transactionIterator.next().getApplicationTransaction().toInputStream());
@@ -125,16 +126,15 @@ public void onHandleConsensusRound(
125126
@NonNull final OtterAppState state,
126127
@NonNull final Consumer<ScopedSystemTransaction<StateSignatureTransaction>> callback) {
127128
for (final OtterService service : allServices) {
128-
service.handleRound(state.getWritableStates(service.name()), round);
129+
service.onRoundStart(state.getWritableStates(service.name()), round);
129130
}
130131

131132
for (final ConsensusEvent consensusEvent : round) {
132133
for (final OtterService service : allServices) {
133-
service.handleEvent(state.getWritableStates(service.name()), consensusEvent);
134+
service.onEventStart(state.getWritableStates(service.name()), consensusEvent);
134135
}
135-
for (final Iterator<ConsensusTransaction> transactionIterator =
136-
consensusEvent.consensusTransactionIterator();
137-
transactionIterator.hasNext(); ) {
136+
final Iterator<ConsensusTransaction> transactionIterator = consensusEvent.consensusTransactionIterator();
137+
while (transactionIterator.hasNext()) {
138138
try {
139139
final OtterTransaction transaction = OtterTransaction.parseFrom(transactionIterator
140140
.next()
@@ -151,6 +151,14 @@ public void onHandleConsensusRound(
151151
ex);
152152
}
153153
}
154+
155+
for (final OtterService service : allServices) {
156+
service.onEventComplete(consensusEvent);
157+
}
158+
}
159+
160+
for (final OtterService service : allServices) {
161+
service.onRoundComplete(round);
154162
}
155163

156164
state.commitState();
@@ -190,12 +198,14 @@ public void onStateInitialized(
190198
@NonNull final Platform platform,
191199
@NonNull final InitTrigger trigger,
192200
@Nullable final SemanticVersion previousVersion) {
201+
final Configuration configuration = platform.getContext().getConfiguration();
193202
if (state.getReadableStates(ConsistencyService.NAME).isEmpty()) {
194-
OtterStateInitializer.initOtterAppState(
195-
platform.getContext().getConfiguration(), state, version, appServices);
203+
OtterStateInitializer.initOtterAppState(configuration, state, version, appServices);
196204
}
197205

198-
consistencyService.initialize();
206+
for (final OtterService service : allServices) {
207+
service.initialize(trigger, platform.getSelfId(), configuration, state);
208+
}
199209
}
200210

201211
/**
@@ -225,4 +235,10 @@ public void onNewRecoveredState(@NonNull final OtterAppState recoveredState) {
225235
public void updateSyntheticBottleneck(final long millisToSleepPerRound) {
226236
this.syntheticBottleneckMillis.set(millisToSleepPerRound);
227237
}
238+
239+
public void destroy() {
240+
for (final OtterService service : allServices) {
241+
service.destroy();
242+
}
243+
}
228244
}

platform-sdk/consensus-otter-tests/src/testFixtures/java/org/hiero/otter/fixtures/app/OtterService.java

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33

44
import com.hedera.hapi.node.base.SemanticVersion;
55
import com.hedera.hapi.platform.event.StateSignatureTransaction;
6+
import com.swirlds.config.api.Configuration;
7+
import com.swirlds.platform.system.InitTrigger;
68
import com.swirlds.state.lifecycle.Schema;
79
import com.swirlds.state.spi.WritableStates;
810
import edu.umd.cs.findbugs.annotations.NonNull;
911
import java.util.function.Consumer;
1012
import org.hiero.consensus.model.event.Event;
1113
import org.hiero.consensus.model.hashgraph.Round;
14+
import org.hiero.consensus.model.node.NodeId;
1215
import org.hiero.consensus.model.transaction.ScopedSystemTransaction;
13-
import org.hiero.consensus.model.transaction.Transaction;
1416

1517
/**
1618
* This interface defines a service of the Otter application.
@@ -34,15 +36,48 @@ public interface OtterService {
3436
@NonNull
3537
Schema<SemanticVersion> genesisSchema(@NonNull SemanticVersion version);
3638

39+
/**
40+
* Called when the service is initialized. This is called once when the application starts up.
41+
*
42+
* @param trigger the trigger that caused the initialization
43+
* @param selfId the ID of this node
44+
* @param configuration the configuration to use
45+
* @param state the current state at the time of initialization
46+
*/
47+
default void initialize(
48+
@NonNull final InitTrigger trigger,
49+
@NonNull final NodeId selfId,
50+
@NonNull final Configuration configuration,
51+
@NonNull final OtterAppState state) {
52+
// Default implementation does nothing
53+
}
54+
55+
/**
56+
* Called when the service is being shut down. This is called once when the application is shutting down.
57+
*/
58+
default void destroy() {
59+
// Default implementation does nothing
60+
}
61+
3762
/**
3863
* Called when a new round of consensus has been received. The service should only do actions for the whole round in
39-
* this method. For actions on individual events, use {@link #handleEvent(WritableStates, Event)}. For actions on
40-
* individual transactions, use {@link #handleTransaction(WritableStates, Event, Transaction, Consumer)}.
64+
* this method. For actions on individual events, use {@link #onEventStart(WritableStates, Event)}. For actions on
65+
* individual transactions, use {@link #handleTransaction(WritableStates, Event, OtterTransaction, Consumer)} .
4166
*
4267
* @param writableStates the {@link WritableStates} to use to modify state
4368
* @param round the round to handle
4469
*/
45-
default void handleRound(@NonNull final WritableStates writableStates, @NonNull final Round round) {
70+
default void onRoundStart(@NonNull final WritableStates writableStates, @NonNull final Round round) {
71+
// Default implementation does nothing
72+
}
73+
74+
/**
75+
* Called when a round of consensus has been completely handled. This is called after all events and transactions in
76+
* the round have been handled.
77+
*
78+
* @param round the round that was completed
79+
*/
80+
default void onRoundComplete(@NonNull final Round round) {
4681
// Default implementation does nothing
4782
}
4883

@@ -54,7 +89,17 @@ default void handleRound(@NonNull final WritableStates writableStates, @NonNull
5489
* @param writableStates the {@link WritableStates} to use to modify state
5590
* @param event the event to handle
5691
*/
57-
default void handleEvent(@NonNull final WritableStates writableStates, @NonNull final Event event) {
92+
default void onEventStart(@NonNull final WritableStates writableStates, @NonNull final Event event) {
93+
// Default implementation does nothing
94+
}
95+
96+
/**
97+
* Called when an event has been completely handled. This is called after all transactions in the event have been
98+
* handled.
99+
*
100+
* @param event the event that was completed
101+
*/
102+
default void onEventComplete(@NonNull final Event event) {
58103
// Default implementation does nothing
59104
}
60105

platform-sdk/consensus-otter-tests/src/testFixtures/java/org/hiero/otter/fixtures/app/services/consistency/ConsistencyService.java

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,41 @@
55

66
import com.hedera.hapi.node.base.SemanticVersion;
77
import com.hedera.hapi.platform.event.StateSignatureTransaction;
8+
import com.swirlds.common.config.StateCommonConfig;
9+
import com.swirlds.config.api.Configuration;
10+
import com.swirlds.platform.system.InitTrigger;
811
import com.swirlds.state.lifecycle.Schema;
912
import com.swirlds.state.spi.WritableStates;
1013
import edu.umd.cs.findbugs.annotations.NonNull;
14+
import java.io.IOException;
15+
import java.io.UncheckedIOException;
16+
import java.nio.file.Files;
17+
import java.nio.file.Path;
1118
import java.util.Set;
1219
import java.util.concurrent.ConcurrentHashMap;
1320
import java.util.function.Consumer;
1421
import org.apache.logging.log4j.LogManager;
1522
import org.apache.logging.log4j.Logger;
1623
import org.hiero.consensus.model.event.Event;
24+
import org.hiero.consensus.model.hashgraph.ConsensusConstants;
1725
import org.hiero.consensus.model.hashgraph.Round;
26+
import org.hiero.consensus.model.node.NodeId;
1827
import org.hiero.consensus.model.transaction.ScopedSystemTransaction;
28+
import org.hiero.otter.fixtures.app.OtterAppState;
1929
import org.hiero.otter.fixtures.app.OtterService;
2030
import org.hiero.otter.fixtures.app.OtterTransaction;
31+
import org.jetbrains.annotations.NotNull;
2132

2233
/**
2334
* A service that ensures the consistency of rounds and transactions sent by the platform to the execution layer for
2435
* handling. It checks these aspects of consistency:
2536
* <ol>
2637
* <li>Consensus rounds increase in number monotonically</li>
2738
* <li>Consensus rounds are received only once</li>
28-
* <li>Differences in rounds or transactions sent to {@link #recordRound(Round)} on different nodes will cause an ISS</li>
29-
* <li>Consensus transactions were previous received in preHandle</li>
30-
* <li>After a restart, any rounds that reach consensus in PCES replay exactly match the rounds calculated previously.</li>
39+
* <li>Differences in rounds or transactions recorded in the {@link ConsistencyServiceRoundHistory} on different nodes will cause an ISS</li>
40+
* <li>Transactions are pre-handled only once</li>
41+
* <li>Consensus transactions were previously received in pre-handle</li>
42+
* <li>After a restart, any rounds that reach consensus during PCES replay exactly match the rounds calculated previously.</li>
3143
* </ol>
3244
*/
3345
public class ConsistencyService implements OtterService {
@@ -40,23 +52,86 @@ public class ConsistencyService implements OtterService {
4052
/** A set of transaction nonce values seen in pre-handle that have not yet been handled. */
4153
private final Set<Long> transactionsAwaitingHandle = ConcurrentHashMap.newKeySet();
4254

55+
/** A history of all rounds and transaction nonce values contained within. */
56+
private final ConsistencyServiceRoundHistory roundHistory = new ConsistencyServiceRoundHistory();
57+
58+
/** The round number of the previous round handled. */
59+
private long previousRoundHandled = ConsensusConstants.ROUND_UNDEFINED;
60+
61+
/**
62+
* {@inheritDoc}
63+
*/
64+
public void initialize(
65+
@NonNull final InitTrigger trigger,
66+
@NonNull final NodeId selfId,
67+
@NonNull final Configuration configuration,
68+
@NonNull final OtterAppState state) {
69+
if (trigger != InitTrigger.GENESIS && trigger != InitTrigger.RESTART) {
70+
return;
71+
}
72+
final StateCommonConfig stateConfig = configuration.getConfigData(StateCommonConfig.class);
73+
final ConsistencyServiceConfig consistencyServiceConfig =
74+
configuration.getConfigData(ConsistencyServiceConfig.class);
75+
76+
final Path historyFileDirectory = stateConfig
77+
.savedStateDirectory()
78+
.resolve(consistencyServiceConfig.historyFileDirectory())
79+
.resolve(Long.toString(selfId.id()));
80+
try {
81+
Files.createDirectories(historyFileDirectory);
82+
} catch (final IOException e) {
83+
log.error(EXCEPTION.getMarker(), "Unable to create log file directory", e);
84+
throw new UncheckedIOException("unable to set up file system for consistency data", e);
85+
}
86+
87+
final Path historyFilePath = historyFileDirectory.resolve(consistencyServiceConfig.historyFileName());
88+
roundHistory.init(historyFilePath);
89+
}
90+
91+
/**
92+
* {@inheritDoc}
93+
*/
94+
@Override
95+
public void destroy() {
96+
roundHistory.close();
97+
}
98+
4399
/**
44-
* Records the contents of all rounds, even empty ones. This method calculates a running hash that includes the
100+
* Records the contents of all rounds, even empty ones. This method calculates a running checksum that includes the
45101
* round number and all transactions and stores the number of rounds handled in the state.
46102
*
47103
* @param writableStates the writable states used to modify the consistency state
48104
* @param round the round to handle
49105
*/
50106
@Override
51-
public void handleRound(@NonNull final WritableStates writableStates, @NonNull final Round round) {
52-
new WritableConsistencyStateStore(writableStates)
107+
public void onRoundStart(@NonNull final WritableStates writableStates, @NonNull final Round round) {
108+
verifyRoundIncreases(round);
109+
110+
final WritableConsistencyStateStore store = new WritableConsistencyStateStore(writableStates)
53111
.accumulateRunningChecksum(round.getRoundNum())
54112
.incrementRoundsHandled();
113+
roundHistory.onRoundStart(round, store.getRunningChecksum());
114+
}
115+
116+
private void verifyRoundIncreases(@NonNull final Round round) {
117+
if (previousRoundHandled == ConsensusConstants.ROUND_UNDEFINED) {
118+
previousRoundHandled = round.getRoundNum();
119+
return;
120+
}
121+
122+
final long newRoundNumber = round.getRoundNum();
123+
124+
// make sure round numbers always increase
125+
if (newRoundNumber <= previousRoundHandled) {
126+
final String error = "Round " + newRoundNumber + " is not greater than round " + previousRoundHandled;
127+
log.error(EXCEPTION.getMarker(), error);
128+
}
129+
130+
previousRoundHandled = round.getRoundNum();
55131
}
56132

57133
/**
58-
* This method updates the running hash that includes the contents of all
59-
* transactions.
134+
* This method updates the running hash that includes the contents of all transactions.
60135
*/
61136
@Override
62137
public void handleTransaction(
@@ -67,13 +142,14 @@ public void handleTransaction(
67142
final long transactionNonce = transaction.getNonce();
68143
new WritableConsistencyStateStore(writableStates).accumulateRunningChecksum(transactionNonce);
69144
if (!transactionsAwaitingHandle.remove(transactionNonce)) {
70-
log.error(EXCEPTION.getMarker(), "Transaction {} was not prehandled.", transactionNonce);
145+
log.error(EXCEPTION.getMarker(), "Transaction {} was not pre-handled.", transactionNonce);
71146
}
147+
roundHistory.onTransaction(transaction);
72148
}
73149

74150
/**
75-
* This method records the checksum of all transactions that are pre-handled, so that we can verify
76-
* that all consensus transactions were previously pre-handled.
151+
* This method records the checksum of all transactions that are pre-handled, so that we can verify that all
152+
* consensus transactions were previously pre-handled.
77153
*
78154
* @param event the event that contains the transaction
79155
* @param transaction the transaction being pre-handled
@@ -90,23 +166,12 @@ public void preHandleTransaction(
90166
}
91167
}
92168

93-
private void recordRound(@NonNull final Round round) {
94-
// FUTURE WORK: Write the round data to in-memory structure and disk. Write to in-memory structure
95-
// so we can verify that rounds increase monotonically (no rounds are repeated or skipped). Write to
96-
// disk so that we can verify that the same rounds reach consensus after a restart during PCES replay.
97-
98-
// FUTURE WORK: Compare the round to rounds previous recorded in memory and do basic validations, like
99-
// checking that the round number is one greater than the previous round number, and that all transactions
100-
// were previously received in prehandle.
101-
}
102-
103-
public void initialize() {
104-
// FUTURE WORK: Read round data from disk (written in recordRound()) into in-memory structure.
105-
}
106-
107-
public void recordPreHandleTransactions(@NonNull final Event event) {
108-
// FUTURE WORK: Record the prehandle transactions so that we can verify all
109-
// consensus transactions were previously sent to prehandle.
169+
/**
170+
* {@inheritDoc}
171+
*/
172+
@Override
173+
public void onRoundComplete(@NotNull final Round round) {
174+
roundHistory.onRoundComplete();
110175
}
111176

112177
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.otter.fixtures.app.services.consistency;
3+
4+
import com.swirlds.config.api.ConfigData;
5+
import com.swirlds.config.api.ConfigProperty;
6+
7+
/**
8+
* Configuration for the Consistency Service
9+
*
10+
* @param historyFileDirectory the directory where the history file will be stored
11+
* @param historyFileName the name of the history file
12+
*/
13+
@ConfigData("consistencyTestingTool")
14+
public record ConsistencyServiceConfig(
15+
@ConfigProperty(defaultValue = "consistency-test") String historyFileDirectory,
16+
@ConfigProperty(defaultValue = "ConsistencyTestLog.csv") String historyFileName) {}

0 commit comments

Comments
 (0)