Skip to content

Commit 3d30f13

Browse files
committed
add support in test-clients
Signed-off-by: Petar Tonev <[email protected]>
1 parent 6612050 commit 3d30f13

File tree

5 files changed

+171
-12
lines changed

5 files changed

+171
-12
lines changed

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/BlockNodeController.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,36 @@ public void sendResendBlockImmediately(final long index, final long blockNumber)
189189
}
190190
}
191191

192+
/**
193+
* Send a NodeBehindPublisher response immediately to all active streams on all simulated block nodes.
194+
* This indicates that the block node is behind the publisher and needs to catch up.
195+
*
196+
* @param blockNumber the last verified block number
197+
*/
198+
public void sendNodeBehindPublisherImmediately(final long blockNumber) {
199+
for (final SimulatedBlockNodeServer server : simulatedBlockNodes.values()) {
200+
server.sendNodeBehindPublisherImmediately(blockNumber);
201+
}
202+
log.info("Sent immediate NodeBehindPublisher response for block {} on all simulators", blockNumber);
203+
}
204+
205+
/**
206+
* Send a NodeBehindPublisher response immediately to all active streams on a specific simulated block node.
207+
* This indicates that the block node is behind the publisher and needs to catch up.
208+
*
209+
* @param index the index of the simulated block node (0-based)
210+
* @param blockNumber the last verified block number
211+
*/
212+
public void sendNodeBehindPublisherImmediately(final long index, final long blockNumber) {
213+
if (index >= 0 && index < simulatedBlockNodes.size()) {
214+
final SimulatedBlockNodeServer server = simulatedBlockNodes.get(index);
215+
server.sendNodeBehindPublisherImmediately(blockNumber);
216+
log.info("Sent immediate NodeBehindPublisher response for block {} on simulator {}", blockNumber, index);
217+
} else {
218+
log.error("Invalid simulator index: {}, valid range is 0-{}", index, simulatedBlockNodes.size() - 1);
219+
}
220+
}
221+
192222
/**
193223
* Reset all configured responses on all simulated block nodes to default behavior.
194224
*/

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,17 @@ public void sendResendBlockImmediately(final long blockNumber) {
232232
log.info("Sent immediate ResendBlock response for block {} on port {}", blockNumber, port);
233233
}
234234

235+
/**
236+
* Send a NodeBehindPublisher response immediately to all active streams.
237+
* This indicates that the block node is behind the publisher and needs to catch up.
238+
*
239+
* @param blockNumber the last verified block number to include in the response
240+
*/
241+
public void sendNodeBehindPublisherImmediately(final long blockNumber) {
242+
serviceImpl.sendNodeBehindPublisherToAllStreams(blockNumber);
243+
log.info("Sent immediate NodeBehindPublisher response for block {} on port {}", blockNumber, port);
244+
}
245+
235246
/**
236247
* Gets the last verified block number.
237248
*
@@ -625,6 +636,37 @@ public void sendResendBlockToAllStreams(final long blockNumber) {
625636
}
626637
}
627638

639+
/**
640+
* Sends a NodeBehindPublisher response to all active streams.
641+
* This indicates that the block node is behind the publisher and needs to catch up.
642+
*
643+
* @param blockNumber the last verified block number
644+
*/
645+
public void sendNodeBehindPublisherToAllStreams(final long blockNumber) {
646+
log.info(
647+
"Sending NodeBehindPublisher for block {} to {} active streams on port {}",
648+
blockNumber,
649+
activeStreams.size(),
650+
port);
651+
// Use lock for consistent locking strategy with other methods
652+
blockTrackingLock.readLock().lock(); // Read lock is sufficient for iteration
653+
try {
654+
for (final Pipeline<? super PublishStreamResponse> pipeline : activeStreams) {
655+
try {
656+
sendNodeBehindPublisher(pipeline, blockNumber);
657+
} catch (final Exception e) {
658+
log.error(
659+
"Failed to send NodeBehindPublisher to stream {} on port {}",
660+
pipeline.hashCode(),
661+
port,
662+
e);
663+
}
664+
}
665+
} finally {
666+
blockTrackingLock.readLock().unlock();
667+
}
668+
}
669+
628670
// Helper methods for sending specific responses
629671

