Skip to content

Commit 6612050

Browse files
committed
first pass
Signed-off-by: Petar Tonev <[email protected]>
1 parent 37f05c7 commit 6612050

File tree

8 files changed

+208
-46
lines changed

8 files changed

+208
-46
lines changed

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.hiero.block.api.PublishStreamRequest;
4949
import org.hiero.block.api.PublishStreamRequest.EndStream;
5050
import org.hiero.block.api.PublishStreamResponse;
51+
import org.hiero.block.api.PublishStreamResponse.BehindPublisher;
5152
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
5253
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
5354
import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code;
@@ -631,26 +632,6 @@ private void handleEndOfStream(@NonNull final EndOfStream endOfStream) {
631632
logger.info("{} Block node orderly ended the stream at block {}.", this, blockNumber);
632633
closeAndReschedule(THIRTY_SECONDS, true);
633634
}
634-
case Code.BEHIND -> {
635-
// The block node is behind us, check if we have the last verified block still available in order to
636-
// restart the stream from there
637-
final long restartBlockNumber = blockNumber == Long.MAX_VALUE ? 0 : blockNumber + 1;
638-
if (blockBufferService.getBlockState(restartBlockNumber) != null) {
639-
logger.info(
640-
"{} Block node reported it is behind. Will restart stream at block {}.",
641-
this,
642-
restartBlockNumber);
643-
644-
closeAndRestart(restartBlockNumber);
645-
} else {
646-
// If we don't have the block state, we schedule retry for this connection and establish new one
647-
// with different block node
648-
logger.info("{} Block node is behind and block state is not available. Ending the stream.", this);
649-
650-
// Indicate that the block node should recover and catch up from another trustworthy block node
651-
endStreamAndReschedule(TOO_FAR_BEHIND);
652-
}
653-
}
654635
case Code.UNKNOWN -> {
655636
// This should never happen, but if it does, schedule this connection for a retry attempt
656637
// and in the meantime select a new node to stream to
@@ -718,6 +699,35 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) {
718699
}
719700
}
720701

702+
/**
703+
* Handles the {@link BehindPublisher} response received from the block node.
704+
* If the consensus node has the requested block state available, it will start streaming it.
705+
* Otherwise, it will close the connection and retry with a different block node.
706+
*
707+
* @param nodeBehind the BehindPublisher response received from the block node
708+
*/
709+
private void handleBlockNodeBehind(@NonNull final BehindPublisher nodeBehind) {
710+
requireNonNull(nodeBehind, "nodeBehind must not be null");
711+
final long blockNumber = nodeBehind.blockNumber();
712+
713+
// The block node is behind us, check if we have the last verified block still available
714+
// in order to restart the stream from there
715+
final long restartBlockNumber = blockNumber == Long.MAX_VALUE ? 0 : blockNumber + 1;
716+
if (blockBufferService.getBlockState(restartBlockNumber) != null) {
717+
logger.info(
718+
"{} Block node reported it is behind. Will restart stream at block {}.", this, restartBlockNumber);
719+
720+
closeAndRestart(restartBlockNumber);
721+
} else {
722+
// If we don't have the block state, we schedule retry for this connection and establish new one
723+
// with different block node
724+
logger.info("{} Block node is behind and block state is not available. Ending the stream.", this);
725+
726+
// Indicate that the block node should recover and catch up from another trustworthy block node
727+
endStreamAndReschedule(TOO_FAR_BEHIND);
728+
}
729+
}
730+
721731
/**
722732
* Send an EndStream request to end the stream and close the connection.
723733
*
@@ -1010,6 +1020,11 @@ public void onNext(final @NonNull PublishStreamResponse response) {
10101020
blockStreamMetrics.recordLatestBlockResendBlock(
10111021
response.resendBlock().blockNumber());
10121022
handleResendBlock(response.resendBlock());
1023+
} else if (response.hasNodeBehindPublisher()) {
1024+
blockStreamMetrics.recordResponseReceived(response.response().kind());
1025+
blockStreamMetrics.recordLatestBlockNodeBehindPublisher(
1026+
response.nodeBehindPublisher().blockNumber());
1027+
handleBlockNodeBehind(response.nodeBehindPublisher());
10131028
} else {
10141029
blockStreamMetrics.recordUnknownResponseReceived();
10151030
logger.debug("{} Unexpected response received: {}.", this, response);

hedera-node/hedera-app/src/main/java/com/hedera/node/app/metrics/BlockStreamMetrics.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class BlockStreamMetrics {
5858
private LongGauge connRecv_latestBlockEndOfStreamGauge;
5959
private LongGauge connRecv_latestBlockSkipBlockGauge;
6060
private LongGauge connRecv_latestBlockResendBlockGauge;
61+
private LongGauge connRecv_latestBlockNodeBehindPublisherGauge;
6162

6263
// connectivity metrics
6364
private Counter conn_onCompleteCounter;
@@ -496,6 +497,11 @@ private void registerConnectionRecvMetrics() {
496497
final LongGauge.Config latestBlockResendCfg = newLongGauge(GROUP_CONN_RECV, "latestBlockResendBlock")
497498
.withDescription("The latest block number received in a ResendBlock response");
498499
this.connRecv_latestBlockResendBlockGauge = metrics.getOrCreate(latestBlockResendCfg);
500+
501+
final LongGauge.Config latestBlockNodeBehindCfg = newLongGauge(
502+
GROUP_CONN_RECV, "latestBlockNodeBehindPublisher")
503+
.withDescription("The latest block number received in a NodeBehindPublisher response");
504+
this.connRecv_latestBlockNodeBehindPublisherGauge = metrics.getOrCreate(latestBlockNodeBehindCfg);
499505
}
500506

501507
/**
@@ -559,6 +565,14 @@ public void recordLatestBlockResendBlock(final long blockNumber) {
559565
connRecv_latestBlockResendBlockGauge.set(blockNumber);
560566
}
561567

568+
/**
569+
* Record the latest block number received in a NodeBehindPublisher response.
570+
* @param blockNumber the block number from the response
571+
*/
572+
public void recordLatestBlockNodeBehindPublisher(final long blockNumber) {
573+
connRecv_latestBlockNodeBehindPublisherGauge.set(blockNumber);
574+
}
575+
562576
// Connection SEND metrics -----------------------------------------------------------------------------------------
563577

