Skip to content

Commit ce0e776

Browse files
committed
Implement PreparedStmt on the Server side (#16764) (#16880)
(cherry picked from commit 7436c88)
1 parent 00807e7 commit ce0e776

File tree

28 files changed

+1747
-52
lines changed

28 files changed

+1747
-52
lines changed

integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ private void updateConfig(Statement statement, int timeout) throws SQLException
7878
statement.setQueryTimeout(timeout);
7979
}
8080

81+
/**
82+
* Executes a SQL query on all read statements in parallel.
83+
*
84+
* <p>Note: For PreparedStatement EXECUTE queries, use the write connection directly instead,
85+
* because PreparedStatements are session-scoped and this method may route queries to different
86+
* nodes where the PreparedStatement doesn't exist.
87+
*/
8188
@Override
8289
public ResultSet executeQuery(String sql) throws SQLException {
8390
return new ClusterTestResultSet(readStatements, readEndpoints, sql, queryTimeout);

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBPreparedStatementIT.java

Lines changed: 385 additions & 0 deletions
Large diffs are not rendered by default.

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/ClientSession.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ public class ClientSession extends IClientSession {
3333

3434
private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();
3535

36+
// Map from statement name to PreparedStatementInfo
37+
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();
38+
3639
public ClientSession(Socket clientSocket) {
3740
this.clientSocket = clientSocket;
3841
}
@@ -103,4 +106,24 @@ public static void removeQueryId(
103106
}
104107
}
105108
}
109+
110+
@Override
111+
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
112+
preparedStatements.put(statementName, info);
113+
}
114+
115+
@Override
116+
public PreparedStatementInfo removePreparedStatement(String statementName) {
117+
return preparedStatements.remove(statementName);
118+
}
119+
120+
@Override
121+
public PreparedStatementInfo getPreparedStatement(String statementName) {
122+
return preparedStatements.get(statementName);
123+
}
124+
125+
@Override
126+
public Set<String> getPreparedStatementNames() {
127+
return preparedStatements.keySet();
128+
}
106129
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,37 @@ public void setDatabaseName(@Nullable String databaseName) {
188188
this.databaseName = databaseName;
189189
}
190190