630672
/**
@@ -693,6 +735,29 @@ private void sendResendBlock(
693735
log.debug("Sent ResendBlock for block {} to stream {} on port {}", blockNumber, pipeline.hashCode(), port);
694736
}
695737

738+
/**
739+
* Sends a NodeBehindPublisher response to a specific pipeline.
740+
*
741+
* @param pipeline the pipeline to send the response to, must not be null
742+
* @param blockNumber the last verified block number
743+
* @throws NullPointerException if the pipeline is null
744+
*/
745+
private void sendNodeBehindPublisher(
746+
@NonNull final Pipeline<? super PublishStreamResponse> pipeline, final long blockNumber) {
747+
requireNonNull(pipeline, "pipeline cannot be null");
748+
final BehindPublisher behindPublisher =
749+
BehindPublisher.newBuilder().blockNumber(blockNumber).build();
750+
final PublishStreamResponse response = PublishStreamResponse.newBuilder()
751+
.nodeBehindPublisher(behindPublisher)
752+
.build();
753+
pipeline.onNext(response);
754+
log.debug(
755+
"Sent NodeBehindPublisher for block {} to stream {} on port {}",
756+
blockNumber,
757+
pipeline.hashCode(),
758+
port);
759+
}
760+
696761
/**
697762
* Handles sending a BehindPublisher response to a client when the block number is more than 1 ahead of the last verified block.
698763
* This indicates that the client is ahead of the server and should restart streaming from an earlier block.

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeOp.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,15 @@ private boolean submitSimulatorOp() {
9797
nodeIndex,
9898
verifiedBlock);
9999
break;
100+
case SEND_NODE_BEHIND_PUBLISHER_IMMEDIATELY:
101+
controller.sendNodeBehindPublisherImmediately(nodeIndex, blockNumber);
102+
verifiedBlock = controller.getLastVerifiedBlockNumber(nodeIndex);
103+
log.info(
104+
"Sent immediate NodeBehindPublisher response for block {} on simulator {}, last verified block: {}",
105+
blockNumber,
106+
nodeIndex,
107+
verifiedBlock);
108+
break;
100109
case SET_END_OF_STREAM_RESPONSE:
101110
controller.setEndOfStreamResponse(nodeIndex, responseCode, blockNumber);
102111
verifiedBlock = controller.getLastVerifiedBlockNumber(nodeIndex);
@@ -227,6 +236,8 @@ public enum BlockNodeAction {
227236
SEND_SKIP_BLOCK_IMMEDIATELY,
228237
/** Send {@link PublishStreamResponse.ResendBlock} response */
229238
SEND_RESEND_BLOCK_IMMEDIATELY,
239+
/** Send {@link PublishStreamResponse.BehindPublisher} response */
240+
SEND_NODE_BEHIND_PUBLISHER_IMMEDIATELY,
230241
/** Set {@link PublishStreamResponse.EndOfStream} response */
231242
SET_END_OF_STREAM_RESPONSE,
232243
/** Reset all responses to default behavior */
@@ -273,6 +284,18 @@ public static SendResendBlockBuilder sendResendBlockImmediately(final long nodeI
273284
return new SendResendBlockBuilder(nodeIndex, blockNumber);
274285
}
275286

