Skip to content

Commit a4d18d7

Browse files
committed
feat: add connection/client for BN service API
Signed-off-by: Tim Farber-Newman <[email protected]>
1 parent ae16ecd commit a4d18d7

File tree

7 files changed

+759
-89
lines changed

7 files changed

+759
-89
lines changed

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/AbstractBlockNodeConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ enum ConnectionType {
2626
/**
2727
* Denotes a connection that intends to stream block data to a block node.
2828
*/
29-
BLOCK_STREAMING("B"), // 'B' for block
29+
BLOCK_STREAMING("STR"), // block STReaming
3030
/**
3131
* Denotes a connection that intends to query server information from a block node.
3232
*/
33-
SERVER_STATUS("S"); // 'S' for status
33+
SERVER_STATUS("SVC"); // block node SerViCe
3434

3535
private final String key;
3636

@@ -83,7 +83,7 @@ enum ConnectionType {
8383
requireNonNull(type, "type is required");
8484

8585
connectionId =
86-
String.format("%s:%05d", type.key, connIdCtrByType.get(type).incrementAndGet());
86+
String.format("%s.%06d", type.key, connIdCtrByType.get(type).incrementAndGet());
8787
stateRef = new AtomicReference<>(ConnectionState.UNINITIALIZED);
8888
}
8989

@@ -118,7 +118,7 @@ final boolean updateConnectionState(
118118

119119
if (!latestState.canTransitionTo(newState)) {
120120
logger.warn(
121-
"{} Attempted to downgrade state from {} to {}, but this is not allowed; ignoring update",
121+
"{} Attempted to downgrade state from {} to {}, but this is not allowed; this is not allowed",
122122
this,
123123
latestState,
124124
newState);

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeClientFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public BlockStreamPublishServiceClient createStreamingClient(
8585
* @param timeout the timeout to use
8686
* @return a new {@link BlockNodeServiceClient} instance
8787
*/
88-
public BlockNodeServiceClient createServiceClient(@NonNull final BlockNodeConfiguration config, @NonNull final Duration timeout) {
88+
public BlockNodeServiceClient createServiceClient(
89+
@NonNull final BlockNodeConfiguration config, @NonNull final Duration timeout) {
8990
final PbjGrpcClient client = buildPbjClient(config, timeout);
9091
return new BlockNodeServiceClient(client, new DefaultRequestOptions());
9192
}

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeServiceConnection.java

Lines changed: 146 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
// SPDX-License-Identifier: Apache-2.0
12
package com.hedera.node.app.blocks.impl.streaming;
23

34
import static java.util.Objects.requireNonNull;
@@ -6,47 +7,55 @@
67
import com.hedera.node.config.ConfigProvider;
78
import com.hedera.node.config.data.BlockNodeConnectionConfig;
89
import edu.umd.cs.findbugs.annotations.NonNull;
10+
import edu.umd.cs.findbugs.annotations.Nullable;
911
import java.time.Duration;
1012
import java.util.concurrent.Callable;
1113
import java.util.concurrent.ExecutionException;
1214
import java.util.concurrent.ExecutorService;
1315
import java.util.concurrent.Future;
1416
import java.util.concurrent.TimeUnit;
15-
import java.util.concurrent.TimeoutException;
17+
import java.util.concurrent.atomic.AtomicLong;
1618
import java.util.concurrent.atomic.AtomicReference;
1719
import org.apache.logging.log4j.LogManager;
1820
import org.apache.logging.log4j.Logger;
1921
import org.hiero.block.api.BlockNodeServiceInterface.BlockNodeServiceClient;
2022
import org.hiero.block.api.ServerStatusRequest;
2123
import org.hiero.block.api.ServerStatusResponse;
2224

25+
/**
26+
* Connection that can be used to query the Block Node Service API (e.g. to retrieve server status).
27+
*/
2328
public class BlockNodeServiceConnection extends AbstractBlockNodeConnection {
2429

2530
private static final Logger logger = LogManager.getLogger(BlockNodeServiceConnection.class);
2631

27-
private final AtomicReference<BlockNodeServiceClient> clientRef = new AtomicReference<>();
32+
private static final AtomicLong clientCtr = new AtomicLong(0);
33+
34+
record ServiceClientHolder(long clientId, BlockNodeServiceClient client) {}
35+
36+
private final AtomicReference<ServiceClientHolder> clientRef = new AtomicReference<>();
2837
private final ExecutorService executorService;
2938
private final BlockNodeClientFactory clientFactory;
3039
private final BlockNodeConnectionConfig bncConfig;
3140

32-
public BlockNodeServiceConnection(@NonNull final ConfigProvider configProvider,
41+
/**
42+
* Create a new instance.
43+
*
44+
* @param configProvider the configuration provider to use
45+
* @param nodeConfig the block node configuration to use for this connection
46+
* @param executorService the executor service to use for executing tasks
47+
* @param clientFactory the factory to use for creating clients to the block node
48+
*/
49+
public BlockNodeServiceConnection(
50+
@NonNull final ConfigProvider configProvider,
3351
@NonNull final BlockNodeConfiguration nodeConfig,
3452
@NonNull final ExecutorService executorService,
3553
@NonNull final BlockNodeClientFactory clientFactory) {
3654
super(ConnectionType.SERVER_STATUS, nodeConfig, configProvider);
3755
this.executorService = requireNonNull(executorService, "pipeline executor is required");
3856
this.clientFactory = requireNonNull(clientFactory, "client factory is required");
3957

40-
bncConfig = configProvider()
41-
.getConfiguration()
42-
.getConfigData(BlockNodeConnectionConfig.class);
43-
}
44-
45-
private Duration timeout() {
46-
return configProvider()
47-
.getConfiguration()
48-
.getConfigData(BlockNodeConnectionConfig.class)
49-
.grpcOverallTimeout();
58+
bncConfig = configProvider().getConfiguration().getConfigData(BlockNodeConnectionConfig.class);
5059
}
5160

5261
@Override
@@ -56,69 +65,124 @@ void initialize() {
5665
return;
5766
}
5867

59-
final Future<?> future = executorService.submit(new CreateClientTask());
68+
Future<?> future = null;
6069

6170
try {
71+
future = executorService.submit(new CreateClientTask());
6272
future.get(bncConfig.pipelineOperationTimeout().toMillis(), TimeUnit.MILLISECONDS);
63-
} catch (final TimeoutException e) {
64-
logger.warn("{} Client initialization timed out (timeout={})", this, bncConfig.pipelineOperationTimeout());
65-
future.cancel(true);
66-
throw new RuntimeException("Error initializing client", e);
67-
} catch (final InterruptedException e) {
68-
logger.warn("{} Client initialization interrupted", this);
69-
Thread.currentThread().interrupt();
70-
throw new RuntimeException("Error initializing client", e);
71-
} catch (final ExecutionException e) {
72-
logger.warn("{} Error initializing client", this, e.getCause());
73-
throw new RuntimeException("Error initializing client", e);
73+
} catch (final Exception e) {
74+
logger.warn("{} Error initializing connection", this, e);
75+
76+
if (future != null) {
77+
future.cancel(true);
78+
}
79+
80+
if (e instanceof InterruptedException) {
81+
Thread.currentThread().interrupt();
82+
}
83+
84+
final Throwable error;
85+
if (e instanceof final ExecutionException ee) {
86+
error = ee.getCause();
87+
} else {
88+
error = e;
89+
}
90+
91+
throw new RuntimeException("Error initializing connection", error);
7492
}
7593
}
7694

95+
/**
96+
* Task used to create a Block Node service client for this connection.
97+
*/
7798
class CreateClientTask implements Runnable {
7899
@Override
79100
public void run() {
80-
final BlockNodeServiceClient client = clientFactory.createServiceClient(configuration(), timeout());
81-
if (clientRef.compareAndSet(null, client)) {
82-
updateConnectionState(ConnectionState.UNINITIALIZED, ConnectionState.ACTIVE); // jump to active
83-
logger.debug("{} Client initialized successfully", BlockNodeServiceConnection.this);
101+
final Duration timeout = configProvider()
102+
.getConfiguration()
103+
.getConfigData(BlockNodeConnectionConfig.class)
104+
.grpcOverallTimeout();
105+
106+
final long clientId = clientCtr.incrementAndGet();
107+
logger.debug("{} Creating new client (clientId: {})", BlockNodeServiceConnection.this, clientId);
108+
final BlockNodeServiceClient client = clientFactory.createServiceClient(configuration(), timeout);
109+
if (clientRef.compareAndSet(null, new BlockNodeServiceConnection.ServiceClientHolder(clientId, client))) {
110+
// unlike the streaming connection, these connections don't really have an intermediate state between
111+
// UNINITIALIZED and ACTIVE, so just set the state to ACTIVE
112+
updateConnectionState(ConnectionState.UNINITIALIZED, ConnectionState.ACTIVE);
113+
logger.info(
114+
"{} Client initialized successfully (clientId: {})", BlockNodeServiceConnection.this, clientId);
84115
} else {
85-
close();
116+
logger.debug(
117+
"{} Another thread has created the client and applied it to this connection; ignoring this attempt",
118+
BlockNodeServiceConnection.this);
119+
closeSilently(new ServiceClientHolder(clientId, client));
86120
}
87121
}
88-
}
89122

90-
class CloseTask implements Runnable {
91-
private final BlockNodeServiceClient client;
123+
/**
124+
* Silently close specified client. This is used for cases where another thread has won initializing the client
125+
* to use for this connection, and thus we want to close the client created by the losing thread. Any errors
126+
* that result from closing the client will be suppressed.
127+
*
128+
* @param holder the client to close
129+
*/
130+
private void closeSilently(@NonNull final ServiceClientHolder holder) {
131+
logger.debug("{} Silently closing client (clientId: {})", BlockNodeServiceConnection.this, holder.clientId);
132+
final Future<?> future = executorService.submit(new CloseClientTask(holder));
133+
try {
134+
future.get(bncConfig.pipelineOperationTimeout().toMillis(), TimeUnit.MILLISECONDS);
135+
} catch (final Exception e) {
136+
logger.debug(
137+
"{} Attempted to close a client (clientId: {}), but it failed; ignoring failure",
138+
BlockNodeServiceConnection.this,
139+
holder.clientId,
140+
e);
141+
}
142+
}
143+
}
92144

93-
CloseTask(@NonNull final BlockNodeServiceClient client) {
94-
this.client = requireNonNull(client);
145+
/**
146+
* Task to close a specific client.
147+
*/
148+
class CloseClientTask implements Runnable {
149+
/**
150+
* The client to close
151+
*/
152+
private final ServiceClientHolder clientHolder;
153+
154+
CloseClientTask(@NonNull final ServiceClientHolder clientHolder) {
155+
this.clientHolder = requireNonNull(clientHolder);
95156
}
96157

97158
@Override
98159
public void run() {
99-
client.close();
160+
logger.debug("{} Closing client (clientId: {})", BlockNodeServiceConnection.this, clientHolder.clientId);
161+
clientHolder.client.close();
100162
}
101163
}
102164

103165
@Override
104166
void close() {
105-
final BlockNodeServiceClient client = clientRef.get();
167+
final ServiceClientHolder clientHolder = clientRef.get();
106168

107-
if (client == null || currentState().isTerminal()) {
169+
if (clientHolder == null || currentState().isTerminal()) {
108170
// either close has already been called or close was called while the connection wasn't initialized
109171
return;
110172
}
111173

112-
logger.info("{} Closing connection", this);
113-
updateConnectionState(ConnectionState.CLOSING);
114-
115-
if (!clientRef.compareAndSet(client, null)) {
174+
if (!clientRef.compareAndSet(clientHolder, null)) {
116175
logger.debug("{} Another thread has closed the connection", this);
176+
return;
117177
}
118178

119-
final Future<?> future = executorService.submit(new CloseTask(client));
179+
logger.info("{} Closing connection", this);
180+
updateConnectionState(ConnectionState.CLOSING);
181+
182+
Future<?> future = null;
120183

121184
try {
185+
future = executorService.submit(new CloseClientTask(clientHolder));
122186
future.get(bncConfig.pipelineOperationTimeout().toMillis(), TimeUnit.MILLISECONDS);
123187
} catch (final Exception e) {
124188
// the connection is being closed... don't propagate the exception
@@ -127,50 +191,71 @@ void close() {
127191
if (e instanceof InterruptedException) {
128192
Thread.currentThread().interrupt();
129193
}
130-
future.cancel(true);
194+
195+
if (future != null) {
196+
future.cancel(true);
197+
}
131198
} finally {
132199
// regardless of outcome, mark this connection as closed
133200
updateConnectionState(ConnectionState.CLOSED);
134201
}
135202
}
136203

137-
public BlockNodeStatus getBlockNodeStatus() {
138-
final BlockNodeServiceClient client = clientRef.get();
139-
140-
if (client == null || currentState() != ConnectionState.ACTIVE) {
204+
/**
205+
* Retrieves the server status of the Block Node associated with this connection. If there are any errors
206+
* experienced during this process, the node will be considered unreachable and as such an "unreachable" response
207+
* will be returned.
208+
*
209+
* @return null if this connection is not active, else the Block Node's status
210+
*/
211+
public @Nullable BlockNodeStatus getBlockNodeStatus() {
212+
final ServiceClientHolder clientHolder = clientRef.get();
213+
214+
if (clientHolder == null || currentState() != ConnectionState.ACTIVE) {
141215
logger.debug("{} Tried to retrieve block node status, but this connection is not active", this);
142216
return null;
143217
}
144218

145219
final long startMillis = System.currentTimeMillis();
146-
final Future<ServerStatusResponse> future = executorService.submit(new GetBlockNodeStatusTask(client));
220+
Future<ServerStatusResponse> future = null;
147221
final ServerStatusResponse response;
148222
final long durationMillis;
149223

150224
try {
225+
future = executorService.submit(new GetBlockNodeStatusTask(clientHolder.client));
151226
response = future.get(bncConfig.pipelineOperationTimeout().toMillis(), TimeUnit.MILLISECONDS);
152227
durationMillis = System.currentTimeMillis() - startMillis;
153-
} catch (final TimeoutException e) {
154-
logger.warn("{} Timed out trying to retrieve server status (timeout={})", this, bncConfig.pipelineOperationTimeout());
155-
future.cancel(true);
156-
return BlockNodeStatus.notReachable();
157-
} catch (final InterruptedException e) {
158-
logger.warn("{} Interrupted while retrieving server status", this);
159-
Thread.currentThread().interrupt();
160-
return BlockNodeStatus.notReachable();
161-
} catch (final ExecutionException e) {
162-
logger.warn("{} Error occurred while retrieving server status", this, e);
228+
} catch (final Exception e) {
229+
logger.warn("{} Error retrieving block node status", this, e);
230+
231+
if (future != null) {
232+
future.cancel(true);
233+
}
234+
235+
if (e instanceof InterruptedException) {
236+
Thread.currentThread().interrupt();
237+
}
238+
163239
return BlockNodeStatus.notReachable();
164240
}
165241

166-
logger.debug("{} Received the following block node server status: lastAvailableBlock={} (latency={}ms)",
167-
this, response.lastAvailableBlock(), durationMillis);
242+
logger.debug(
243+
"{} Received the following block node server status: lastAvailableBlock={} (latency: {}ms)",
244+
this,
245+
response.lastAvailableBlock(),
246+
durationMillis);
168247

169248
return BlockNodeStatus.reachable(durationMillis, response.lastAvailableBlock());
170249
}
171250

251+
/**
252+
* Task to get the server status.
253+
*/
172254
class GetBlockNodeStatusTask implements Callable<ServerStatusResponse> {
173255

256+
/**
257+
* The client to use
258+
*/
174259
private final BlockNodeServiceClient client;
175260

176261
private GetBlockNodeStatusTask(@NonNull final BlockNodeServiceClient client) {

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeStatus.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,33 @@
1+
// SPDX-License-Identifier: Apache-2.0
12
package com.hedera.node.app.blocks.impl.streaming;
23

4+
/**
5+
* Result of a status check for a specific Block Node.
6+
*
7+
* @param wasReachable true if the Block Node was reachable, else false
8+
* @param latencyMillis the duration the status check took to complete, or -1 if unreachable
9+
* @param latestBlockAvailable the latest block available on the Block Node, or -1 if unreachable
10+
*/
311
public record BlockNodeStatus(boolean wasReachable, long latencyMillis, long latestBlockAvailable) {
412

13+
private static final BlockNodeStatus NOT_REACHABLE = new BlockNodeStatus(false, -1L, -1L);
14+
15+
/**
16+
* Convenience method for creating a status of unreachable.
17+
*
18+
* @return a status marked as unreachable
19+
*/
520
public static BlockNodeStatus notReachable() {
6-
return new BlockNodeStatus(false, -1L, -1L);
21+
return NOT_REACHABLE;
722
}
823

24+
/**
25+
* Convenience method for creating a status of reachable.
26+
*
27+
* @param latencyMillis the latency (in milliseconds) the status check took
28+
* @param latestBlockAvailable the latest block available
29+
* @return a status marked as reachable
30+
*/
931
public static BlockNodeStatus reachable(final long latencyMillis, final long latestBlockAvailable) {
1032
return new BlockNodeStatus(true, latencyMillis, latestBlockAvailable);
1133
}

0 commit comments

Comments
 (0)