Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.hiero.block.api.PublishStreamRequest;
import org.hiero.block.api.PublishStreamRequest.EndStream;
import org.hiero.block.api.PublishStreamResponse;
import org.hiero.block.api.PublishStreamResponse.BehindPublisher;
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code;
Expand Down Expand Up @@ -483,26 +484,6 @@ private void handleEndOfStream(@NonNull final EndOfStream endOfStream) {
logger.info("{} Block node orderly ended the stream at block {}.", this, blockNumber);
closeAndReschedule(THIRTY_SECONDS, true);
}
case Code.BEHIND -> {
// The block node is behind us, check if we have the last verified block still available in order to
// restart the stream from there
final long restartBlockNumber = blockNumber == Long.MAX_VALUE ? 0 : blockNumber + 1;
if (blockBufferService.getBlockState(restartBlockNumber) != null) {
logger.info(
"{} Block node reported it is behind. Will restart stream at block {}.",
this,
restartBlockNumber);

closeAndRestart(restartBlockNumber);
} else {
// If we don't have the block state, we schedule retry for this connection and establish new one
// with different block node
logger.info("{} Block node is behind and block state is not available. Ending the stream.", this);

// Indicate that the block node should recover and catch up from another trustworthy block node
endStreamAndReschedule(TOO_FAR_BEHIND);
}
}
case Code.UNKNOWN -> {
// This should never happen, but if it does, schedule this connection for a retry attempt
// and in the meantime select a new node to stream to
Expand Down Expand Up @@ -570,6 +551,40 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) {
}
}

/**
* Handles the {@link BehindPublisher} response received from the block node.
* If the consensus node has the requested block state available, it will start streaming it.
* Otherwise, it will close the connection and retry with a different block node.
*
* @param nodeBehind the BehindPublisher response received from the block node
*/
private void handleBlockNodeBehind(@NonNull final BehindPublisher nodeBehind) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BehindPublisher is not an EndStream condition and thus the connection should not be closed and started again with the new block. Additionally, there needs to be a condition to respond with an ERROR if the block requested is above the latest block produced by the CN. Basically the handling of BehindPublisher should be the exact same as handling a ResendBlock message - just logging/metrics changed.

Copy link
Contributor Author

@petreze petreze Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation is inspired from these changes in the protocol document in BN repo.
It says that the block node no longer closes the stream when behind, but
waits for the Publisher to either stream an earlier block or
send "too far behind". By sending too far behind, we are essentially closing the stream and rescheduling

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

       if (blockBufferService.getBlockState(restartBlockNumber) != null) {
            logger.info("{} Block node reported it is behind. Will restart stream at block {}.", this, restartBlockNumber);

            closeAndRestart(restartBlockNumber);
        } else { ... }

The if block above calls #closeAndRestart(long). That is the part I am talking about. The document was updated to indicate this:

      BlockNode -->> Publisher: Respond with BehindPublisher and specify the latest completed block (L)
      Publisher-->>BlockNode: Send from block L+1 or send EndStream(Too Far Behind) and retry with exponential backoff

Only if the block isn't available should the connection be closed, otherwise the connection should not be closed and we should just turn around and start sending the requested block, like we do for ResendBlock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, yes. It is addressed now

requireNonNull(nodeBehind, "nodeBehind must not be null");
final long blockNumber = nodeBehind.blockNumber();
logger.debug("{} Received BehindPublisher response for block {}.", this, blockNumber);

final long restartBlockNumber = blockNumber == Long.MAX_VALUE ? 0 : blockNumber + 1;
// The block node is behind us, check if we have the last verified block still available
// in order to restart the stream from there
if (blockBufferService.getBlockState(restartBlockNumber) != null) {
logger.info(
"{} Block node reported it is behind. Will restart stream at block {}.", this, restartBlockNumber);

streamingBlockNumber.set(restartBlockNumber);
} else {
// If we don't have the block state, we schedule retry for this connection
// and establish new one with different block node
logger.info("{} Block node is behind and block state is not available. Ending the stream.", this);

if (restartBlockNumber < blockBufferService.getEarliestAvailableBlockNumber()) {
// Indicate that the block node should catch up from another trustworthy block node
endStreamAndReschedule(TOO_FAR_BEHIND);
} else if (restartBlockNumber > blockBufferService.getLastBlockNumberProduced()) {
endStreamAndReschedule(ERROR);
}
}
}

