Skip to content

Commit e3127e6

Browse files
authored
Pipe: Ignore logging when returnSelf is called in the event of an exception in AsyncClient. (#16827)
1 parent bd813d8 commit e3127e6

File tree

4 files changed

+56
-13
lines changed

4 files changed

+56
-13
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTrackableHandler.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,15 @@ protected boolean tryTransfer(
9797
clearEventsReferenceCount();
9898
connector.eliminateHandler(this, true);
9999
client.setShouldReturnSelf(true);
100-
try {
101-
client.returnSelf();
102-
} catch (final IllegalStateException e) {
103-
LOGGER.info(
104-
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
105-
}
100+
client.returnSelf(
101+
(e) -> {
102+
if (e instanceof IllegalStateException) {
103+
LOGGER.info(
104+
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
105+
return true;
106+
}
107+
return false;
108+
});
106109
this.client = null;
107110
return false;
108111
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -431,12 +431,15 @@ private void returnClientIfNecessary() {
431431
}
432432

433433
client.setShouldReturnSelf(true);
434-
try {
435-
client.returnSelf();
436-
} catch (final IllegalStateException e) {
437-
LOGGER.info(
438-
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
439-
}
434+
client.returnSelf(
435+
(e) -> {
436+
if (e instanceof IllegalStateException) {
437+
LOGGER.info(
438+
"Illegal state when return the client to object pool, maybe the pool is already cleared. Will ignore.");
439+
return true;
440+
}
441+
return false;
442+
});
440443
client = null;
441444
}
442445

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.slf4j.LoggerFactory;
3030

3131
import java.util.Optional;
32+
import java.util.function.Function;
3233

3334
public class ClientManager<K, V> implements IClientManager<K, V> {
3435

@@ -79,6 +80,30 @@ public void returnClient(K node, V client) {
7980
}
8081
}
8182

83+
/**
84+
* return a client V for node K to the {@link ClientManager}, and ignore some exception
85+
*
86+
* <p>Note: We do not define this interface in {@link IClientManager} to make you aware that the
87+
* return of a client is automatic whenever a particular client is used.
88+
*/
89+
public void returnClient(K node, V client, Function<Exception, Boolean> ignoreError) {
90+
if (node != null) {
91+
try {
92+
pool.returnObject(node, client);
93+
} catch (Exception e) {
94+
if (!Boolean.TRUE.equals(ignoreError.apply(e))) {
95+
LOGGER.warn("Return client {} for node {} to pool failed.", client, node, e);
96+
}
97+
}
98+
} else if (client instanceof ThriftClient) {
99+
((ThriftClient) client).invalidateAll();
100+
LOGGER.warn(
101+
"Return client {} to pool failed because the node is null. "
102+
+ "This may cause resource leak, please check your code.",
103+
client);
104+
}
105+
}
106+
82107
@Override
83108
public void clear(K node) {
84109
Optional.ofNullable(node)

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.io.IOException;
3939
import java.util.concurrent.atomic.AtomicBoolean;
4040
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.function.Function;
4142

4243
public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncClient
4344
implements ThriftClient {
@@ -84,7 +85,8 @@ public void onComplete() {
8485
public void onError(final Exception e) {
8586
super.onError(e);
8687
ThriftClient.resolveException(e, this);
87-
returnSelf();
88+
returnSelf(
89+
(i) -> i instanceof IllegalStateException && "Client has an error!".equals(i.getMessage()));
8890
}
8991

9092
@Override
@@ -114,6 +116,16 @@ public void returnSelf() {
114116
}
115117
}
116118

119+
/**
120+
* return self, the method doesn't need to be called by the user and will be triggered after the
121+
* RPC is finished.
122+
*/
123+
public void returnSelf(Function<Exception, Boolean> ignoreError) {
124+
if (shouldReturnSelf.get()) {
125+
clientManager.returnClient(endpoint, this, ignoreError);
126+
}
127+
}
128+
117129
public void setShouldReturnSelf(final boolean shouldReturnSelf) {
118130
this.shouldReturnSelf.set(shouldReturnSelf);
119131
}

0 commit comments

Comments
 (0)