Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ private void updateConfig(Statement statement, int timeout) throws SQLException
statement.setQueryTimeout(timeout);
}

/**
* Executes a SQL query on all read statements in parallel.
*
* <p>Note: For PreparedStatement EXECUTE queries, use the write connection directly instead,
* because PreparedStatements are session-scoped and this method may route queries to different
* nodes where the PreparedStatement doesn't exist.
*/
@Override
public ResultSet executeQuery(String sql) throws SQLException {
return new ClusterTestResultSet(readStatements, readEndpoints, sql, queryTimeout);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ public class ClientSession extends IClientSession {

private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>();

// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

public ClientSession(Socket clientSocket) {
this.clientSocket = clientSocket;
}
Expand Down Expand Up @@ -103,4 +106,24 @@ public static void removeQueryId(
}
}
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,37 @@ public void setDatabaseName(@Nullable String databaseName) {
this.databaseName = databaseName;
}

/**
* Add a prepared statement to this session.
*
* @param statementName the name of the prepared statement
* @param info the prepared statement information
*/
public abstract void addPreparedStatement(String statementName, PreparedStatementInfo info);

/**
* Remove a prepared statement from this session.
*
* @param statementName the name of the prepared statement
* @return the removed prepared statement info, or null if not found
*/
public abstract PreparedStatementInfo removePreparedStatement(String statementName);

/**
* Get a prepared statement from this session.
*
* @param statementName the name of the prepared statement
* @return the prepared statement info, or null if not found
*/
public abstract PreparedStatementInfo getPreparedStatement(String statementName);

/**
* Get all prepared statement names in this session.
*
* @return set of prepared statement names
*/
public abstract Set<String> getPreparedStatementNames();

public long getLastActiveTime() {
return lastActiveTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,28 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId);
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
throw new UnsupportedOperationException(
"InternalClientSession should never call PREPARE statement methods.");
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
throw new UnsupportedOperationException(
"InternalClientSession should never call PREPARE statement methods.");
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
throw new UnsupportedOperationException(
"InternalClientSession should never call PREPARE statement methods.");
}