287+
/**
288+
* Creates a builder for sending an immediate {@link PublishStreamResponse.BehindPublisher} response to a block node simulator.
289+
*
290+
* @param nodeIndex the index of the block node simulator (0-based)
291+
* @param blockNumber the last verified block number
292+
* @return a builder for the operation
293+
*/
294+
public static SendNodeBehindPublisherBuilder sendNodeBehindPublisherImmediately(
295+
final long nodeIndex, final long blockNumber) {
296+
return new SendNodeBehindPublisherBuilder(nodeIndex, blockNumber);
297+
}
298+
276299
/**
277300
* Creates a builder for shutting down a specific block node immediately.
278301
*
@@ -481,6 +504,42 @@ protected boolean submitOp(final HapiSpec spec) throws Throwable {
481504
}
482505
}
483506

507+
/**
508+
* Builder for sending an immediate NodeBehindPublisher response to a block node simulator.
509+
* This builder also implements UtilOp so it can be used directly in HapiSpec without calling build().
510+
*/
511+
public static class SendNodeBehindPublisherBuilder extends UtilOp {
512+
private final long nodeIndex;
513+
private final long blockNumber;
514+
515+
private SendNodeBehindPublisherBuilder(final long nodeIndex, final long blockNumber) {
516+
this.nodeIndex = nodeIndex;
517+
this.blockNumber = blockNumber;
518+
}
519+
520+
/**
521+
* Builds the operation.
522+
*
523+
* @return the operation
524+
*/
525+
public BlockNodeOp build() {
526+
return new BlockNodeOp(
527+
nodeIndex,
528+
BlockNodeAction.SEND_NODE_BEHIND_PUBLISHER_IMMEDIATELY,
529+
null,
530+
blockNumber,
531+
null,
532+
null,
533+
true,
534+
true);
535+
}
536+
537+
@Override
538+
protected boolean submitOp(final HapiSpec spec) throws Throwable {
539+
return build().submitOp(spec);
540+
}
541+
}
542+
484543
public static class ShutdownBuilder extends UtilOp {
485544
private final long nodeIndex;
486545
private boolean persistState = true;

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeVerbs.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ public BlockNodeOp sendResendBlockImmediately(long blockNumber) {
7979
.build();
8080
}
8181

82+
/**
83+
* Sends an immediate NodeBehindPublisher response to the block node simulator.
84+
*
85+
* @param blockNumber the last verified block number
86+
* @return the operation
87+
*/
88+
public BlockNodeOp sendNodeBehindPublisherImmediately(long blockNumber) {
89+
return BlockNodeOp.sendNodeBehindPublisherImmediately(nodeIndex, blockNumber)
90+
.build();
91+
}
92+
8293
/**
8394
* Shuts down the block node simulator immediately.
8495
*

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -335,17 +335,14 @@ final Stream<DynamicTest> node0StreamingBlockNodeConnectionDropsCanStreamGenesis
335335
doingContextual(
336336
spec -> LockSupport.parkNanos(Duration.ofSeconds(10).toNanos())),
337337
doingContextual(spec -> time.set(Instant.now())),
338-
blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(Long.MAX_VALUE),
338+
blockNode(0).sendNodeBehindPublisherImmediately(Long.MAX_VALUE),
339339
sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe(
340340
byNodeId(0),
341341
time::get,
342342
Duration.ofSeconds(30),
343343
Duration.ofSeconds(45),
344344
String.format(
345345
"/localhost:%s/ACTIVE] Block node reported it is behind. Will restart stream at block 0.",
346-
portNumbers.getFirst()),
347-
String.format(
348-
"/localhost:%s/ACTIVE] Received EndOfStream response (block=9223372036854775807, responseCode=BEHIND).",
349346
portNumbers.getFirst()))),
350347
doingContextual(
351348
spec -> LockSupport.parkNanos(Duration.ofSeconds(10).toNanos())));
@@ -811,13 +808,13 @@ final Stream<DynamicTest> node0StreamingExponentialBackoff() {
811808
return hapiTest(
812809
waitUntilNextBlocks(1).withBackgroundTraffic(true),
813810
doingContextual(spec -> time.set(Instant.now())),
814-
blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(1L),
811+
blockNode(0).sendNodeBehindPublisherImmediately(1L),
815812
waitUntilNextBlocks(1).withBackgroundTraffic(true),
816-
blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(2L),
813+
blockNode(0).sendNodeBehindPublisherImmediately(2L),
817814
waitUntilNextBlocks(1).withBackgroundTraffic(true),
818-
blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(3L),
815+
blockNode(0).sendNodeBehindPublisherImmediately(3L),
819816
waitUntilNextBlocks(1).withBackgroundTraffic(true),
820-
blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(4L),
817+
blockNode(0).sendNodeBehindPublisherImmediately(4L),
821818
sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe(
822819
byNodeId(0),
823820
time::get,
@@ -902,15 +899,12 @@ final Stream<DynamicTest> testCNReactionToPublishStreamResponses() {
902899
String.format(
903900
"/localhost:%s/ACTIVE] BlockAcknowledgement received for block",
904901
portNumbers.getFirst()))),
905-
blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(Long.MAX_VALUE),
902+
blockNode(0).sendNodeBehindPublisherImmediately(Long.MAX_VALUE),
906903
sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe(
907904
byNodeId(0),
908905
time::get,
909906
Duration.ofSeconds(20),
910907
Duration.ofSeconds(20),
911-
String.format(
912-
"/localhost:%s/ACTIVE] Received EndOfStream response (block=9223372036854775807, responseCode=BEHIND)",
913-
portNumbers.getFirst()),
914908
String.format(
915909
"/localhost:%s/ACTIVE] Block node reported it is behind. Will restart stream at block 0.",
916910
portNumbers.getFirst()))),

0 commit comments

Comments
 (0)