diff --git a/hapi/hapi/build.gradle.kts b/hapi/hapi/build.gradle.kts index 1c2bf9ffec17..2d6ecc3fad9c 100644 --- a/hapi/hapi/build.gradle.kts +++ b/hapi/hapi/build.gradle.kts @@ -20,7 +20,7 @@ tasks.withType().configureEach { // block would not be needed. dependencies { protobuf(platform(project(":hiero-dependency-versions"))) - protobuf("org.hiero.block:block-node-protobuf-sources") + protobuf("org.hiero.block-node:protobuf-sources") } sourceSets { diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md index 1c4236090566..9d0080150bcd 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md @@ -9,6 +9,10 @@ 5. [Lifecycle](#lifecycle) 6. [State Machine Diagrams](#state-machine-diagrams) 7. [Error Handling](#error-handling) + - [Consensus Node Behavior on EndOfStream Response Codes](#consensus-node-behavior-on-endofstream-response-codes) + - [Consensus Node Behavior on BehindPublisher Response](#consensus-node-behavior-on-behindpublisher-response) + - [EndOfStream Rate Limiting](#endofstream-rate-limiting) + - [Pipeline Operation Timeout](#pipeline-operation-timeout) ## Abstract @@ -67,7 +71,7 @@ the current streaming block needs to be updated. If this is the first time the w the following will happen: - If the connection was initialized with an explicit block to start with, then that block will be loaded from the block buffer. - If the connection wasn't initialized with an explicit block, then the most recent block produced is chosen as the block to start streaming. -- If at any point during the lifespan of the connection a `SkipBlock` or `ResendBlock` response is received, then the +- If at any point during the lifespan of the connection a `SkipBlock`, `ResendBlock` or `BehindPublisher` response is received, then the worker will detect this and switch to that block. If the block is not available yet (e.g. no items yet) then the worker will go back to sleep and try again later. If the @@ -94,7 +98,7 @@ Under normal situations, the worker will continue advancing to the next block af associated with the current block. This process will repeat until the connection is terminated for any reason. (A conditional check is performed each time the worker wakes up ensuring the connection is not in a terminal state.) -For cases where an `EndStream` response is received from the block node or some other internal error condition is encountered +For cases where an `EndOfStream` response is received from the block node or some other internal error condition is encountered \- e.g. transient network error - then the connection will transition to a `CLOSING` state. This state signals that the connection has entered a terminal state and is in the process of stopping and being cleaned up. Once final cleanup processes complete, the connection will transition to the final end state: `CLOSED`. Once a connection enters a terminal state, no further @@ -155,6 +159,7 @@ stateDiagram-v2 ACTIVE --> CLOSING : EndOfStream UNKNOWN ACTIVE --> CLOSING : Block not found in buffer ACTIVE --> CLOSING : ResendBlock unavailable + ACTIVE --> CLOSING : BehindPublisher unavailable ACTIVE --> CLOSING : gRPC onError ACTIVE --> CLOSING : Stream failure ACTIVE --> CLOSING : Pipeline operation timeout @@ -162,8 +167,8 @@ stateDiagram-v2 ACTIVE --> ACTIVE : BlockAcknowledgement ACTIVE --> ACTIVE : SkipBlock ACTIVE --> ACTIVE : ResendBlock available + ACTIVE --> ACTIVE : BehindPublisher available ACTIVE --> ACTIVE : Normal streaming - ACTIVE --> CLOSING : EndOfStream BEHIND
restart at next block ACTIVE --> CLOSING : EndOfStream TIMEOUT
restart at next block ACTIVE --> CLOSING : EndOfStream DUPLICATE_BLOCK
restart at next block ACTIVE --> CLOSING : EndOfStream BAD_BLOCK_PROOF
restart at next block @@ -217,24 +222,36 @@ sequenceDiagram ### Consensus Node Behavior on EndOfStream Response Codes -| Code | Connect to Other Node | Retry Behavior | Initial Retry Delay | Exponential Backoff | Restart at Block | Special Behaviour | -|:------------------------------|:----------------------|:--------------------|:--------------------|:--------------------|:-----------------|:----------------------------------------------------------------------------------------------------| -| `SUCCESS` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | -| `BEHIND` with block in buffer | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | -| `BEHIND` w/o block in buffer | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | CN sends `EndStream.TOO_FAR_BEHIND` to indicate the BN to look for the block from other Block Nodes | -| `ERROR` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | -| `PERSISTENCE_FAILED` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | -| `TIMEOUT` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | -| `DUPLICATE_BLOCK` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | -| `BAD_BLOCK_PROOF` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | -| `INVALID_REQUEST` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | -| `UNKNOWN` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | +| Code | Connect to Other Node | Retry Behavior | Initial Retry Delay | Exponential Backoff | Restart at Block | Special Behaviour | +|:---------------------|:----------------------|:--------------------|:--------------------|:--------------------|:-----------------|:------------------| +| `SUCCESS` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | +| `ERROR` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | +| `PERSISTENCE_FAILED` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | +| `TIMEOUT` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | +| `DUPLICATE_BLOCK` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | +| `BAD_BLOCK_PROOF` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | +| `INVALID_REQUEST` | No (retry same) | Exponential backoff | 1 second | Yes (2x, jittered) | blockNumber + 1 | | +| `UNKNOWN` | Yes (immediate) | Fixed delay | 30 seconds | No | Latest | | **Notes:** - **Exponential Backoff**: When enabled, delay starts at 1 second and doubles (2x multiplier) on each retry attempt with jitter applied (delay/2 + random(0, delay/2)) to spread out retry attempts. Max backoff is configurable via `maxBackoffDelay`. - **Connect to Other Node**: When "Yes (immediate)", the manager will immediately attempt to connect to the next available priority node while the failed node is rescheduled for retry. - **Restart at Block**: "Latest" means reconnection starts at the latest produced block; "blockNumber + 1" means reconnection continues from the block following the acknowledged block. +### Consensus Node Behavior on BehindPublisher Response + +The `BehindPublisher` response indicates that the block node is behind the publisher (consensus node) and needs earlier blocks. This is a separate response type from `EndOfStream`, and crucially **does not close the stream** when the consensus node can help. + +| Condition | Connection Behavior | Stream Closed | Resume at Block | Special Behaviour | +|:-------------------------------------|:---------------------|:--------------|:----------------|:----------------------------------------------------------------------------------------------| +| Block available in buffer | Stay connected | No | blockNumber + 1 | Connection remains open; streaming resumes from the earlier block | +| Block not available (too far behind) | Close and reschedule | Yes | Latest | CN sends `EndStream.TOO_FAR_BEHIND` to indicate the BN should catch up from other Block Nodes | +| Block not available (future block) | Close and reschedule | Yes | Latest | CN sends `EndStream.ERROR` - this indicates an unexpected state | + +**Key Behavioral Notes:** +- If the consensus node has the requested block in its buffer, it simply updates the streaming block and continues on the same connection. +- The connection is only closed when the consensus node cannot provide the requested block. + ### EndOfStream Rate Limiting The connection implements a configurable rate limiting mechanism for EndOfStream responses to prevent rapid reconnection cycles and manage system resources effectively. diff --git a/hedera-node/docs/design/app/blocks/BlockNodeStreamingMetrics.md b/hedera-node/docs/design/app/blocks/BlockNodeStreamingMetrics.md index cb9e388ac4e9..0a49351ef226 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeStreamingMetrics.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeStreamingMetrics.md @@ -50,23 +50,24 @@ with "conn" for identification. These metrics relate to responses received from a block node. They are identified using the "connRecv" prefix. -| Metric Name | Type | Description | -|----------------------------------------------------|--------------|-------------------------------------------------------------| -| `blockStream_connRecv_unknown` | Counter | Number of responses received that are of unknown types | -| `blockStream_connRecv_acknowledgement` | Counter | Number of Acknowledgement responses received | -| `blockStream_connRecv_skipBlock` | Counter | Number of SkipBlock responses received | -| `blockStream_connRecv_resendBlock` | Counter | Number of ResendBlock responses received | -| `blockStream_connRecv_latestBlockEndOfStream` | Gauge (long) | The latest block number received in an EndOfStream response | -| `blockStream_connRecv_latestBlockSkipBlock` | Gauge (long) | The latest block number received in a SkipBlock response | -| `blockStream_connRecv_latestBlockResendBlock` | Gauge (long) | The latest block number received in a ResendBlock response | -| `blockStream_connRecv_endStream_success` | Counter | Number of EndStream.Success responses received | -| `blockStream_connRecv_endStream_invalidRequest` | Counter | Number of EndStream.InvalidRequest responses received | -| `blockStream_connRecv_endStream_error` | Counter | Number of EndStream.Error responses received | -| `blockStream_connRecv_endStream_timeout` | Counter | Number of EndStream.Timeout responses received | -| `blockStream_connRecv_endStream_duplicateBlock` | Counter | Number of EndStream.DuplicateBlock responses received | -| `blockStream_connRecv_endStream_badBlockProof` | Counter | Number of EndStream.BadBlockProof responses received | -| `blockStream_connRecv_endStream_behind` | Counter | Number of EndStream.Behind responses received | -| `blockStream_connRecv_endStream_persistenceFailed` | Counter | Number of EndStream.PersistenceFailed responses received | +| Metric Name | Type | Description | +|----------------------------------------------------|--------------|----------------------------------------------------------------| +| `blockStream_connRecv_unknown` | Counter | Number of responses received that are of unknown types | +| `blockStream_connRecv_acknowledgement` | Counter | Number of Acknowledgement responses received | +| `blockStream_connRecv_skipBlock` | Counter | Number of SkipBlock responses received | +| `blockStream_connRecv_resendBlock` | Counter | Number of ResendBlock responses received | +| `blockStream_connRecv_nodeBehindPublisher` | Counter | Number of BehindPublisher responses received | +| `blockStream_connRecv_latestBlockEndOfStream` | Gauge (long) | The latest block number received in an EndOfStream response | +| `blockStream_connRecv_latestBlockSkipBlock` | Gauge (long) | The latest block number received in a SkipBlock response | +| `blockStream_connRecv_latestBlockResendBlock` | Gauge (long) | The latest block number received in a ResendBlock response | +| `blockStream_connRecv_latestBlockBehindPublisher` | Gauge (long) | The latest block number received in a BehindPublisher response | +| `blockStream_connRecv_endStream_success` | Counter | Number of EndStream.Success responses received | +| `blockStream_connRecv_endStream_invalidRequest` | Counter | Number of EndStream.InvalidRequest responses received | +| `blockStream_connRecv_endStream_error` | Counter | Number of EndStream.Error responses received | +| `blockStream_connRecv_endStream_timeout` | Counter | Number of EndStream.Timeout responses received | +| `blockStream_connRecv_endStream_duplicateBlock` | Counter | Number of EndStream.DuplicateBlock responses received | +| `blockStream_connRecv_endStream_badBlockProof` | Counter | Number of EndStream.BadBlockProof responses received | +| `blockStream_connRecv_endStream_persistenceFailed` | Counter | Number of EndStream.PersistenceFailed responses received | ## Connection Send Metrics diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 03674988e1ac..b38bf796159c 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -42,6 +42,7 @@ import org.hiero.block.api.PublishStreamRequest; import org.hiero.block.api.PublishStreamRequest.EndStream; import org.hiero.block.api.PublishStreamResponse; +import org.hiero.block.api.PublishStreamResponse.BehindPublisher; import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement; import org.hiero.block.api.PublishStreamResponse.EndOfStream; import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code; @@ -483,26 +484,6 @@ private void handleEndOfStream(@NonNull final EndOfStream endOfStream) { logger.info("{} Block node orderly ended the stream at block {}.", this, blockNumber); closeAndReschedule(THIRTY_SECONDS, true); } - case Code.BEHIND -> { - // The block node is behind us, check if we have the last verified block still available in order to - // restart the stream from there - final long restartBlockNumber = blockNumber == Long.MAX_VALUE ? 0 : blockNumber + 1; - if (blockBufferService.getBlockState(restartBlockNumber) != null) { - logger.info( - "{} Block node reported it is behind. Will restart stream at block {}.", - this, - restartBlockNumber); - - closeAndRestart(restartBlockNumber); - } else { - // If we don't have the block state, we schedule retry for this connection and establish new one - // with different block node - logger.info("{} Block node is behind and block state is not available. Ending the stream.", this); - - // Indicate that the block node should recover and catch up from another trustworthy block node - endStreamAndReschedule(TOO_FAR_BEHIND); - } - } case Code.UNKNOWN -> { // This should never happen, but if it does, schedule this connection for a retry attempt // and in the meantime select a new node to stream to @@ -570,6 +551,39 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) { } } + /** + * Handles the {@link BehindPublisher} response received from the block node. + * If the consensus node has the requested block state available, it will start streaming it. + * Otherwise, it will close the connection and retry with a different block node. + * + * @param nodeBehind the BehindPublisher response received from the block node + */ + private void handleBlockNodeBehind(@NonNull final BehindPublisher nodeBehind) { + requireNonNull(nodeBehind, "nodeBehind must not be null"); + final long blockNumber = nodeBehind.blockNumber(); + logger.info("{} Received BehindPublisher response for block {}.", this, blockNumber); + + final long blockToStream = blockNumber == Long.MAX_VALUE ? 0 : blockNumber + 1; + // The block node is behind us, check if we have the last verified block still available + // to start streaming from there + if (blockBufferService.getBlockState(blockToStream) != null) { + logger.info("{} Block node reported it is behind. Will start streaming block {}.", this, blockToStream); + + streamingBlockNumber.set(blockToStream); + } else { + // If we don't have the block state, we schedule retry for this connection + // and establish new one with different block node + logger.info("{} Block node is behind and block state is not available. Ending the stream.", this); + + if (blockToStream < blockBufferService.getEarliestAvailableBlockNumber()) { + // Indicate that the block node should catch up from another trustworthy block node + endStreamAndReschedule(TOO_FAR_BEHIND); + } else if (blockToStream > blockBufferService.getLastBlockNumberProduced()) { + endStreamAndReschedule(ERROR); + } + } + } + /** * Send an EndStream request to end the stream and close the connection. * @@ -857,6 +871,11 @@ public void onNext(final @NonNull PublishStreamResponse response) { blockStreamMetrics.recordLatestBlockResendBlock( response.resendBlock().blockNumber()); handleResendBlock(response.resendBlock()); + } else if (response.hasNodeBehindPublisher()) { + blockStreamMetrics.recordResponseReceived(response.response().kind()); + blockStreamMetrics.recordLatestBlockBehindPublisher( + response.nodeBehindPublisher().blockNumber()); + handleBlockNodeBehind(response.nodeBehindPublisher()); } else { blockStreamMetrics.recordUnknownResponseReceived(); logger.debug("{} Unexpected response received: {}.", this, response); diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java index f7a3725ee20c..fda31be94543 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java @@ -58,6 +58,7 @@ public class BlockStreamMetrics { private LongGauge connRecv_latestBlockEndOfStreamGauge; private LongGauge connRecv_latestBlockSkipBlockGauge; private LongGauge connRecv_latestBlockResendBlockGauge; + private LongGauge connRecv_latestBlockBehindPublisherGauge; // connectivity metrics private Counter conn_onCompleteCounter; @@ -496,6 +497,10 @@ private void registerConnectionRecvMetrics() { final LongGauge.Config latestBlockResendCfg = newLongGauge(GROUP_CONN_RECV, "latestBlockResendBlock") .withDescription("The latest block number received in a ResendBlock response"); this.connRecv_latestBlockResendBlockGauge = metrics.getOrCreate(latestBlockResendCfg); + + final LongGauge.Config latestBlockBehindCfg = newLongGauge(GROUP_CONN_RECV, "latestBlockBehindPublisher") + .withDescription("The latest block number received in a BehindPublisher response"); + this.connRecv_latestBlockBehindPublisherGauge = metrics.getOrCreate(latestBlockBehindCfg); } /** @@ -559,6 +564,14 @@ public void recordLatestBlockResendBlock(final long blockNumber) { connRecv_latestBlockResendBlockGauge.set(blockNumber); } + /** + * Record the latest block number received in a BehindPublisher response. + * @param blockNumber the block number from the response + */ + public void recordLatestBlockBehindPublisher(final long blockNumber) { + connRecv_latestBlockBehindPublisherGauge.set(blockNumber); + } + // Connection SEND metrics ----------------------------------------------------------------------------------------- private void registerConnectionSendMetrics() { diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeCommunicationTestBase.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeCommunicationTestBase.java index 260f9e572309..db3baf2bcea8 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeCommunicationTestBase.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeCommunicationTestBase.java @@ -30,6 +30,7 @@ import org.hiero.block.api.PublishStreamRequest; import org.hiero.block.api.PublishStreamRequest.EndStream; import org.hiero.block.api.PublishStreamResponse; +import org.hiero.block.api.PublishStreamResponse.BehindPublisher; import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement; import org.hiero.block.api.PublishStreamResponse.EndOfStream; import org.hiero.block.api.PublishStreamResponse.ResendBlock; @@ -66,6 +67,15 @@ protected static PublishStreamResponse createEndOfStreamResponse( return PublishStreamResponse.newBuilder().endStream(eos).build(); } + @NonNull + protected static PublishStreamResponse createBlockNodeBehindResponse(final long lastVerifiedBlock) { + final BehindPublisher nodeBehind = + BehindPublisher.newBuilder().blockNumber(lastVerifiedBlock).build(); + return PublishStreamResponse.newBuilder() + .nodeBehindPublisher(nodeBehind) + .build(); + } + @NonNull protected static PublishStreamResponse createBlockAckResponse(final long blockNumber) { final BlockAcknowledgement blockAck = diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 63822e01fcf5..d602d9f873db 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -629,41 +629,68 @@ void testOnNext_endOfStream_blockNodeGracefulShutdown() { } @Test - void testOnNext_endOfStream_blockNodeBehind_blockExists() { + void testOnNext_blockNodeBehind_blockExists() { openConnectionAndResetMocks(); - final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, 10L); - when(bufferService.getHighestAckedBlockNumber()).thenReturn(10L); + final PublishStreamResponse response = createBlockNodeBehindResponse(10L); when(bufferService.getBlockState(11L)).thenReturn(new BlockState(11L)); connection.updateConnectionState(ConnectionState.ACTIVE); connection.onNext(response); - verify(metrics).recordLatestBlockEndOfStream(10L); - verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND); + verify(metrics).recordLatestBlockBehindPublisher(10L); + verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER); + verify(bufferService).getBlockState(11L); + verifyNoMoreInteractions(metrics); + verifyNoMoreInteractions(requestPipeline); + } + + @Test + void testOnNext_blockNodeBehind_blockDoesNotExist_TooFarBehind() { + openConnectionAndResetMocks(); + final PublishStreamResponse response = createBlockNodeBehindResponse(10L); + when(bufferService.getBlockState(11L)).thenReturn(null); + when(bufferService.getEarliestAvailableBlockNumber()).thenReturn(12L); + + connection.updateConnectionState(ConnectionState.ACTIVE); + connection.onNext(response); + + verify(metrics).recordLatestBlockBehindPublisher(10L); + verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER); verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); - verify(requestPipeline).onComplete(); - verify(connectionManager).rescheduleConnection(connection, null, 11L, false); + verify(metrics).recordRequestEndStreamSent(EndStream.Code.TOO_FAR_BEHIND); + verify(metrics).recordRequestLatency(anyLong()); + verify(bufferService, atLeastOnce()).getEarliestAvailableBlockNumber(); + verify(bufferService, atLeastOnce()).getHighestAckedBlockNumber(); verify(bufferService).getBlockState(11L); + verify(requestPipeline) + .onNext(PublishStreamRequest.newBuilder() + .endStream(EndStream.newBuilder() + .endCode(EndStream.Code.TOO_FAR_BEHIND) + .earliestBlockNumber(12L) + .build()) + .build()); + verify(requestPipeline).onComplete(); + verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); } @Test - void testOnNext_endOfStream_blockNodeBehind_blockDoesNotExist() { + void testOnNext_endOfStream_blockNodeBehind_blockDoesNotExist_Error() { openConnectionAndResetMocks(); - final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, 10L); + final PublishStreamResponse response = createBlockNodeBehindResponse(10L); when(bufferService.getHighestAckedBlockNumber()).thenReturn(10L); when(bufferService.getBlockState(11L)).thenReturn(null); connection.updateConnectionState(ConnectionState.ACTIVE); connection.onNext(response); - verify(metrics).recordLatestBlockEndOfStream(10L); - verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND); + verify(metrics).recordLatestBlockBehindPublisher(10L); + verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER); verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); - verify(metrics).recordRequestEndStreamSent(EndStream.Code.TOO_FAR_BEHIND); + verify(metrics).recordRequestEndStreamSent(EndStream.Code.ERROR); verify(metrics).recordRequestLatency(anyLong()); verify(bufferService, atLeastOnce()).getEarliestAvailableBlockNumber(); verify(bufferService, atLeastOnce()).getHighestAckedBlockNumber(); @@ -671,7 +698,7 @@ void testOnNext_endOfStream_blockNodeBehind_blockDoesNotExist() { verify(requestPipeline) .onNext(PublishStreamRequest.newBuilder() .endStream(EndStream.newBuilder() - .endCode(EndStream.Code.TOO_FAR_BEHIND) + .endCode(EndStream.Code.ERROR) .latestBlockNumber(10L) .build()) .build()); @@ -1615,26 +1642,23 @@ void testOnNext_endOfStream_clientFailures_maxValueBlockNumber(final EndOfStream assertThat(connection.currentState()).isEqualTo(ConnectionState.CLOSED); } - // Tests EndOfStream BEHIND code with Long.MAX_VALUE edge case (should restart at block 0) + // Tests BehindPublisher code with Long.MAX_VALUE edge case (should restart at block 0) @Test - void testOnNext_endOfStream_blockNodeBehind_maxValueBlockNumber() { + void testOnNext_blockNodeBehind_maxValueBlockNumber() { openConnectionAndResetMocks(); - final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, Long.MAX_VALUE); + final PublishStreamResponse response = createBlockNodeBehindResponse(Long.MAX_VALUE); when(bufferService.getBlockState(0L)).thenReturn(new BlockState(0L)); connection.updateConnectionState(ConnectionState.ACTIVE); connection.onNext(response); - verify(metrics).recordLatestBlockEndOfStream(Long.MAX_VALUE); - verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND); - verify(requestPipeline).onComplete(); - verify(connectionManager) - .rescheduleConnection(connection, null, 0L, false); // Should restart at 0 for MAX_VALUE + verify(metrics).recordLatestBlockBehindPublisher(Long.MAX_VALUE); + verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER); verify(bufferService).getBlockState(0L); - verify(metrics).recordConnectionClosed(); - verify(metrics).recordActiveConnectionIp(-1L); verifyNoMoreInteractions(metrics); verifyNoMoreInteractions(requestPipeline); + verifyNoMoreInteractions(connectionManager); + verifyNoMoreInteractions(bufferService); } // Tests stream failure handling without calling onComplete on the pipeline diff --git a/hedera-node/infrastructure/grafana/dashboards/production/hedera-node/block-stream.json b/hedera-node/infrastructure/grafana/dashboards/production/hedera-node/block-stream.json index a79f8a7c0035..e4defa0294c5 100644 --- a/hedera-node/infrastructure/grafana/dashboards/production/hedera-node/block-stream.json +++ b/hedera-node/infrastructure/grafana/dashboards/production/hedera-node/block-stream.json @@ -6738,10 +6738,10 @@ "gridPos": { "h": 8, "w": 12, - "x": 12, - "y": 158 + "x": 0, + "y": 166 }, - "id": 37, + "id": 38, "options": { "legend": { "calcs": [ @@ -6766,7 +6766,7 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "max by(node) (delta(blockStream_connRecv_endStream_behind_total{node=~\"$NodeId\", environment=\"$environment\"}[1m]))", + "expr": "max by(node) (delta(blockStream_connRecv_endStream_persistenceFailed_total{node=~\"$NodeId\", environment=\"$environment\"}[1m]))", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "node {{node}}", @@ -6775,7 +6775,7 @@ "useBackend": false } ], - "title": "EndStream:Behind Responses Received (1m delta)", + "title": "EndStream:PersistenceFailed Responses Received (1m delta)", "type": "timeseries" }, { @@ -6786,53 +6786,29 @@ "fieldConfig": { "defaults": { "color": { - "mode": "palette-classic" + "fixedColor": "blue", + "mode": "fixed" }, "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" + "align": "auto", + "cellOptions": { + "type": "auto" }, - "showPoints": "never", - "showValues": false, - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" + "footer": { + "reducers": [] }, - "thresholdsStyle": { - "mode": "off" - } + "inspect": false, + "wrapText": false }, + "fieldMinMax": false, "mappings": [], + "noValue": "0", "thresholds": { - "mode": "absolute", + "mode": "percentage", "steps": [ { "color": "green", "value": 0 - }, - { - "color": "red", - "value": 80 } ] } @@ -6841,25 +6817,15 @@ }, "gridPos": { "h": 8, - "w": 12, - "x": 0, + "w": 6, + "x": 12, "y": 166 }, - "id": 38, + "id": 61, "options": { - "legend": { - "calcs": [ - "last" - ], - "displayMode": "table", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "hideZeros": false, - "mode": "multi", - "sort": "none" - } + "cellHeight": "sm", + "frameIndex": 0, + "showHeader": true }, "pluginVersion": "12.4.0-19576184216", "targets": [ @@ -6870,17 +6836,61 @@ }, "disableTextWrap": false, "editorMode": "builder", - "expr": "max by(node) (delta(blockStream_connRecv_endStream_persistenceFailed_total{node=~\"$NodeId\", environment=\"$environment\"}[1m]))", + "exemplar": false, + "expr": "max by(node) (blockStream_connRecv_latestBlockResendBlock{node=~\"$NodeId\", environment=\"$environment\"})", "fullMetaSearch": false, "includeNullMetadata": true, - "legendFormat": "node {{node}}", + "instant": false, + "legendFormat": "{{node}}", "range": true, "refId": "A", "useBackend": false } ], - "title": "EndStream:PersistenceFailed Responses Received (1m delta)", - "type": "timeseries" + "title": "Latest Block received in ResendBlock", + "transformations": [ + { + "id": "joinByLabels", + "options": { + "value": "node" + } + }, + { + "id": "reduce", + "options": { + "includeTimeField": false, + "labelsToFields": false, + "mode": "seriesToRows", + "reducers": [ + "lastNotNull" + ] + } + }, + { + "id": "organize", + "options": { + "excludeByName": {}, + "includeByName": {}, + "indexByName": {}, + "renameByName": { + "Field": "Node", + "Last *": "Block" + } + } + }, + { + "id": "sortBy", + "options": { + "fields": {}, + "sort": [ + { + "field": "Node" + } + ] + } + } + ], + "type": "table" }, { "datasource": { @@ -6922,10 +6932,10 @@ "gridPos": { "h": 8, "w": 6, - "x": 12, + "x": 18, "y": 166 }, - "id": 61, + "id": 60, "options": { "cellHeight": "sm", "frameIndex": 0, @@ -6941,7 +6951,7 @@ "disableTextWrap": false, "editorMode": "builder", "exemplar": false, - "expr": "max by(node) (blockStream_connRecv_latestBlockResendBlock{node=~\"$NodeId\", environment=\"$environment\"})", + "expr": "max by(node) (blockStream_connRecv_latestBlockSkipBlock{node=~\"$NodeId\", environment=\"$environment\"})", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -6951,7 +6961,7 @@ "useBackend": false } ], - "title": "Latest Block received in ResendBlock", + "title": "Latest Block received in SkipBlock", "transformations": [ { "id": "joinByLabels", @@ -7036,10 +7046,10 @@ "gridPos": { "h": 8, "w": 6, - "x": 18, - "y": 166 + "x": 0, + "y": 174 }, - "id": 60, + "id": 59, "options": { "cellHeight": "sm", "frameIndex": 0, @@ -7055,7 +7065,7 @@ "disableTextWrap": false, "editorMode": "builder", "exemplar": false, - "expr": "max by(node) (blockStream_connRecv_latestBlockSkipBlock{node=~\"$NodeId\", environment=\"$environment\"})", + "expr": "max by(node) (blockStream_connRecv_latestBlockEndOfStream{node=~\"$NodeId\", environment=\"$environment\"})", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -7065,7 +7075,7 @@ "useBackend": false } ], - "title": "Latest Block received in SkipBlock", + "title": "Latest Block received in EndOfStream", "transformations": [ { "id": "joinByLabels", @@ -7148,18 +7158,18 @@ "overrides": [] }, "gridPos": { - "h": 8, + "h": 9, "w": 6, - "x": 0, + "x": 12, "y": 174 }, - "id": 59, + "id": 61, "options": { "cellHeight": "sm", "frameIndex": 0, "showHeader": true }, - "pluginVersion": "12.4.0-19576184216", + "pluginVersion": "12.2.0", "targets": [ { "datasource": { @@ -7169,7 +7179,7 @@ "disableTextWrap": false, "editorMode": "builder", "exemplar": false, - "expr": "max by(node) (blockStream_connRecv_latestBlockEndOfStream{node=~\"$NodeId\", environment=\"$environment\"})", + "expr": "max by(node) (blockStream_connRecv_latestBlockBehindPublisher{node=~\"$NodeId\", environment=\"$environment\"})", "fullMetaSearch": false, "includeNullMetadata": true, "instant": false, @@ -7179,7 +7189,7 @@ "useBackend": false } ], - "title": "Latest Block received in EndOfStream", + "title": "Latest Block received in NodeBehindPublisher", "transformations": [ { "id": "joinByLabels", diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/containers/BlockNodeContainer.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/containers/BlockNodeContainer.java index eadbf32b19da..a8a1da41e183 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/containers/BlockNodeContainer.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/containers/BlockNodeContainer.java @@ -12,7 +12,7 @@ * A test container for running a block node server instance. */ public class BlockNodeContainer extends GenericContainer { - private static final String BLOCK_NODE_VERSION = "0.23.1"; + private static final String BLOCK_NODE_VERSION = "0.25.0"; private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("ghcr.io/hiero-ledger/hiero-block-node:" + BLOCK_NODE_VERSION); private static final int GRPC_PORT = 40840; diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/BlockNodeController.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/BlockNodeController.java index 67ea7d463a2b..14e6cee0c0a4 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/BlockNodeController.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/BlockNodeController.java @@ -189,6 +189,36 @@ public void sendResendBlockImmediately(final long index, final long blockNumber) } } + /** + * Send a NodeBehindPublisher response immediately to all active streams on all simulated block nodes. + * This indicates that the block node is behind the publisher and needs to catch up. + * + * @param blockNumber the last verified block number + */ + public void sendNodeBehindPublisherImmediately(final long blockNumber) { + for (final SimulatedBlockNodeServer server : simulatedBlockNodes.values()) { + server.sendNodeBehindPublisherImmediately(blockNumber); + } + log.info("Sent immediate NodeBehindPublisher response for block {} on all simulators", blockNumber); + } + + /** + * Send a NodeBehindPublisher response immediately to all active streams on a specific simulated block node. + * This indicates that the block node is behind the publisher and needs to catch up. + * + * @param index the index of the simulated block node (0-based) + * @param blockNumber the last verified block number + */ + public void sendNodeBehindPublisherImmediately(final long index, final long blockNumber) { + if (index >= 0 && index < simulatedBlockNodes.size()) { + final SimulatedBlockNodeServer server = simulatedBlockNodes.get(index); + server.sendNodeBehindPublisherImmediately(blockNumber); + log.info("Sent immediate NodeBehindPublisher response for block {} on simulator {}", blockNumber, index); + } else { + log.error("Invalid simulator index: {}, valid range is 0-{}", index, simulatedBlockNodes.size() - 1); + } + } + /** * Reset all configured responses on all simulated block nodes to default behavior. */ diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java index cb2f2fa50b6b..c7d68cfb06b8 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/hedera/simulator/SimulatedBlockNodeServer.java @@ -33,6 +33,7 @@ import org.hiero.block.api.BlockStreamPublishServiceInterface; import org.hiero.block.api.PublishStreamRequest; import org.hiero.block.api.PublishStreamResponse; +import org.hiero.block.api.PublishStreamResponse.BehindPublisher; import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement; import org.hiero.block.api.PublishStreamResponse.EndOfStream; import org.hiero.block.api.PublishStreamResponse.ResendBlock; @@ -231,6 +232,17 @@ public void sendResendBlockImmediately(final long blockNumber) { log.info("Sent immediate ResendBlock response for block {} on port {}", blockNumber, port); } + /** + * Send a NodeBehindPublisher response immediately to all active streams. + * This indicates that the block node is behind the publisher and needs to catch up. + * + * @param blockNumber the last verified block number to include in the response + */ + public void sendNodeBehindPublisherImmediately(final long blockNumber) { + serviceImpl.sendNodeBehindPublisherToAllStreams(blockNumber); + log.info("Sent immediate NodeBehindPublisher response for block {} on port {}", blockNumber, port); + } + /** * Gets the last verified block number. * @@ -370,7 +382,7 @@ public void onNext(final PublishStreamRequest request) { final long lastVerifiedBlockNum = lastVerifiedBlockNumber.get(); if (blockNumber - lastVerifiedBlockNum > 1) { - handleBehindResponse(replies, blockNumber, lastVerifiedBlockNum); + handleBehindPublisherResponse(replies, blockNumber, lastVerifiedBlockNum); return; } @@ -624,6 +636,37 @@ public void sendResendBlockToAllStreams(final long blockNumber) { } } + /** + * Sends a NodeBehindPublisher response to all active streams. + * This indicates that the block node is behind the publisher and needs to catch up. + * + * @param blockNumber the last verified block number + */ + public void sendNodeBehindPublisherToAllStreams(final long blockNumber) { + log.info( + "Sending NodeBehindPublisher for block {} to {} active streams on port {}", + blockNumber, + activeStreams.size(), + port); + // Use lock for consistent locking strategy with other methods + blockTrackingLock.readLock().lock(); // Read lock is sufficient for iteration + try { + for (final Pipeline pipeline : activeStreams) { + try { + sendNodeBehindPublisher(pipeline, blockNumber); + } catch (final Exception e) { + log.error( + "Failed to send NodeBehindPublisher to stream {} on port {}", + pipeline.hashCode(), + port, + e); + } + } + } finally { + blockTrackingLock.readLock().unlock(); + } + } + // Helper methods for sending specific responses /** @@ -661,7 +704,7 @@ private void sendEndOfStream( * * @param pipeline the pipeline to send the response to, must not be null * @param blockNumber the block number to skip - * @throws NullPointerException if pipeline is null + * @throws NullPointerException if the pipeline is null */ private void sendSkipBlock( @NonNull final Pipeline pipeline, final long blockNumber) { @@ -679,7 +722,7 @@ private void sendSkipBlock( * * @param pipeline the pipeline to send the response to, must not be null * @param blockNumber the block number to resend - * @throws NullPointerException if pipeline is null + * @throws NullPointerException if the pipeline is null */ private void sendResendBlock( @NonNull final Pipeline pipeline, final long blockNumber) { @@ -693,38 +736,61 @@ private void sendResendBlock( } /** - * Handles sending a BEHIND response to a client when the block number is more than 1 ahead of the last verified block. + * Sends a NodeBehindPublisher response to a specific pipeline. + * + * @param pipeline the pipeline to send the response to, must not be null + * @param blockNumber the last verified block number + * @throws NullPointerException if the pipeline is null + */ + private void sendNodeBehindPublisher( + @NonNull final Pipeline pipeline, final long blockNumber) { + requireNonNull(pipeline, "pipeline cannot be null"); + final BehindPublisher behindPublisher = + BehindPublisher.newBuilder().blockNumber(blockNumber).build(); + final PublishStreamResponse response = PublishStreamResponse.newBuilder() + .nodeBehindPublisher(behindPublisher) + .build(); + pipeline.onNext(response); + log.debug( + "Sent NodeBehindPublisher for block {} to stream {} on port {}", + blockNumber, + pipeline.hashCode(), + port); + } + + /** + * Handles sending a BehindPublisher response to a client when the block number is more than 1 ahead of the last verified block. * This indicates that the client is ahead of the server and should restart streaming from an earlier block. * * @param pipeline The pipeline to send the response to, must not be null * @param blockNumber The block number that was requested * @param lastVerifiedBlockNum The last verified block number - * @throws NullPointerException if pipeline is null + * @throws NullPointerException if the pipeline is null */ - private void handleBehindResponse( + private void handleBehindPublisherResponse( @NonNull final Pipeline pipeline, final long blockNumber, final long lastVerifiedBlockNum) { requireNonNull(pipeline, "pipeline cannot be null"); - final EndOfStream eos = EndOfStream.newBuilder() + final BehindPublisher behindPublisher = BehindPublisher.newBuilder() .blockNumber(lastVerifiedBlockNum) - .status(EndOfStream.Code.BEHIND) .build(); - final PublishStreamResponse response = - PublishStreamResponse.newBuilder().endStream(eos).build(); + final PublishStreamResponse response = PublishStreamResponse.newBuilder() + .nodeBehindPublisher(behindPublisher) + .build(); try { pipeline.onNext(response); log.debug( - "Sent EndOfStream BEHIND for block {} to stream {} on port {}. Last verified: {}", + "Sent BehindPublisher for block {} to stream {} on port {}. Last verified: {}", blockNumber, pipeline.hashCode(), port, lastVerifiedBlockNum); } catch (final Exception e) { log.error( - "Failed to send EndOfStream BEHIND for block {} to stream {} on port {}. Removing stream.", + "Failed to send BehindPublisher for block {} to stream {} on port {}. Removing stream.", blockNumber, pipeline.hashCode(), port, @@ -739,7 +805,7 @@ private void handleBehindResponse( * Acquires the necessary write lock to ensure thread safety. * * @param pipeline The pipeline to remove. - * @throws NullPointerException if pipeline is null + * @throws NullPointerException if the pipeline is null */ private void removeStreamFromTracking(@NonNull final Pipeline pipeline) { requireNonNull(pipeline, "pipeline cannot be null"); @@ -756,7 +822,7 @@ private void removeStreamFromTracking(@NonNull final Pipeline pipeline) { requireNonNull(pipeline, "pipeline cannot be null"); @@ -894,7 +960,7 @@ private void handleStreamError(@NonNull final Pipeline pipeline) { diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/HgcaaLogValidator.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/HgcaaLogValidator.java index 08210b2c2c44..51546fc25989 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/HgcaaLogValidator.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/HgcaaLogValidator.java @@ -98,7 +98,7 @@ private static class ProblemTracker { List.of("HintsSubmissions", "Failed to submit", "(PLATFORM_NOT_ACTIVE)"), List.of("Ignoring invalid partial signature"), List.of("Action stack prematurely empty"), - List.of("Block node", "reported it is behind. Will restart stream at block"), + List.of("Block node", "reported it is behind. Will start streaming block"), List.of("BlockNodeConnectionManager", "Block stream worker interrupted"), List.of("BlockNodeConnectionManager", "No active connections available for streaming"), List.of("No block nodes available to connect to"), diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeOp.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeOp.java index e25b6bc88c0f..e510146d3b32 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeOp.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeOp.java @@ -97,6 +97,15 @@ private boolean submitSimulatorOp() { nodeIndex, verifiedBlock); break; + case SEND_NODE_BEHIND_PUBLISHER_IMMEDIATELY: + controller.sendNodeBehindPublisherImmediately(nodeIndex, blockNumber); + verifiedBlock = controller.getLastVerifiedBlockNumber(nodeIndex); + log.info( + "Sent immediate NodeBehindPublisher response for block {} on simulator {}, last verified block: {}", + blockNumber, + nodeIndex, + verifiedBlock); + break; case SET_END_OF_STREAM_RESPONSE: controller.setEndOfStreamResponse(nodeIndex, responseCode, blockNumber); verifiedBlock = controller.getLastVerifiedBlockNumber(nodeIndex); @@ -227,6 +236,8 @@ public enum BlockNodeAction { SEND_SKIP_BLOCK_IMMEDIATELY, /** Send {@link PublishStreamResponse.ResendBlock} response */ SEND_RESEND_BLOCK_IMMEDIATELY, + /** Send {@link PublishStreamResponse.BehindPublisher} response */ + SEND_NODE_BEHIND_PUBLISHER_IMMEDIATELY, /** Set {@link PublishStreamResponse.EndOfStream} response */ SET_END_OF_STREAM_RESPONSE, /** Reset all responses to default behavior */ @@ -273,6 +284,18 @@ public static SendResendBlockBuilder sendResendBlockImmediately(final long nodeI return new SendResendBlockBuilder(nodeIndex, blockNumber); } + /** + * Creates a builder for sending an immediate {@link PublishStreamResponse.BehindPublisher} response to a block node simulator. + * + * @param nodeIndex the index of the block node simulator (0-based) + * @param blockNumber the last verified block number + * @return a builder for the operation + */ + public static SendNodeBehindPublisherBuilder sendNodeBehindPublisherImmediately( + final long nodeIndex, final long blockNumber) { + return new SendNodeBehindPublisherBuilder(nodeIndex, blockNumber); + } + /** * Creates a builder for shutting down a specific block node immediately. * @@ -481,6 +504,42 @@ protected boolean submitOp(final HapiSpec spec) throws Throwable { } } + /** + * Builder for sending an immediate NodeBehindPublisher response to a block node simulator. + * This builder also implements UtilOp so it can be used directly in HapiSpec without calling build(). + */ + public static class SendNodeBehindPublisherBuilder extends UtilOp { + private final long nodeIndex; + private final long blockNumber; + + private SendNodeBehindPublisherBuilder(final long nodeIndex, final long blockNumber) { + this.nodeIndex = nodeIndex; + this.blockNumber = blockNumber; + } + + /** + * Builds the operation. + * + * @return the operation + */ + public BlockNodeOp build() { + return new BlockNodeOp( + nodeIndex, + BlockNodeAction.SEND_NODE_BEHIND_PUBLISHER_IMMEDIATELY, + null, + blockNumber, + null, + null, + true, + true); + } + + @Override + protected boolean submitOp(final HapiSpec spec) throws Throwable { + return build().submitOp(spec); + } + } + public static class ShutdownBuilder extends UtilOp { private final long nodeIndex; private boolean persistState = true; diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeVerbs.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeVerbs.java index d112946568f8..974f2b8a2f29 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeVerbs.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/BlockNodeVerbs.java @@ -79,6 +79,17 @@ public BlockNodeOp sendResendBlockImmediately(long blockNumber) { .build(); } + /** + * Sends an immediate NodeBehindPublisher response to the block node simulator. + * + * @param blockNumber the last verified block number + * @return the operation + */ + public BlockNodeOp sendNodeBehindPublisherImmediately(long blockNumber) { + return BlockNodeOp.sendNodeBehindPublisherImmediately(nodeIndex, blockNumber) + .build(); + } + /** * Shuts down the block node simulator immediately. * diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index 117505b2771e..cd7cf70a6e6e 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -337,17 +337,17 @@ final Stream node0StreamingBlockNodeConnectionDropsCanStreamGenesis doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(10).toNanos())), doingContextual(spec -> time.set(Instant.now())), - blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(Long.MAX_VALUE), + blockNode(0).sendNodeBehindPublisherImmediately(Long.MAX_VALUE), sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0), time::get, Duration.ofSeconds(30), Duration.ofSeconds(45), String.format( - "/localhost:%s/ACTIVE] Block node reported it is behind. Will restart stream at block 0.", + "/localhost:%s/ACTIVE] Received BehindPublisher response for block 9223372036854775807.", portNumbers.getFirst()), String.format( - "/localhost:%s/ACTIVE] Received EndOfStream response (block=9223372036854775807, responseCode=BEHIND).", + "/localhost:%s/ACTIVE] Block node reported it is behind. Will start streaming block 0.", portNumbers.getFirst()))), doingContextual( spec -> LockSupport.parkNanos(Duration.ofSeconds(10).toNanos()))); @@ -769,10 +769,11 @@ final Stream node0StreamingMultipleEndOfStreamsReceived() { portNumbers.add(spec.getBlockNodePortById(0)); portNumbers.add(spec.getBlockNodePortById(1)); }), - waitUntilNextBlocks(5).withBackgroundTraffic(true), + waitUntilNextBlocks(1).withBackgroundTraffic(true), doingContextual(spec -> time.set(Instant.now())), - blockNode(0).sendEndOfStreamImmediately(Code.TIMEOUT).withBlockNumber(9L), - blockNode(0).sendEndOfStreamImmediately(Code.TIMEOUT).withBlockNumber(10L), + blockNode(0).sendEndOfStreamImmediately(Code.TIMEOUT).withBlockNumber(1L), + waitUntilNextBlocks(1).withBackgroundTraffic(true), + blockNode(0).sendEndOfStreamImmediately(Code.TIMEOUT).withBlockNumber(2L), sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0), time::get, @@ -813,13 +814,13 @@ final Stream node0StreamingExponentialBackoff() { return hapiTest( waitUntilNextBlocks(1).withBackgroundTraffic(true), doingContextual(spec -> time.set(Instant.now())), - blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(1L), + blockNode(0).sendEndOfStreamImmediately(Code.TIMEOUT).withBlockNumber(1L), waitUntilNextBlocks(1).withBackgroundTraffic(true), - blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(2L), + blockNode(0).sendEndOfStreamImmediately(Code.DUPLICATE_BLOCK).withBlockNumber(2L), waitUntilNextBlocks(1).withBackgroundTraffic(true), - blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(3L), + blockNode(0).sendEndOfStreamImmediately(Code.BAD_BLOCK_PROOF).withBlockNumber(3L), waitUntilNextBlocks(1).withBackgroundTraffic(true), - blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(4L), + blockNode(0).sendEndOfStreamImmediately(Code.INVALID_REQUEST).withBlockNumber(4L), sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0), time::get, @@ -904,17 +905,17 @@ final Stream testCNReactionToPublishStreamResponses() { String.format( "/localhost:%s/ACTIVE] BlockAcknowledgement received for block", portNumbers.getFirst()))), - blockNode(0).sendEndOfStreamImmediately(Code.BEHIND).withBlockNumber(Long.MAX_VALUE), + blockNode(0).sendNodeBehindPublisherImmediately(Long.MAX_VALUE), sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0), time::get, Duration.ofSeconds(20), Duration.ofSeconds(20), String.format( - "/localhost:%s/ACTIVE] Received EndOfStream response (block=9223372036854775807, responseCode=BEHIND)", + "/localhost:%s/ACTIVE] Received BehindPublisher response for block 9223372036854775807.", portNumbers.getFirst()), String.format( - "/localhost:%s/ACTIVE] Block node reported it is behind. Will restart stream at block 0.", + "/localhost:%s/ACTIVE] Block node reported it is behind. Will start streaming block 0.", portNumbers.getFirst()))), waitUntilNextBlocks(1).withBackgroundTraffic(true), blockNode(0).sendSkipBlockImmediately(Long.MAX_VALUE), diff --git a/hiero-dependency-versions/build.gradle.kts b/hiero-dependency-versions/build.gradle.kts index 6062fda3324b..99e009f023c6 100644 --- a/hiero-dependency-versions/build.gradle.kts +++ b/hiero-dependency-versions/build.gradle.kts @@ -22,7 +22,7 @@ val log4j = "2.25.0" val mockito = "5.18.0" val pbj = pluginVersions.version("com.hedera.pbj.pbj-compiler") val protobuf = "4.31.1" -val blockNodeProtobufSources = "0.21.2" +val blockNodeProtobufSources = "0.25.0" val testContainers = "2.0.2" val tuweni = "2.4.2" val webcompare = "2.1.8" @@ -137,12 +137,12 @@ dependencies.constraints { // Versions of additional tools that are not part of the product or test module paths api("com.google.protobuf:protoc:${protobuf}") api("io.grpc:protoc-gen-grpc-java:${grpc}") - api("org.hiero.block:block-node-protobuf-sources:$blockNodeProtobufSources") { + api("org.hiero.block-node:protobuf-sources:$blockNodeProtobufSources") { because("External block node protobuf sources") } tasks.checkVersionConsistency { excludes.add("com.google.protobuf:protoc") excludes.add("io.grpc:protoc-gen-grpc-java") - excludes.add("org.hiero.block:block-node-protobuf-sources") + excludes.add("org.hiero.block-node:protobuf-sources") } }