Skip to content

Commit e7725e3

Browse files
committed
fix: 22553: Reconnect teacher may experience high virtual pipeline backpressure
Signed-off-by: Artem Ananev <[email protected]>
1 parent 19a3061 commit e7725e3

File tree

4 files changed

+96
-28
lines changed

4 files changed

+96
-28
lines changed

platform-sdk/swirlds-common/src/main/java/com/swirlds/common/merkle/synchronization/TeachingSynchronizer.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,39 @@ public TeachingSynchronizer(
111111
this.reconnectConfig = Objects.requireNonNull(reconnectConfig, "reconnectConfig must not be null");
112112
}
113113

114+
/**
115+
* Create a new teaching synchronizer.
116+
*
117+
* @param threadManager responsible for managing thread lifecycles
118+
* @param in the input stream
119+
* @param out the output stream
120+
* @param view the teacher tree view, used to access all tree nodes
121+
* @param breakConnection a method that breaks the connection. Used iff an exception is encountered. Prevents
122+
* deadlock if there is a thread stuck on a blocking IO operation that will never finish due
123+
* to a failure.
124+
* @param reconnectConfig reconnect configuration from platform
125+
*/
126+
public TeachingSynchronizer(
127+
@NonNull final Time time,
128+
@NonNull final ThreadManager threadManager,
129+
@NonNull final MerkleDataInputStream in,
130+
@NonNull final MerkleDataOutputStream out,
131+
@NonNull final TeacherTreeView<?> view,
132+
@Nullable final Runnable breakConnection,
133+
@NonNull final ReconnectConfig reconnectConfig) {
134+
135+
this.time = Objects.requireNonNull(time);
136+
this.threadManager = Objects.requireNonNull(threadManager, "threadManager must not be null");
137+
inputStream = Objects.requireNonNull(in, "in must not be null");
138+
outputStream = Objects.requireNonNull(out, "out must not be null");
139+
140+
subtrees = new LinkedList<>();
141+
subtrees.add(new TeacherSubtree(null, view));
142+
143+
this.breakConnection = breakConnection;
144+
this.reconnectConfig = Objects.requireNonNull(reconnectConfig, "reconnectConfig must not be null");
145+
}
146+
114147
/**
115148
* Perform synchronization in the role of the teacher.
116149
*/

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/reconnect/ReconnectStatePeerProtocol.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.swirlds.platform.network.protocol.PeerProtocol;
2525
import com.swirlds.platform.network.protocol.ReservedSignedStateResult;
2626
import com.swirlds.platform.state.signed.ReservedSignedState;
27+
import com.swirlds.platform.state.signed.SignedState;
2728
import com.swirlds.state.MerkleNodeState;
2829
import com.swirlds.state.StateLifecycleManager;
2930
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -367,18 +368,26 @@ private void learner(final Connection connection) {
367368
* @param connection the connection to use for the reconnect
368369
*/
369370
private void teacher(final Connection connection) {
370-
try (final ReservedSignedState state = teacherState) {
371-
new ReconnectStateTeacher(
372-
platformContext,
373-
time,
374-
threadManager,
375-
connection,
376-
reconnectSocketTimeout,
377-
connection.getSelfId(),
378-
connection.getOtherId(),
379-
state.get().getRound(),
380-
reconnectMetrics)
381-
.execute(state.get());
371+
try {
372+
final SignedState state = teacherState.get();
373+
final ReconnectStateTeacher teacher;
374+
try {
375+
teacher = new ReconnectStateTeacher(
376+
platformContext,
377+
time,
378+
threadManager,
379+
connection,
380+
reconnectSocketTimeout,
381+
connection.getSelfId(),
382+
connection.getOtherId(),
383+
state.getRound(),
384+
state,
385+
reconnectMetrics);
386+
} finally {
387+
// The teacher now has all the information needed to teach. Time to release the original state
388+
teacherState.close();
389+
}
390+
teacher.execute();
382391
} finally {
383392
teacherThrottle.reconnectAttemptFinished();
384393
teacherState = null;

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/reconnect/ReconnectStateTeacher.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,27 +6,33 @@
66
import static com.swirlds.platform.reconnect.ReconnectStateLearner.endReconnectHandshake;
77
import static com.swirlds.platform.state.service.PlatformStateUtils.getInfoString;
88

9+
import com.hedera.hapi.node.state.roster.Roster;
910
import com.swirlds.base.time.Time;
1011
import com.swirlds.common.context.PlatformContext;
1112
import com.swirlds.common.io.streams.MerkleDataInputStream;
1213
import com.swirlds.common.io.streams.MerkleDataOutputStream;
1314
import com.swirlds.common.merkle.synchronization.TeachingSynchronizer;
1415
import com.swirlds.common.merkle.synchronization.config.ReconnectConfig;
16+
import com.swirlds.common.merkle.synchronization.views.TeacherTreeView;
1517
import com.swirlds.common.threading.manager.ThreadManager;
1618
import com.swirlds.config.api.Configuration;
1719
import com.swirlds.logging.legacy.payload.ReconnectFinishPayload;
1820
import com.swirlds.logging.legacy.payload.ReconnectStartPayload;
1921
import com.swirlds.platform.config.StateConfig;
2022
import com.swirlds.platform.metrics.ReconnectMetrics;
2123
import com.swirlds.platform.network.Connection;
24+
import com.swirlds.platform.state.signed.SigSet;
2225
import com.swirlds.platform.state.signed.SignedState;
26+
import com.swirlds.state.merkle.VirtualMapState;
27+
import com.swirlds.virtualmap.VirtualMap;
2328
import edu.umd.cs.findbugs.annotations.NonNull;
2429
import java.io.IOException;
2530
import java.net.SocketException;
2631
import java.time.Duration;
2732
import java.util.Objects;
2833
import org.apache.logging.log4j.LogManager;
2934
import org.apache.logging.log4j.Logger;
35+
import org.hiero.base.crypto.Hash;
3036
import org.hiero.consensus.model.node.NodeId;
3137
import org.hiero.consensus.roster.RosterUtils;
3238

@@ -41,6 +47,12 @@ public class ReconnectStateTeacher {
4147
private final Connection connection;
4248
private final Duration reconnectSocketTimeout;
4349

50+
private final TeacherTreeView<Long> teacherView;
51+
private final SigSet signatures;
52+
private final long signingWeight;
53+
private final Roster roster;
54+
private final Hash hash;
55+
4456
private final NodeId selfId;
4557
private final NodeId otherId;
4658
private final long lastRoundReceived;
@@ -76,6 +88,7 @@ public ReconnectStateTeacher(
7688
@NonNull final NodeId selfId,
7789
@NonNull final NodeId otherId,
7890
final long lastRoundReceived,
91+
@NonNull final SignedState signedState,
7992
@NonNull final ReconnectMetrics statistics) {
8093

8194
this.platformContext = Objects.requireNonNull(platformContext);
@@ -89,6 +102,19 @@ public ReconnectStateTeacher(
89102
this.lastRoundReceived = lastRoundReceived;
90103
this.statistics = Objects.requireNonNull(statistics);
91104
this.configuration = Objects.requireNonNull(platformContext.getConfiguration());
105+
106+
signatures = signedState.getSigSet();
107+
signingWeight = signedState.getSigningWeight();
108+
roster = signedState.getRoster();
109+
hash = signedState.getState().getHash();
110+
if (!(signedState.getState() instanceof VirtualMapState virtualMapState)) {
111+
throw new UnsupportedOperationException("Reconnects are only supported for VirtualMap states");
112+
}
113+
final ReconnectConfig reconnectConfig = configuration.getConfigData(ReconnectConfig.class);
114+
// The teacher view will be closed by TeacherSynchronizer in reconnect() below
115+
teacherView = ((VirtualMap) virtualMapState.getRoot()).buildTeacherView(reconnectConfig);
116+
117+
logReconnectStart(signedState);
92118
}
93119

94120
/**
@@ -133,8 +159,7 @@ private void resetSocketTimeout() throws ReconnectStateException {
133159
* @throws ReconnectStateException thrown when current thread is interrupted, or when any I/O related errors occur, or
134160
* when there is an error in the underlying protocol
135161
*/
136-
public void execute(final SignedState signedState) throws ReconnectStateException {
137-
162+
public void execute() throws ReconnectStateException {
138163
// If the connection object to be used here has been disconnected on another thread, we can
139164
// not reconnect with this connection.
140165
if (!connection.connected()) {
@@ -145,12 +170,11 @@ public void execute(final SignedState signedState) throws ReconnectStateExceptio
145170
connection.getOtherId());
146171
return;
147172
}
148-
logReconnectStart(signedState);
149173
increaseSocketTimeout();
150174

151175
try {
152-
sendSignatures(signedState);
153-
reconnect(signedState);
176+
sendSignatures();
177+
reconnect();
154178
endReconnectHandshake(connection);
155179
} catch (final InterruptedException e) {
156180
Thread.currentThread().interrupt();
@@ -197,7 +221,7 @@ private void logReconnectFinish() {
197221
*
198222
* @throws InterruptedException thrown if the current thread is interrupted
199223
*/
200-
private void reconnect(final SignedState signedState) throws InterruptedException, IOException {
224+
private void reconnect() throws InterruptedException, IOException {
201225
logger.info(RECONNECT.getMarker(), "Starting synchronization in the role of the sender.");
202226
statistics.incrementSenderStartTimes();
203227

@@ -206,12 +230,11 @@ private void reconnect(final SignedState signedState) throws InterruptedExceptio
206230

207231
final ReconnectConfig reconnectConfig = configuration.getConfigData(ReconnectConfig.class);
208232
final TeachingSynchronizer synchronizer = new TeachingSynchronizer(
209-
platformContext.getConfiguration(),
210233
time,
211234
threadManager,
212235
new MerkleDataInputStream(connection.getDis()),
213236
new MerkleDataOutputStream(connection.getDos()),
214-
signedState.getState().getRoot(),
237+
teacherView,
215238
connection::disconnect,
216239
reconnectConfig);
217240

@@ -227,19 +250,19 @@ private void reconnect(final SignedState signedState) throws InterruptedExceptio
227250
*
228251
* @throws IOException thrown when any I/O related errors occur
229252
*/
230-
private void sendSignatures(final SignedState signedState) throws IOException {
253+
private void sendSignatures() throws IOException {
231254
final StringBuilder sb = new StringBuilder();
232255
sb.append("Sending signatures from nodes ");
233-
formattedList(sb, signedState.getSigSet().iterator());
256+
formattedList(sb, signatures.iterator());
234257
sb.append(" (signing weight = ")
235-
.append(signedState.getSigningWeight())
258+
.append(signingWeight)
236259
.append("/")
237-
.append(RosterUtils.computeTotalWeight(signedState.getRoster()))
260+
.append(RosterUtils.computeTotalWeight(roster))
238261
.append(") for state hash ")
239-
.append(signedState.getState().getHash());
262+
.append(hash);
240263

241264
logger.info(RECONNECT.getMarker(), sb);
242-
connection.getDos().writeSerializable(signedState.getSigSet(), true);
265+
connection.getDos().writeSerializable(signatures, true);
243266
connection.getDos().flush();
244267
}
245268
}

platform-sdk/swirlds-platform-core/src/test/java/com/swirlds/platform/reconnect/ReconnectTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,9 @@ private void executeReconnect(final ReconnectMetrics reconnectMetrics) throws In
137137
final ReconnectStateTeacher sender = buildSender(
138138
new DummyConnection(
139139
platformContext, pairedStreams.getTeacherInput(), pairedStreams.getTeacherOutput()),
140+
signedState,
140141
reconnectMetrics);
141-
sender.execute(signedState);
142+
sender.execute();
142143
} catch (final IOException ex) {
143144
ex.printStackTrace();
144145
}
@@ -156,7 +157,8 @@ private void executeReconnect(final ReconnectMetrics reconnectMetrics) throws In
156157
}
157158

158159
private ReconnectStateTeacher buildSender(
159-
final SocketConnection connection, final ReconnectMetrics reconnectMetrics) throws IOException {
160+
final SocketConnection connection, final SignedState signedState, final ReconnectMetrics reconnectMetrics)
161+
throws IOException {
160162

161163
final PlatformContext platformContext =
162164
TestPlatformContextBuilder.create().build();
@@ -173,6 +175,7 @@ private ReconnectStateTeacher buildSender(
173175
selfId,
174176
otherId,
175177
lastRoundReceived,
178+
signedState,
176179
reconnectMetrics);
177180
}
178181

0 commit comments

Comments
 (0)