/**
* Send an EndStream request to end the stream and close the connection.
*
Expand Down Expand Up @@ -857,6 +872,11 @@ public void onNext(final @NonNull PublishStreamResponse response) {
blockStreamMetrics.recordLatestBlockResendBlock(
response.resendBlock().blockNumber());
handleResendBlock(response.resendBlock());
} else if (response.hasNodeBehindPublisher()) {
blockStreamMetrics.recordResponseReceived(response.response().kind());
blockStreamMetrics.recordLatestBlockNodeBehindPublisher(
response.nodeBehindPublisher().blockNumber());
handleBlockNodeBehind(response.nodeBehindPublisher());
} else {
blockStreamMetrics.recordUnknownResponseReceived();
logger.debug("{} Unexpected response received: {}.", this, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class BlockStreamMetrics {
private LongGauge connRecv_latestBlockEndOfStreamGauge;
private LongGauge connRecv_latestBlockSkipBlockGauge;
private LongGauge connRecv_latestBlockResendBlockGauge;
private LongGauge connRecv_latestBlockNodeBehindPublisherGauge;

// connectivity metrics
private Counter conn_onCompleteCounter;
Expand Down Expand Up @@ -496,6 +497,11 @@ private void registerConnectionRecvMetrics() {
final LongGauge.Config latestBlockResendCfg = newLongGauge(GROUP_CONN_RECV, "latestBlockResendBlock")
.withDescription("The latest block number received in a ResendBlock response");
this.connRecv_latestBlockResendBlockGauge = metrics.getOrCreate(latestBlockResendCfg);

final LongGauge.Config latestBlockNodeBehindCfg = newLongGauge(
GROUP_CONN_RECV, "latestBlockNodeBehindPublisher")
.withDescription("The latest block number received in a NodeBehindPublisher response");
this.connRecv_latestBlockNodeBehindPublisherGauge = metrics.getOrCreate(latestBlockNodeBehindCfg);
}

/**
Expand Down Expand Up @@ -559,6 +565,14 @@ public void recordLatestBlockResendBlock(final long blockNumber) {
connRecv_latestBlockResendBlockGauge.set(blockNumber);
}

/**
* Record the latest block number received in a NodeBehindPublisher response.
* @param blockNumber the block number from the response
*/
public void recordLatestBlockNodeBehindPublisher(final long blockNumber) {
connRecv_latestBlockNodeBehindPublisherGauge.set(blockNumber);
}

// Connection SEND metrics -----------------------------------------------------------------------------------------

private void registerConnectionSendMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.hiero.block.api.PublishStreamRequest;
import org.hiero.block.api.PublishStreamRequest.EndStream;
import org.hiero.block.api.PublishStreamResponse;
import org.hiero.block.api.PublishStreamResponse.BehindPublisher;
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
import org.hiero.block.api.PublishStreamResponse.ResendBlock;
Expand Down Expand Up @@ -66,6 +67,15 @@ protected static PublishStreamResponse createEndOfStreamResponse(
return PublishStreamResponse.newBuilder().endStream(eos).build();
}

@NonNull
protected static PublishStreamResponse createBlockNodeBehindResponse(final long lastVerifiedBlock) {
final BehindPublisher nodeBehind =
BehindPublisher.newBuilder().blockNumber(lastVerifiedBlock).build();
return PublishStreamResponse.newBuilder()
.nodeBehindPublisher(nodeBehind)
.build();
}

@NonNull
protected static PublishStreamResponse createBlockAckResponse(final long blockNumber) {
final BlockAcknowledgement blockAck =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,49 +629,76 @@ void testOnNext_endOfStream_blockNodeGracefulShutdown() {
}

@Test
void testOnNext_endOfStream_blockNodeBehind_blockExists() {
void testOnNext_blockNodeBehind_blockExists() {
openConnectionAndResetMocks();
final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, 10L);
when(bufferService.getHighestAckedBlockNumber()).thenReturn(10L);
final PublishStreamResponse response = createBlockNodeBehindResponse(10L);
when(bufferService.getBlockState(11L)).thenReturn(new BlockState(11L));
connection.updateConnectionState(ConnectionState.ACTIVE);

connection.onNext(response);

verify(metrics).recordLatestBlockEndOfStream(10L);
verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND);
verify(metrics).recordLatestBlockNodeBehindPublisher(10L);
verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER);
verify(bufferService).getBlockState(11L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
}

@Test
void testOnNext_blockNodeBehind_blockDoesNotExist_TooFarBehind() {
openConnectionAndResetMocks();
final PublishStreamResponse response = createBlockNodeBehindResponse(10L);
when(bufferService.getBlockState(11L)).thenReturn(null);
when(bufferService.getEarliestAvailableBlockNumber()).thenReturn(12L);

connection.updateConnectionState(ConnectionState.ACTIVE);
connection.onNext(response);

verify(metrics).recordLatestBlockNodeBehindPublisher(10L);
verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER);
verify(metrics).recordConnectionClosed();
verify(metrics).recordActiveConnectionIp(-1L);
verify(requestPipeline).onComplete();
verify(connectionManager).rescheduleConnection(connection, null, 11L, false);
verify(metrics).recordRequestEndStreamSent(EndStream.Code.TOO_FAR_BEHIND);
verify(metrics).recordRequestLatency(anyLong());
verify(bufferService, atLeastOnce()).getEarliestAvailableBlockNumber();
verify(bufferService, atLeastOnce()).getHighestAckedBlockNumber();
verify(bufferService).getBlockState(11L);
verify(requestPipeline)
.onNext(PublishStreamRequest.newBuilder()
.endStream(EndStream.newBuilder()
.endCode(EndStream.Code.TOO_FAR_BEHIND)
.earliestBlockNumber(12L)
.build())
.build());
verify(requestPipeline).onComplete();
verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
}