564578
private void registerConnectionSendMetrics() {

hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeCommunicationTestBase.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.hiero.block.api.PublishStreamRequest;
3333
import org.hiero.block.api.PublishStreamRequest.EndStream;
3434
import org.hiero.block.api.PublishStreamResponse;
35+
import org.hiero.block.api.PublishStreamResponse.BehindPublisher;
3536
import org.hiero.block.api.PublishStreamResponse.BlockAcknowledgement;
3637
import org.hiero.block.api.PublishStreamResponse.EndOfStream;
3738
import org.hiero.block.api.PublishStreamResponse.ResendBlock;
@@ -66,6 +67,15 @@ protected static PublishStreamResponse createEndOfStreamResponse(
6667
return PublishStreamResponse.newBuilder().endStream(eos).build();
6768
}
6869

70+
@NonNull
71+
protected static PublishStreamResponse createBlockNodeBehindResponse(final long lastVerifiedBlock) {
72+
final BehindPublisher nodeBehind =
73+
BehindPublisher.newBuilder().blockNumber(lastVerifiedBlock).build();
74+
return PublishStreamResponse.newBuilder()
75+
.nodeBehindPublisher(nodeBehind)
76+
.build();
77+
}
78+
6979
@NonNull
7080
protected static PublishStreamResponse createBlockAckResponse(final long blockNumber) {
7181
final BlockAcknowledgement blockAck =

hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -620,15 +620,14 @@ void testOnNext_endOfStream_blockNodeGracefulShutdown() {
620620
@Test
621621
void testOnNext_endOfStream_blockNodeBehind_blockExists() {
622622
openConnectionAndResetMocks();
623-
final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, 10L);
624-
when(bufferService.getHighestAckedBlockNumber()).thenReturn(10L);
623+
final PublishStreamResponse response = createBlockNodeBehindResponse(10L);
625624
when(bufferService.getBlockState(11L)).thenReturn(new BlockState(11L));
626625
connection.updateConnectionState(ConnectionState.ACTIVE);
627626

628627
connection.onNext(response);
629628

630-
verify(metrics).recordLatestBlockEndOfStream(10L);
631-
verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND);
629+
verify(metrics).recordLatestBlockNodeBehindPublisher(10L);
630+
verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER);
632631
verify(metrics).recordConnectionClosed();
633632
verify(metrics).recordActiveConnectionIp(-1L);
634633
verify(requestPipeline).onComplete();
@@ -641,15 +640,15 @@ void testOnNext_endOfStream_blockNodeBehind_blockExists() {
641640
@Test
642641
void testOnNext_endOfStream_blockNodeBehind_blockDoesNotExist() {
643642
openConnectionAndResetMocks();
644-
final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, 10L);
643+
final PublishStreamResponse response = createBlockNodeBehindResponse(10L);
645644
when(bufferService.getHighestAckedBlockNumber()).thenReturn(10L);
646645
when(bufferService.getBlockState(11L)).thenReturn(null);
647646

648647
connection.updateConnectionState(ConnectionState.ACTIVE);
649648
connection.onNext(response);
650649

651-
verify(metrics).recordLatestBlockEndOfStream(10L);
652-
verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND);
650+
verify(metrics).recordLatestBlockNodeBehindPublisher(10L);
651+
verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER);
653652
verify(metrics).recordConnectionClosed();
654653
verify(metrics).recordActiveConnectionIp(-1L);
655654
verify(metrics).recordRequestEndStreamSent(EndStream.Code.TOO_FAR_BEHIND);
@@ -1608,14 +1607,14 @@ void testOnNext_endOfStream_clientFailures_maxValueBlockNumber(final EndOfStream
16081607
@Test
16091608
void testOnNext_endOfStream_blockNodeBehind_maxValueBlockNumber() {
16101609
openConnectionAndResetMocks();
1611-
final PublishStreamResponse response = createEndOfStreamResponse(Code.BEHIND, Long.MAX_VALUE);
1610+
final PublishStreamResponse response = createBlockNodeBehindResponse(Long.MAX_VALUE);
16121611
when(bufferService.getBlockState(0L)).thenReturn(new BlockState(0L));
16131612
connection.updateConnectionState(ConnectionState.ACTIVE);
16141613

16151614
connection.onNext(response);
16161615

1617-
verify(metrics).recordLatestBlockEndOfStream(Long.MAX_VALUE);
1618-
verify(metrics).recordResponseEndOfStreamReceived(Code.BEHIND);
1616+
verify(metrics).recordLatestBlockNodeBehindPublisher(Long.MAX_VALUE);
1617+
verify(metrics).recordResponseReceived(ResponseOneOfType.NODE_BEHIND_PUBLISHER);
16191618
verify(requestPipeline).onComplete();
16201619
verify(connectionManager)
16211620
.rescheduleConnection(connection, null, 0L, false); // Should restart at 0 for MAX_VALUE

hedera-node/infrastructure/grafana/dashboards/production/hedera-node/block-stream.json

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7215,6 +7215,120 @@
72157215
],
72167216
"type": "table"
72177217
},
7218+
{
7219+
"datasource": {
7220+
"type": "prometheus",
7221+
"uid": "${datasource}"
7222+
},
7223+
"fieldConfig": {
7224+
"defaults": {
7225+
"color": {
7226+
"fixedColor": "blue",
7227+
"mode": "fixed"
7228+
},
7229+
"custom": {
7230+
"align": "auto",
7231+
"cellOptions": {
7232+
"type": "auto"
7233+
},
7234+
"footer": {
7235+
"reducers": []
7236+
},
7237+
"inspect": false,
7238+
"wrapText": false
7239+
},
7240+
"fieldMinMax": false,
7241+
"mappings": [],
7242+
"noValue": "0",
7243+
"thresholds": {
7244+
"mode": "percentage",
7245+
"steps": [
7246+
{
7247+
"color": "green",
7248+
"value": 0
7249+
}
7250+
]
7251+
}
7252+
},
7253+
"overrides": []
7254+
},
7255+
"gridPos": {
7256+
"h": 9,
7257+
"w": 6,
7258+
"x": 12,
7259+
"y": 174
7260+
},
7261+
"id": 61,
7262+
"options": {
7263+
"cellHeight": "sm",
7264+
"frameIndex": 0,
7265+
"showHeader": true
7266+
},
7267+
"pluginVersion": "12.2.0",
7268+
"targets": [
7269+
{
7270+
"datasource": {
7271+
"type": "prometheus",
7272+
"uid": "${datasource}"
7273+
},
7274+
"disableTextWrap": false,
7275+
"editorMode": "builder",
7276+
"exemplar": false,
7277+
"expr": "max by(node) (blockStream_connRecv_latestBlockNodeBehindPublisher{node=~\"$NodeId\", environment=\"$environment\"})",
7278+
"fullMetaSearch": false,
7279+
"includeNullMetadata": true,
7280+
"instant": false,
7281+
"legendFormat": "{{node}}",
7282+
"range": true,
7283+
"refId": "A",
7284+
"useBackend": false
7285+
}
7286+
],
7287+
"title": "Latest Block received in NodeBehindPublisher",
7288+
"transformations": [
7289+
{
7290+
"id": "joinByLabels",
7291+
"options": {
7292+
"value": "node"
7293+
}
7294+
},
7295+
{
7296+
"id": "reduce",
7297+
"options": {
7298+
"includeTimeField": false,
7299+
"labelsToFields": false,
7300+
"mode": "seriesToRows",
7301+
"reducers": [
7302+
"lastNotNull"
7303+
]
7304+
}
7305+
},
7306+
{
7307+
"id": "organize",
7308+
"options": {
7309+
"excludeByName": {},
7310+
"includeByName": {},
7311+
"indexByName": {},
7312+
"renameByName": {
7313+
"Field": "Node",
7314+
"Last *": "Block"
7315+
}
7316+
}
7317+
},
7318+
{
7319+
"id": "sortBy",
7320+
"options": {
7321+
"fields": {},
7322+
"sort": [
7323+
{
7324+
"field": "Node"
7325+
}
7326+
]
7327+
}
7328+
}
7329+
],
7330+
"type": "table"
7331+
},
72187332
{
72197333
"collapsed": false,
72207334
"gridPos": {

0 commit comments

Comments
 (0)