@Override
public Set<String> getPreparedStatementNames() {
throw new UnsupportedOperationException(
"InternalClientSession should never call PREPARE statement methods.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,28 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
throw new UnsupportedOperationException();
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
throw new UnsupportedOperationException(
"MQTT client session does not support PREPARE statement.");
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
throw new UnsupportedOperationException(
"MQTT client session does not support PREPARE statement.");
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
throw new UnsupportedOperationException(
"MQTT client session does not support PREPARE statement.");
}

@Override
public Set<String> getPreparedStatementNames() {
throw new UnsupportedOperationException(
"MQTT client session does not support PREPARE statement.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.protocol.session;

import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

/**
* Information about a prepared statement stored in a session. The AST is cached here to avoid
* reparsing on EXECUTE.
*/
public class PreparedStatementInfo {

private final String statementName;
private final Statement sql; // Cached AST (contains Parameter nodes)
private final long createTime;
private final long memorySizeInBytes; // Memory size allocated for this PreparedStatement

public PreparedStatementInfo(String statementName, Statement sql, long memorySizeInBytes) {
this.statementName = requireNonNull(statementName, "statementName is null");
this.sql = requireNonNull(sql, "sql is null");
this.createTime = System.currentTimeMillis();
this.memorySizeInBytes = memorySizeInBytes;
}

public PreparedStatementInfo(
String statementName, Statement sql, long createTime, long memorySizeInBytes) {
this.statementName = requireNonNull(statementName, "statementName is null");
this.sql = requireNonNull(sql, "sql is null");
this.createTime = createTime;
this.memorySizeInBytes = memorySizeInBytes;
}

public String getStatementName() {
return statementName;
}

public Statement getSql() {
return sql;
}

public long getCreateTime() {
return createTime;
}

public long getMemorySizeInBytes() {
return memorySizeInBytes;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PreparedStatementInfo that = (PreparedStatementInfo) o;
return Objects.equals(statementName, that.statementName) && Objects.equals(sql, that.sql);
}

@Override
public int hashCode() {
return Objects.hash(statementName, sql);
}

@Override
public String toString() {
return "PreparedStatementInfo{"
+ "statementName='"
+ statementName
+ '\''
+ ", sql="
+ sql
+ ", createTime="
+ createTime
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import org.apache.iotdb.service.rpc.thrift.TSConnectionType;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RestClientSession extends IClientSession {

private final String clientID;

// Map from statement name to PreparedStatementInfo
private final Map<String, PreparedStatementInfo> preparedStatements = new ConcurrentHashMap<>();

public RestClientSession(String clientID) {
this.clientID = clientID;
}
Expand Down Expand Up @@ -76,4 +81,24 @@ public void addQueryId(Long statementId, long queryId) {
public void removeQueryId(Long statementId, Long queryId) {
throw new UnsupportedOperationException();
}

@Override
public void addPreparedStatement(String statementName, PreparedStatementInfo info) {
preparedStatements.put(statementName, info);
}

@Override
public PreparedStatementInfo removePreparedStatement(String statementName) {
return preparedStatements.remove(statementName);
}

@Override
public PreparedStatementInfo getPreparedStatement(String statementName) {
return preparedStatements.get(statementName);
}

@Override
public Set<String> getPreparedStatementNames() {
return preparedStatements.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.db.protocol.thrift.OperationType;
import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.execution.config.session.PreparedStatementMemoryManager;
import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
import org.apache.iotdb.db.utils.DataNodeAuthUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
Expand Down Expand Up @@ -276,6 +277,7 @@
}

private void releaseSessionResource(IClientSession session, LongConsumer releaseQueryResource) {
// Release query resources
Iterable<Long> statementIds = session.getStatementIds();
if (statementIds != null) {
for (Long statementId : statementIds) {
Expand All @@ -287,6 +289,17 @@
}
}
}

// Release PreparedStatement memory resources
try {
PreparedStatementMemoryManager.getInstance().releaseAllForSession(session);
} catch (Exception e) {
LOGGER.warn(
"Failed to release PreparedStatement resources for session {}: {}",
session,
e.getMessage(),
e);
}
}

public TSStatus closeOperation(
Expand All @@ -295,6 +308,7 @@
long statementId,
boolean haveStatementId,
boolean haveSetQueryId,
String preparedStatementName,
LongConsumer releaseByQueryId) {
if (!checkLogin(session)) {
return RpcUtils.getStatus(
Expand All @@ -307,7 +321,7 @@
if (haveSetQueryId) {
this.closeDataset(session, statementId, queryId, releaseByQueryId);
} else {
this.closeStatement(session, statementId, releaseByQueryId);
this.closeStatement(session, statementId, preparedStatementName, releaseByQueryId);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
} else {
Expand Down Expand Up @@ -342,14 +356,35 @@
}

public void closeStatement(
IClientSession session, long statementId, LongConsumer releaseByQueryId) {
IClientSession session,
long statementId,
String preparedStatementName,
LongConsumer releaseByQueryId) {
Set<Long> queryIdSet = session.removeStatementId(statementId);
if (queryIdSet != null) {
for (Long queryId : queryIdSet) {
releaseByQueryId.accept(queryId);
}
}
session.removeStatementId(statementId);

// If preparedStatementName is provided, release the prepared statement resources
if (preparedStatementName != null && !preparedStatementName.isEmpty()) {
try {
PreparedStatementInfo removedInfo = session.removePreparedStatement(preparedStatementName);
if (removedInfo != null) {
// Release the memory allocated for this PreparedStatement
PreparedStatementMemoryManager.getInstance().release(removedInfo.getMemorySizeInBytes());
}
} catch (Exception e) {
LOGGER.warn(
"Failed to release PreparedStatement '{}' resources when closing statement {} for session {}: {}",

Check warning on line 380 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Line is longer than 100 characters (found 110).

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZr8rWZcsi64p9AHez7L&open=AZr8rWZcsi64p9AHez7L&pullRequest=16880
preparedStatementName,
statementId,
session,
e.getMessage(),
e);
}
}
}

public long requestQueryId(IClientSession session, Long statementId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public ServerContext createContext(TProtocol in, TProtocol out) {

public void deleteContext(ServerContext context, TProtocol in, TProtocol out) {
getSessionManager().removeCurrSession();

if (context != null && factory != null) {
((JudgableServerContext) context).whenDisconnect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@
relationSqlParser = new SqlParser();
}

private TSExecuteStatementResp executeStatementInternal(

Check warning on line 300 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 174 to 64, Complexity from 29 to 14, Nesting Level from 4 to 2, Number of Variables from 28 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZr8rWYwsi64p9AHez7K&open=AZr8rWYwsi64p9AHez7K&pullRequest=16880
TSExecuteStatementReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
Expand Down Expand Up @@ -1445,6 +1445,7 @@
req.statementId,
req.isSetStatementId(),
req.isSetQueryId(),
req.isSetPreparedStatementName() ? req.getPreparedStatementName() : null,
COORDINATOR::cleanupQueryExecution);
}

Expand Down
Loading
Loading