@Test
void testOnNext_endOfStream_blockNodeBehind_blockDoesNotExist() {
void testOnNext_endOfStream_blockNodeBehind_blockDoesNotExist_Error() {
openConnectionAndResetMocks();
final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, 10L);
final PublishStreamResponse response = createBlockNodeBehindResponse(10L);
when(bufferService.getHighestAckedBlockNumber()).thenReturn(10L);
when(bufferService.getBlockState(11L)).thenReturn(null);

connection.updateConnectionState(ConnectionState.ACTIVE);
connection.onNext(response);

verify(metrics).recordLatestBlockEndOfStream(10L);
verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND);
verify(metrics).recordLatestBlockNodeBehindPublisher(10L);
verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER);
verify(metrics).recordConnectionClosed();
verify(metrics).recordActiveConnectionIp(-1L);
verify(metrics).recordRequestEndStreamSent(EndStream.Code.TOO_FAR_BEHIND);
verify(metrics).recordRequestEndStreamSent(EndStream.Code.ERROR);
verify(metrics).recordRequestLatency(anyLong());
verify(bufferService, atLeastOnce()).getEarliestAvailableBlockNumber();
verify(bufferService, atLeastOnce()).getHighestAckedBlockNumber();
verify(bufferService).getBlockState(11L);
verify(requestPipeline)
.onNext(PublishStreamRequest.newBuilder()
.endStream(EndStream.newBuilder()
.endCode(EndStream.Code.TOO_FAR_BEHIND)
.endCode(EndStream.Code.ERROR)
.latestBlockNumber(10L)
.build())
.build());
Expand Down Expand Up @@ -1615,26 +1642,23 @@ void testOnNext_endOfStream_clientFailures_maxValueBlockNumber(final EndOfStream
assertThat(connection.currentState()).isEqualTo(ConnectionState.CLOSED);
}

// Tests EndOfStream BEHIND code with Long.MAX_VALUE edge case (should restart at block 0)
// Tests BehindPublisher code with Long.MAX_VALUE edge case (should restart at block 0)
@Test
void testOnNext_endOfStream_blockNodeBehind_maxValueBlockNumber() {
void testOnNext_blockNodeBehind_maxValueBlockNumber() {
openConnectionAndResetMocks();
final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, Long.MAX_VALUE);
final PublishStreamResponse response = createBlockNodeBehindResponse(Long.MAX_VALUE);
when(bufferService.getBlockState(0L)).thenReturn(new BlockState(0L));
connection.updateConnectionState(ConnectionState.ACTIVE);

connection.onNext(response);

verify(metrics).recordLatestBlockEndOfStream(Long.MAX_VALUE);
verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND);
verify(requestPipeline).onComplete();
verify(connectionManager)
.rescheduleConnection(connection, null, 0L, false); // Should restart at 0 for MAX_VALUE
verify(metrics).recordLatestBlockNodeBehindPublisher(Long.MAX_VALUE);
verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER);
verify(bufferService).getBlockState(0L);
verify(metrics).recordConnectionClosed();
verify(metrics).recordActiveConnectionIp(-1L);
verifyNoMoreInteractions(metrics);
verifyNoMoreInteractions(requestPipeline);
verifyNoMoreInteractions(connectionManager);
verifyNoMoreInteractions(bufferService);
}

// Tests stream failure handling without calling onComplete on the pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7224,6 +7224,120 @@
],
"type": "table"
},
{
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"fieldConfig": {
"defaults": {
"color": {
"fixedColor": "blue",
"mode": "fixed"
},
"custom": {
"align": "auto",
"cellOptions": {
"type": "auto"
},
"footer": {
"reducers": []
},
"inspect": false,
"wrapText": false
},
"fieldMinMax": false,
"mappings": [],
"noValue": "0",
"thresholds": {
"mode": "percentage",
"steps": [
{
"color": "green",
"value": 0
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 6,
"x": 12,
"y": 174
},
"id": 61,
"options": {
"cellHeight": "sm",
"frameIndex": 0,
"showHeader": true
},
"pluginVersion": "12.2.0",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${datasource}"
},
"disableTextWrap": false,
"editorMode": "builder",
"exemplar": false,
"expr": "max by(node) (blockStream_connRecv_latestBlockNodeBehindPublisher{node=~\"$NodeId\", environment=\"$environment\"})",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "{{node}}",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Latest Block received in NodeBehindPublisher",
"transformations": [
{
"id": "joinByLabels",
"options": {
"value": "node"
}
},
{
"id": "reduce",
"options": {
"includeTimeField": false,
"labelsToFields": false,
"mode": "seriesToRows",
"reducers": [
"lastNotNull"
]
}
},
{
"id": "organize",
"options": {
"excludeByName": {},
"includeByName": {},
"indexByName": {},
"renameByName": {
"Field": "Node",
"Last *": "Block"
}
}
},
{
"id": "sortBy",
"options": {
"fields": {},
"sort": [
{
"field": "Node"
}
]
}
}
],
"type": "table"
},
{
"collapsed": false,
"gridPos": {
Expand Down
Loading
Loading