191+
/**
192+
* Add a prepared statement to this session.
193+
*
194+
* @param statementName the name of the prepared statement
195+
* @param info the prepared statement information
196+
*/
197+
public abstract void addPreparedStatement(String statementName, PreparedStatementInfo info);
198+
199+
/**
200+
* Remove a prepared statement from this session.
201+
*
202+
* @param statementName the name of the prepared statement
203+
* @return the removed prepared statement info, or null if not found
204+
*/
205+
public abstract PreparedStatementInfo removePreparedStatement(String statementName);
206+
207+
/**
208+
* Get a prepared statement from this session.
209+
*
210+
* @param statementName the name of the prepared statement
211+
* @return the prepared statement info, or null if not found
212+
*/
213+
public abstract PreparedStatementInfo getPreparedStatement(String statementName);
214+
215+
/**
216+
* Get all prepared statement names in this session.
217+
*
218+
* @return set of prepared statement names
219+
*/
220+
public abstract Set<String> getPreparedStatementNames();
221+
191222
public long getLastActiveTime() {
192223
return lastActiveTime;
193224
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/InternalClientSession.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,28 @@ public void addQueryId(Long statementId, long queryId) {
8888
public void removeQueryId(Long statementId, Long queryId) {
8989
ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId);
9090
}
91+
92+
@Override
93+
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
94+
throw new UnsupportedOperationException(
95+
"InternalClientSession should never call PREPARE statement methods.");
96+
}
97+
98+
@Override
99+
public PreparedStatementInfo removePreparedStatement(String statementName) {
100+
throw new UnsupportedOperationException(
101+
"InternalClientSession should never call PREPARE statement methods.");
102+
}
103+
104+
@Override
105+
public PreparedStatementInfo getPreparedStatement(String statementName) {
106+
throw new UnsupportedOperationException(
107+
"InternalClientSession should never call PREPARE statement methods.");
108+
}
109+
110+
@Override
111+
public Set<String> getPreparedStatementNames() {
112+
throw new UnsupportedOperationException(
113+
"InternalClientSession should never call PREPARE statement methods.");
114+
}
91115
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/MqttClientSession.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,4 +76,28 @@ public void addQueryId(Long statementId, long queryId) {
7676
public void removeQueryId(Long statementId, Long queryId) {
7777
throw new UnsupportedOperationException();
7878
}
79+
80+
@Override
81+
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
82+
throw new UnsupportedOperationException(
83+
"MQTT client session does not support PREPARE statement.");
84+
}
85+
86+
@Override
87+
public PreparedStatementInfo removePreparedStatement(String statementName) {
88+
throw new UnsupportedOperationException(
89+
"MQTT client session does not support PREPARE statement.");
90+
}
91+
92+
@Override
93+
public PreparedStatementInfo getPreparedStatement(String statementName) {
94+
throw new UnsupportedOperationException(
95+
"MQTT client session does not support PREPARE statement.");
96+
}
97+
98+
@Override
99+
public Set<String> getPreparedStatementNames() {
100+
throw new UnsupportedOperationException(
101+
"MQTT client session does not support PREPARE statement.");
102+
}
79103
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.protocol.session;
21+
22+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
23+
24+
import java.util.Objects;
25+
26+
import static java.util.Objects.requireNonNull;
27+
28+
/**
29+
* Information about a prepared statement stored in a session. The AST is cached here to avoid
30+
* reparsing on EXECUTE.
31+
*/
32+
public class PreparedStatementInfo {
33+
34+
private final String statementName;
35+
private final Statement sql; // Cached AST (contains Parameter nodes)
36+
private final long createTime;
37+
private final long memorySizeInBytes; // Memory size allocated for this PreparedStatement
38+
39+
public PreparedStatementInfo(String statementName, Statement sql, long memorySizeInBytes) {
40+
this.statementName = requireNonNull(statementName, "statementName is null");
41+
this.sql = requireNonNull(sql, "sql is null");
42+
this.createTime = System.currentTimeMillis();
43+
this.memorySizeInBytes = memorySizeInBytes;
44+
}
45+
46+
public PreparedStatementInfo(
47+
String statementName, Statement sql, long createTime, long memorySizeInBytes) {
48+
this.statementName = requireNonNull(statementName, "statementName is null");
49+
this.sql = requireNonNull(sql, "sql is null");
50+
this.createTime = createTime;
51+
this.memorySizeInBytes = memorySizeInBytes;
52+
}
53+
54+
public String getStatementName() {
55+
return statementName;
56+
}
57+
58+
public Statement getSql() {
59+
return sql;
60+
}
61+
62+
public long getCreateTime() {
63+
return createTime;
64+
}
65+
66+
public long getMemorySizeInBytes() {
67+
return memorySizeInBytes;
68+
}
69+
70+
@Override
71+
public boolean equals(Object o) {
72+
if (this == o) {
73+
return true;
74+
}
75+
if (o == null || getClass() != o.getClass()) {
76+
return false;
77+
}
78+
PreparedStatementInfo that = (PreparedStatementInfo) o;
79+
return Objects.equals(statementName, that.statementName) && Objects.equals(sql, that.sql);
80+
}
81+
82+
@Override
83+
public int hashCode() {
84+
return Objects.hash(statementName, sql);
85+
}
86+
87+
@Override
88+
public String toString() {
89+
return "PreparedStatementInfo{"
90+
+ "statementName='"
91+
+ statementName
92+
+ '\''
93+
+ ", sql="
94+
+ sql
95+
+ ", createTime="
96+
+ createTime
97+
+ '}';
98+
}
99+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,17 @@
2222
import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
2323

2424
import java.util.Collections;
25+
import java.util.Map;
2526
import java.util.Set;
27+
import java.util.concurrent.ConcurrentHashMap;
2628

2729
public class RestClientSession extends IClientSession {
2830

2931
private final String clientID;
3032

33+
// Map from statement name to PreparedStatementInfo
34+
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();
35+
3136
public RestClientSession(String clientID) {
3237
this.clientID = clientID;
3338
}
@@ -76,4 +81,24 @@ public void addQueryId(Long statementId, long queryId) {
7681
public void removeQueryId(Long statementId, Long queryId) {
7782
throw new UnsupportedOperationException();
7883
}
84+
85+
@Override
86+
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
87+
preparedStatements.put(statementName, info);
88+
}
89+
90+
@Override
91+
public PreparedStatementInfo removePreparedStatement(String statementName) {
92+
return preparedStatements.remove(statementName);
93+
}
94+
95+
@Override
96+
public PreparedStatementInfo getPreparedStatement(String statementName) {
97+
return preparedStatements.get(statementName);
98+
}
99+
100+
@Override
101+
public Set<String> getPreparedStatementNames() {
102+
return preparedStatements.keySet();
103+
}
79104
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.iotdb.db.protocol.thrift.OperationType;
4141
import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
4242
import org.apache.iotdb.db.queryengine.common.SessionInfo;
43+
import org.apache.iotdb.db.queryengine.plan.execution.config.session.PreparedStatementMemoryManager;
4344
import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
4445
import org.apache.iotdb.db.utils.DataNodeAuthUtils;
4546
import org.apache.iotdb.metrics.utils.MetricLevel;
@@ -276,6 +277,7 @@ public boolean closeSession(IClientSession session, LongConsumer releaseByQueryI
276277
}
277278

278279
private void releaseSessionResource(IClientSession session, LongConsumer releaseQueryResource) {
280+
// Release query resources
279281
Iterable<Long> statementIds = session.getStatementIds();
280282
if (statementIds != null) {
281283
for (Long statementId : statementIds) {
@@ -287,6 +289,17 @@ private void releaseSessionResource(IClientSession session, LongConsumer release
287289
}
288290
}
289291
}
292+
293+
// Release PreparedStatement memory resources
294+
try {
295+
PreparedStatementMemoryManager.getInstance().releaseAllForSession(session);
296+
} catch (Exception e) {
297+
LOGGER.warn(
298+
"Failed to release PreparedStatement resources for session {}: {}",
299+
session,
300+
e.getMessage(),
301+
e);
302+
}
290303
}
291304

292305
public TSStatus closeOperation(
@@ -295,6 +308,7 @@ public TSStatus closeOperation(
295308
long statementId,
296309
boolean haveStatementId,
297310
boolean haveSetQueryId,
311+
String preparedStatementName,
298312
LongConsumer releaseByQueryId) {
299313
if (!checkLogin(session)) {
300314
return RpcUtils.getStatus(
@@ -307,7 +321,7 @@ public TSStatus closeOperation(
307321
if (haveSetQueryId) {
308322
this.closeDataset(session, statementId, queryId, releaseByQueryId);
309323
} else {
310-
this.closeStatement(session, statementId, releaseByQueryId);
324+
this.closeStatement(session, statementId, preparedStatementName, releaseByQueryId);
311325
}
312326
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
313327
} else {
@@ -342,14 +356,35 @@ public long requestStatementId(IClientSession session) {
342356
}
343357

344358
public void closeStatement(
345-
IClientSession session, long statementId, LongConsumer releaseByQueryId) {
359+
IClientSession session,
360+
long statementId,
361+
String preparedStatementName,
362+
LongConsumer releaseByQueryId) {
346363
Set<Long> queryIdSet = session.removeStatementId(statementId);
347364
if (queryIdSet != null) {
348365
for (Long queryId : queryIdSet) {
349366
releaseByQueryId.accept(queryId);
350367
}
351368
}
352-
session.removeStatementId(statementId);
369+
370+
// If preparedStatementName is provided, release the prepared statement resources
371+
if (preparedStatementName != null && !preparedStatementName.isEmpty()) {
372+
try {
373+
PreparedStatementInfo removedInfo = session.removePreparedStatement(preparedStatementName);
374+
if (removedInfo != null) {
375+
// Release the memory allocated for this PreparedStatement
376+
PreparedStatementMemoryManager.getInstance().release(removedInfo.getMemorySizeInBytes());
377+
}
378+
} catch (Exception e) {
379+
LOGGER.warn(
380+
"Failed to release PreparedStatement '{}' resources when closing statement {} for session {}: {}",
381+
preparedStatementName,
382+
statementId,
383+
session,
384+
e.getMessage(),
385+
e);
386+
}
387+
}
353388
}
354389

355390
public long requestQueryId(IClientSession session, Long statementId) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/BaseServerContextHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public ServerContext createContext(TProtocol in, TProtocol out) {
7171

7272
public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
7373
getSessionManager().removeCurrSession();
74+
7475
if (context != null && factory != null) {
7576
((JudgableServerContext) context).whenDisconnect();
7677
}

0 commit comments

Comments
 (0)