diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 357c15c78569..5e418072a7d9 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -137,4 +137,10 @@ public DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportio setProperty("datanode_memory_proportion", dataNodeMemoryProportion); return this; } + + @Override + public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) { + setProperty("query_cost_stat_window", String.valueOf(queryCostStatWindow)); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index 1af7cb8f613a..bba4c964f957 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -93,4 +93,9 @@ public DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs) public DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion) { return this; } + + @Override + public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index 0ae46ffc70f2..d57015b13964 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -51,4 +51,6 @@ DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs); DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion); + + DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow); } diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java new file mode 100644 index 000000000000..66941ec784f8 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.relational.it.query.recent.informationschema; + +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.db.queryengine.execution.QueryState; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; + +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.END_TIME_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.NUMS; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.table.InformationSchema.getSchemaTables; +import static org.apache.iotdb.db.it.utils.TestUtils.createUser; +import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class}) +// This IT will run at least 60s, so we only run it in 1C1D +public class IoTDBCurrentQueriesIT { + private static final int CURRENT_QUERIES_COLUMN_NUM = + getSchemaTables().get("current_queries").getColumnNum(); + private static final int QUERIES_COSTS_HISTOGRAM_COLUMN_NUM = + getSchemaTables().get("queries_costs_histogram").getColumnNum(); + private static final String ADMIN_NAME = + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(); + private static final String ADMIN_PWD = + CommonDescriptor.getInstance().getConfig().getAdminPassword(); + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().getConfig().getDataNodeConfig().setQueryCostStatWindow(1); + EnvFactory.getEnv().initClusterEnvironment(); + createUser("test", "test123123456"); + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testCurrentQueries() { + try { + Connection connection = + EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD, BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement(); + statement.execute("USE information_schema"); + + // 1. query current_queries table + String sql = "SELECT * FROM current_queries"; + ResultSet resultSet = statement.executeQuery(sql); + ResultSetMetaData metaData = resultSet.getMetaData(); + Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount()); + int rowNum = 0; + while (resultSet.next()) { + Assert.assertEquals(QueryState.RUNNING.name(), resultSet.getString(STATE_TABLE_MODEL)); + Assert.assertEquals(null, resultSet.getString(END_TIME_TABLE_MODEL)); + Assert.assertEquals(sql, resultSet.getString(STATEMENT_TABLE_MODEL)); + Assert.assertEquals(ADMIN_NAME, resultSet.getString(USER_TABLE_MODEL)); + rowNum++; + } + Assert.assertEquals(1, rowNum); + resultSet.close(); + + // 2. query queries_costs_histogram table + sql = "SELECT * FROM queries_costs_histogram"; + resultSet = statement.executeQuery(sql); + metaData = resultSet.getMetaData(); + Assert.assertEquals(QUERIES_COSTS_HISTOGRAM_COLUMN_NUM, metaData.getColumnCount()); + rowNum = 0; + int queriesCount = 0; + while (resultSet.next()) { + int nums = resultSet.getInt(NUMS); + if (nums > 0) { + queriesCount++; + } + rowNum++; + } + Assert.assertEquals(1, queriesCount); + Assert.assertEquals(61, rowNum); + + // 3. requery current_queries table + sql = "SELECT * FROM current_queries"; + resultSet = statement.executeQuery(sql); + metaData = resultSet.getMetaData(); + Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM, metaData.getColumnCount()); + rowNum = 0; + int finishedQueries = 0; + while (resultSet.next()) { + if (QueryState.FINISHED.name().equals(resultSet.getString(STATE_TABLE_MODEL))) { + finishedQueries++; + } + rowNum++; + } + // three rows in the result, 2 FINISHED and 1 RUNNING + Assert.assertEquals(3, rowNum); + Assert.assertEquals(2, finishedQueries); + resultSet.close(); + + // 4. test the expired QueryInfo was evicted + Thread.sleep(61_001); + resultSet = statement.executeQuery(sql); + rowNum = 0; + while (resultSet.next()) { + rowNum++; + } + // one row in the result, current query + Assert.assertEquals(1, rowNum); + resultSet.close(); + + sql = "SELECT * FROM queries_costs_histogram"; + resultSet = statement.executeQuery(sql); + queriesCount = 0; + while (resultSet.next()) { + int nums = resultSet.getInt(NUMS); + if (nums > 0) { + queriesCount++; + } + } + // the last current_queries table query was recorded, others are evicted + Assert.assertEquals(1, queriesCount); + } catch (Exception e) { + fail(e.getMessage()); + } + + // 5. test privilege + testPrivilege(); + } + + private void testPrivilege() { + // 1. test current_queries table + try (Connection connection = + EnvFactory.getEnv().getConnection("test", "test123123456", TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + String sql = "SELECT * FROM information_schema.current_queries"; + + // another user executes a query + try (Connection connection2 = + EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD, BaseEnv.TABLE_SQL_DIALECT)) { + ResultSet resultSet = connection2.createStatement().executeQuery(sql); + resultSet.close(); + } catch (Exception e) { + fail(e.getMessage()); + } + + // current user query current_queries table + ResultSet resultSet = statement.executeQuery(sql); + int rowNum = 0; + while (resultSet.next()) { + rowNum++; + } + // only current query in the result + Assert.assertEquals(1, rowNum); + } catch (SQLException e) { + fail(e.getMessage()); + } + + // 2. test queries_costs_histogram table + try (Connection connection = + EnvFactory.getEnv().getConnection("test", "test123123456", TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.executeQuery("SELECT * FROM information_schema.queries_costs_histogram"); + } catch (SQLException e) { + Assert.assertEquals( + "803: Access Denied: No permissions for this operation, please add privilege SYSTEM", + e.getMessage()); + } + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index e7bab16ad1ff..4736d9b0521d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -399,6 +399,7 @@ public void testInformationSchema() throws SQLException { "config_nodes,INF,", "configurations,INF,", "connections,INF,", + "current_queries,INF,", "data_nodes,INF,", "databases,INF,", "functions,INF,", @@ -407,6 +408,7 @@ public void testInformationSchema() throws SQLException { "pipe_plugins,INF,", "pipes,INF,", "queries,INF,", + "queries_costs_histogram,INF,", "regions,INF,", "subscriptions,INF,", "tables,INF,", @@ -634,12 +636,14 @@ public void testInformationSchema() throws SQLException { "information_schema,config_nodes,INF,USING,null,SYSTEM VIEW,", "information_schema,data_nodes,INF,USING,null,SYSTEM VIEW,", "information_schema,connections,INF,USING,null,SYSTEM VIEW,", + "information_schema,current_queries,INF,USING,null,SYSTEM VIEW,", + "information_schema,queries_costs_histogram,INF,USING,null,SYSTEM VIEW,", "test,test,INF,USING,test,BASE TABLE,", "test,view_table,100,USING,null,VIEW FROM TREE,"))); TestUtils.assertResultSetEqual( statement.executeQuery("count devices from tables where status = 'USING'"), "count(devices),", - Collections.singleton("19,")); + Collections.singleton("21,")); TestUtils.assertResultSetEqual( statement.executeQuery( "select * from columns where table_name = 'queries' or database = 'test'"), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index edb1d1c26743..9f67ab268c2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -814,6 +814,9 @@ public class IoTDBConfig { /** time cost(ms) threshold for slow query. Unit: millisecond */ private long slowQueryThreshold = 10000; + /** time window threshold for record of history queries. Unit: minute */ + private int queryCostStatWindow = 0; + private int patternMatchingThreshold = 1000000; /** @@ -2627,6 +2630,14 @@ public void setSlowQueryThreshold(long slowQueryThreshold) { this.slowQueryThreshold = slowQueryThreshold; } + public int getQueryCostStatWindow() { + return queryCostStatWindow; + } + + public void setQueryCostStatWindow(int queryCostStatWindow) { + this.queryCostStatWindow = queryCostStatWindow; + } + public boolean isEnableIndex() { return enableIndex; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index d32b0b51f576..cfdb4c141b41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -813,6 +813,11 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException properties.getProperty( "slow_query_threshold", String.valueOf(conf.getSlowQueryThreshold())))); + conf.setQueryCostStatWindow( + Integer.parseInt( + properties.getProperty( + "query_cost_stat_window", String.valueOf(conf.getQueryCostStatWindow())))); + conf.setDataRegionNum( Integer.parseInt( properties.getProperty("data_region_num", String.valueOf(conf.getDataRegionNum())))); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java index 66be144edefb..b657523987dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java @@ -212,7 +212,8 @@ public Response executeFastLastQueryStatement( t = e; return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); } finally { - long costTime = System.nanoTime() - startTime; + long endTime = System.nanoTime(); + long costTime = endTime - startTime; StatementType statementType = Optional.ofNullable(statement) @@ -227,7 +228,18 @@ public Response executeFastLastQueryStatement( if (queryId != null) { COORDINATOR.cleanupQueryExecution(queryId); } else { - recordQueries(() -> costTime, new FastLastQueryContentSupplier(prefixPathList), t); + IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + + Supplier contentOfQuerySupplier = new FastLastQueryContentSupplier(prefixPathList); + COORDINATOR.recordCurrentQueries( + null, + startTime / 1_000_000, + endTime / 1_000_000, + costTime, + contentOfQuerySupplier, + clientSession.getUsername(), + clientSession.getClientAddress()); + recordQueries(() -> costTime, contentOfQuerySupplier, t); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 3d4222629d6c..cb5347e51f2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -1050,13 +1050,23 @@ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath( resp.setMoreData(false); - long costTime = System.nanoTime() - startTime; + long endTime = System.nanoTime(); + long costTime = endTime - startTime; CommonUtils.addStatementExecutionLatency( OperationType.EXECUTE_QUERY_STATEMENT, StatementType.FAST_LAST_QUERY.name(), costTime); CommonUtils.addQueryLatency(StatementType.FAST_LAST_QUERY, costTime); - recordQueries( - () -> costTime, () -> String.format("thrift fastLastQuery %s", prefixPath), null); + + String statement = String.format("thrift fastLastQuery %s", prefixPath); + COORDINATOR.recordCurrentQueries( + null, + startTime / 1_000_000, + endTime / 1_000_000, + costTime, + () -> statement, + clientSession.getUsername(), + clientSession.getClientAddress()); + recordQueries(() -> costTime, () -> statement, null); return resp; } catch (final Exception e) { return RpcUtils.getTSExecuteStatementResp( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java index a59ce8334bf5..44e67aa7ba60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.common; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -37,6 +38,8 @@ public class QueryId { public static final QueryId MOCK_QUERY_ID = QueryId.valueOf("mock_query_id"); + private static final int DATANODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + private final String id; private int nextPlanNodeIndex; @@ -67,6 +70,10 @@ public String getId() { return id; } + public static int getDataNodeId() { + return DATANODE_ID; + } + @Override public String toString() { return id; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index daceffce6b7b..76d84c741de5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -68,6 +68,7 @@ import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.ConnectionInfo; +import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask; @@ -166,6 +167,10 @@ public static Iterator getSupplier( return new DataNodesSupplier(dataTypes, userEntity); case InformationSchema.CONNECTIONS: return new ConnectionsSupplier(dataTypes, userEntity); + case InformationSchema.CURRENT_QUERIES: + return new CurrentQueriesSupplier(dataTypes, userEntity); + case InformationSchema.QUERIES_COSTS_HISTOGRAM: + return new QueriesCostsHistogramSupplier(dataTypes, userEntity); default: throw new UnsupportedOperationException("Unknown table: " + tableName); } @@ -201,14 +206,11 @@ protected void constructLine() { final IQueryExecution queryExecution = queryExecutions.get(nextConsumedIndex); if (queryExecution.getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) { - final String[] splits = queryExecution.getQueryId().split("_"); - final int dataNodeId = Integer.parseInt(splits[splits.length - 1]); - columnBuilders[0].writeBinary(BytesUtils.valueOf(queryExecution.getQueryId())); columnBuilders[1].writeLong( TimestampPrecisionUtils.convertToCurrPrecision( queryExecution.getStartExecutionTime(), TimeUnit.MILLISECONDS)); - columnBuilders[2].writeInt(dataNodeId); + columnBuilders[2].writeInt(QueryId.getDataNodeId()); columnBuilders[3].writeFloat( (float) (currTime - queryExecution.getStartExecutionTime()) / 1000); columnBuilders[4].writeBinary( @@ -1181,4 +1183,140 @@ public boolean hasNext() { return sessionConnectionIterator.hasNext(); } } + + private static class CurrentQueriesSupplier extends TsBlockSupplier { + private int nextConsumedIndex; + private List queriesInfo; + + private CurrentQueriesSupplier(final List dataTypes, final UserEntity userEntity) { + super(dataTypes); + queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo(); + try { + accessControl.checkUserGlobalSysPrivilege(userEntity); + } catch (final AccessDeniedException e) { + queriesInfo = + queriesInfo.stream() + .filter(iQueryInfo -> userEntity.getUsername().equals(iQueryInfo.getUser())) + .collect(Collectors.toList()); + } + } + + @Override + protected void constructLine() { + final Coordinator.StatedQueriesInfo queryInfo = queriesInfo.get(nextConsumedIndex); + columnBuilders[0].writeBinary(BytesUtils.valueOf(queryInfo.getQueryId())); + columnBuilders[1].writeBinary(BytesUtils.valueOf(queryInfo.getQueryState())); + columnBuilders[2].writeLong( + TimestampPrecisionUtils.convertToCurrPrecision( + queryInfo.getStartTime(), TimeUnit.MILLISECONDS)); + if (queryInfo.getEndTime() == Coordinator.QueryInfo.DEFAULT_END_TIME) { + columnBuilders[3].appendNull(); + } else { + columnBuilders[3].writeLong( + TimestampPrecisionUtils.convertToCurrPrecision( + queryInfo.getEndTime(), TimeUnit.MILLISECONDS)); + } + columnBuilders[4].writeInt(QueryId.getDataNodeId()); + columnBuilders[5].writeFloat(queryInfo.getCostTime()); + columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement())); + columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser())); + columnBuilders[8].writeBinary(BytesUtils.valueOf(queryInfo.getClientHost())); + resultBuilder.declarePosition(); + nextConsumedIndex++; + } + + @Override + public boolean hasNext() { + return nextConsumedIndex < queriesInfo.size(); + } + } + + private static class QueriesCostsHistogramSupplier extends TsBlockSupplier { + private int nextConsumedIndex; + private static final Binary[] BUCKETS = + new Binary[] { + BytesUtils.valueOf("[0,1)"), + BytesUtils.valueOf("[1,2)"), + BytesUtils.valueOf("[2,3)"), + BytesUtils.valueOf("[3,4)"), + BytesUtils.valueOf("[4,5)"), + BytesUtils.valueOf("[5,6)"), + BytesUtils.valueOf("[6,7)"), + BytesUtils.valueOf("[7,8)"), + BytesUtils.valueOf("[8,9)"), + BytesUtils.valueOf("[9,10)"), + BytesUtils.valueOf("[10,11)"), + BytesUtils.valueOf("[11,12)"), + BytesUtils.valueOf("[12,13)"), + BytesUtils.valueOf("[13,14)"), + BytesUtils.valueOf("[14,15)"), + BytesUtils.valueOf("[15,16)"), + BytesUtils.valueOf("[16,17)"), + BytesUtils.valueOf("[17,18)"), + BytesUtils.valueOf("[18,19)"), + BytesUtils.valueOf("[19,20)"), + BytesUtils.valueOf("[20,21)"), + BytesUtils.valueOf("[21,22)"), + BytesUtils.valueOf("[22,23)"), + BytesUtils.valueOf("[23,24)"), + BytesUtils.valueOf("[24,25)"), + BytesUtils.valueOf("[25,26)"), + BytesUtils.valueOf("[26,27)"), + BytesUtils.valueOf("[27,28)"), + BytesUtils.valueOf("[28,29)"), + BytesUtils.valueOf("[29,30)"), + BytesUtils.valueOf("[30,31)"), + BytesUtils.valueOf("[31,32)"), + BytesUtils.valueOf("[32,33)"), + BytesUtils.valueOf("[33,34)"), + BytesUtils.valueOf("[34,35)"), + BytesUtils.valueOf("[35,36)"), + BytesUtils.valueOf("[36,37)"), + BytesUtils.valueOf("[37,38)"), + BytesUtils.valueOf("[38,39)"), + BytesUtils.valueOf("[39,40)"), + BytesUtils.valueOf("[40,41)"), + BytesUtils.valueOf("[41,42)"), + BytesUtils.valueOf("[42,43)"), + BytesUtils.valueOf("[43,44)"), + BytesUtils.valueOf("[44,45)"), + BytesUtils.valueOf("[45,46)"), + BytesUtils.valueOf("[46,47)"), + BytesUtils.valueOf("[47,48)"), + BytesUtils.valueOf("[48,49)"), + BytesUtils.valueOf("[49,50)"), + BytesUtils.valueOf("[50,51)"), + BytesUtils.valueOf("[51,52)"), + BytesUtils.valueOf("[52,53)"), + BytesUtils.valueOf("[53,54)"), + BytesUtils.valueOf("[54,55)"), + BytesUtils.valueOf("[55,56)"), + BytesUtils.valueOf("[56,57)"), + BytesUtils.valueOf("[57,58)"), + BytesUtils.valueOf("[58,59)"), + BytesUtils.valueOf("[59,60)"), + BytesUtils.valueOf("60+") + }; + private final int[] currentQueriesCostHistogram; + + private QueriesCostsHistogramSupplier( + final List dataTypes, final UserEntity userEntity) { + super(dataTypes); + accessControl.checkUserGlobalSysPrivilege(userEntity); + currentQueriesCostHistogram = Coordinator.getInstance().getCurrentQueriesCostHistogram(); + } + + @Override + protected void constructLine() { + columnBuilders[0].writeBinary(BUCKETS[nextConsumedIndex]); + columnBuilders[1].writeInt(currentQueriesCostHistogram[nextConsumedIndex]); + resultBuilder.declarePosition(); + nextConsumedIndex++; + } + + @Override + public boolean hasNext() { + return nextConsumedIndex < 61; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 7708f6c18cda..78cdee720da2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; @@ -42,6 +43,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator; +import org.apache.iotdb.db.queryengine.execution.QueryState; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; @@ -141,23 +143,37 @@ import org.apache.iotdb.db.utils.SetThreadName; import org.apache.thrift.TBase; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.utils.StatusUtils.needRetry; +import static org.apache.iotdb.db.queryengine.plan.Coordinator.QueryInfo.DEFAULT_END_TIME; import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest; +import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance; +import static org.apache.tsfile.utils.RamUsageEstimator.sizeOfCharArray; /** * The coordinator for MPP. It manages all the queries which are executed in current Node. And it @@ -203,12 +219,32 @@ public class Coordinator { private final ConcurrentHashMap queryExecutionMap; + private final BlockingDeque currentQueriesInfo = new LinkedBlockingDeque<>(); + private final AtomicInteger[] currentQueriesCostHistogram = new AtomicInteger[61]; + private final ScheduledExecutorService retryFailTasksExecutor = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.EXPIRED_QUERIES_INFO_CLEAR.getName()); + private final StatementRewrite statementRewrite; private final List logicalPlanOptimizers; private final List distributionPlanOptimizers; private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier; private final TypeManager typeManager; + { + for (int i = 0; i < 61; i++) { + currentQueriesCostHistogram[i] = new AtomicInteger(); + } + + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + retryFailTasksExecutor, + this::clearExpiredQueriesInfoTask, + 1_000L, + 1_000L, + TimeUnit.MILLISECONDS); + LOGGER.info("Expired-Queries-Info-Clear thread is successfully started."); + } + static { coordinatorMemoryBlock = IoTDBDescriptor.getInstance() @@ -625,12 +661,22 @@ public void cleanupQueryExecution( try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { LOGGER.debug("[CleanUpQuery]]"); queryExecution.stopAndCleanup(t); + boolean isUserQuery = queryExecution.isQuery() && queryExecution.isUserQuery(); + Supplier contentOfQuerySupplier = + new ContentOfQuerySupplier(nativeApiRequest, queryExecution); + if (isUserQuery) { + recordCurrentQueries( + queryExecution.getQueryId(), + queryExecution.getStartExecutionTime(), + System.currentTimeMillis(), + queryExecution.getTotalExecutionTime(), + contentOfQuerySupplier, + queryExecution.getUser(), + queryExecution.getClientHostname()); + } queryExecutionMap.remove(queryId); - if (queryExecution.isQuery() && queryExecution.isUserQuery()) { - recordQueries( - queryExecution::getTotalExecutionTime, - new ContentOfQuerySupplier(nativeApiRequest, queryExecution), - t); + if (isUserQuery) { + recordQueries(queryExecution::getTotalExecutionTime, contentOfQuerySupplier, t); } } } @@ -722,4 +768,249 @@ public DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat public ExecutorService getDispatchExecutor() { return dispatchExecutor; } + + /** record query info in memory data structure */ + public void recordCurrentQueries( + String queryId, + long startTime, + long endTime, + long costTimeInNs, + Supplier contentOfQuerySupplier, + String user, + String clientHost) { + if (CONFIG.getQueryCostStatWindow() <= 0) { + return; + } + + if (queryId == null) { + // fast Last query API executeFastLastDataQueryForOnePrefixPath will enter this + queryId = queryIdGenerator.createNextQueryId().getId(); + } + + // ns -> s + float costTimeInSeconds = costTimeInNs * 1e-9f; + + QueryInfo queryInfo = + new QueryInfo( + queryId, + startTime, + endTime, + costTimeInSeconds, + contentOfQuerySupplier.get(), + user, + clientHost); + + while (!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) { + // try to release memory from the head of queue + QueryInfo queryInfoToRelease = currentQueriesInfo.poll(); + if (queryInfoToRelease == null) { + // no element in the queue and the memory is still not enough, skip this record + return; + } else { + // release memory and unrecord in histogram + coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease)); + unrecordInHistogram(queryInfoToRelease.costTime); + } + } + + currentQueriesInfo.addLast(queryInfo); + recordInHistogram(costTimeInSeconds); + } + + private void recordInHistogram(float costTimeInSeconds) { + int bucket = (int) costTimeInSeconds; + if (bucket < 60) { + currentQueriesCostHistogram[bucket].getAndIncrement(); + } else { + currentQueriesCostHistogram[60].getAndIncrement(); + } + } + + private void unrecordInHistogram(float costTimeInSeconds) { + int bucket = (int) costTimeInSeconds; + if (bucket < 60) { + currentQueriesCostHistogram[bucket].getAndDecrement(); + } else { + currentQueriesCostHistogram[60].getAndDecrement(); + } + } + + private void clearExpiredQueriesInfoTask() { + int queryCostStatWindow = CONFIG.getQueryCostStatWindow(); + if (queryCostStatWindow <= 0) { + return; + } + + // the QueryInfo smaller than expired time will be cleared + long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 1_000L; + // peek head, the head QueryInfo is in the time window, return directly + QueryInfo queryInfo = currentQueriesInfo.peekFirst(); + if (queryInfo == null || queryInfo.endTime >= expiredTime) { + return; + } + + queryInfo = currentQueriesInfo.poll(); + while (queryInfo != null) { + if (queryInfo.endTime < expiredTime) { + // out of time window, clear queryInfo + coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo)); + unrecordInHistogram(queryInfo.costTime); + queryInfo = currentQueriesInfo.poll(); + } else { + // the head of the queue is not expired, add back + currentQueriesInfo.addFirst(queryInfo); + // there is no more candidate to clear + return; + } + } + } + + public List getCurrentQueriesInfo() { + List runningQueries = getAllQueryExecutions(); + Set runningQueryIdSet = + runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet()); + List result = new ArrayList<>(); + + // add History queries (satisfy the time window) info + Iterator historyQueriesIterator = currentQueriesInfo.iterator(); + Set repetitionQueryIdSet = new HashSet<>(); + long currentTime = System.currentTimeMillis(); + long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 1_000L; + while (historyQueriesIterator.hasNext()) { + QueryInfo queryInfo = historyQueriesIterator.next(); + if (queryInfo.endTime < needRecordTime) { + // out of time window, ignore it + } else { + if (runningQueryIdSet.contains(queryInfo.queryId)) { + repetitionQueryIdSet.add(queryInfo.queryId); + } + result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo)); + } + } + + // add Running queries info after remove the repetitions which has recorded in History queries + result.addAll( + runningQueries.stream() + .filter(queryExecution -> !repetitionQueryIdSet.contains(queryExecution.getQueryId())) + .map( + queryExecution -> + new StatedQueriesInfo( + QueryState.RUNNING, + queryExecution.getQueryId(), + queryExecution.getStartExecutionTime(), + DEFAULT_END_TIME, + (currentTime - queryExecution.getStartExecutionTime()) / 1000, + queryExecution.getExecuteSQL().orElse("UNKNOWN"), + queryExecution.getUser(), + queryExecution.getClientHostname())) + .collect(Collectors.toList())); + return result; + } + + public int[] getCurrentQueriesCostHistogram() { + return Arrays.stream(currentQueriesCostHistogram).mapToInt(AtomicInteger::get).toArray(); + } + + public static class QueryInfo implements Accountable { + public static final long DEFAULT_END_TIME = -1L; + private static final long INSTANCE_SIZE = shallowSizeOfInstance(QueryInfo.class); + + private final String queryId; + + // unit: millisecond + private final long startTime; + private final long endTime; + // unit: second + private final float costTime; + + private final String statement; + private final String user; + private final String clientHost; + + public QueryInfo( + String queryId, + long startTime, + long endTime, + float costTime, + String statement, + String user, + String clientHost) { + this.queryId = queryId; + this.startTime = startTime; + this.endTime = endTime; + this.costTime = costTime; + this.statement = statement; + this.user = user; + this.clientHost = clientHost; + } + + public String getClientHost() { + return clientHost; + } + + public String getUser() { + return user; + } + + public long getStartTime() { + return startTime; + } + + public long getEndTime() { + return endTime; + } + + public float getCostTime() { + return costTime; + } + + public String getQueryId() { + return queryId; + } + + public String getStatement() { + return statement; + } + + @Override + public long ramBytesUsed() { + return INSTANCE_SIZE + + sizeOfCharArray(statement.length()) + + sizeOfCharArray(user.length()) + + sizeOfCharArray(clientHost.length()); + } + } + + public static class StatedQueriesInfo extends QueryInfo { + private final QueryState queryState; + + private StatedQueriesInfo(QueryState queryState, QueryInfo queryInfo) { + super( + queryInfo.queryId, + queryInfo.startTime, + queryInfo.endTime, + queryInfo.costTime, + queryInfo.statement, + queryInfo.user, + queryInfo.clientHost); + this.queryState = queryState; + } + + private StatedQueriesInfo( + QueryState queryState, + String queryId, + long startTime, + long endTime, + long costTime, + String statement, + String user, + String clientHost) { + super(queryId, startTime, endTime, costTime, statement, user, clientHost); + this.queryState = queryState; + } + + public String getQueryState() { + return queryState.name(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index 98257c24293e..e98f016767f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -77,4 +77,6 @@ public interface IQueryExecution { IClientSession.SqlDialect getSQLDialect(); String getUser(); + + String getClientHostname(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 2c1657e839e2..4734db5850a4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -700,6 +700,11 @@ public String getUser() { return context.getSession().getUserName(); } + @Override + public String getClientHostname() { + return context.getCliHostname(); + } + public MPPQueryContext getContext() { return context; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index 1880924f297a..823a620820fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -353,4 +353,8 @@ public IClientSession.SqlDialect getSQLDialect() { public String getUser() { return context.getSession().getUserName(); } + + public String getClientHostname() { + return context.getCliHostname(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java index f8cf497546e6..d7d755ddc1da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java @@ -86,6 +86,8 @@ public List getDataNodeLocations(final String tableName) { switch (tableName) { case InformationSchema.QUERIES: case InformationSchema.CONNECTIONS: + case InformationSchema.CURRENT_QUERIES: + case InformationSchema.QUERIES_COSTS_HISTOGRAM: return getReadableDataNodeLocations(); case InformationSchema.DATABASES: case InformationSchema.TABLES: diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java index 4ca38a5c8d9f..8fc7d01437cf 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java @@ -1910,5 +1910,10 @@ public QueryType getQueryType() { public boolean isUserQuery() { return false; } + + @Override + public String getClientHostname() { + return SessionConfig.DEFAULT_HOST; + } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java index a4cef177d813..850755702016 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java @@ -83,6 +83,8 @@ public class PlanTester { public List getDataNodeLocations(String table) { switch (table) { case "queries": + case "current_queries": + case "queries_costs_histogram": return ImmutableList.of( genDataNodeLocation(1, "192.0.1.1"), genDataNodeLocation(2, "192.0.1.2")); default: diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/CurrentQueriesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/CurrentQueriesTest.java new file mode 100644 index 000000000000..1e3163321ec4 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/CurrentQueriesTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.queryengine.plan.relational.planner.informationschema; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; +import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; + +import java.util.Optional; + +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.BIN; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.CLIENT_IP; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.COST_TIME; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.DATA_NODE_ID_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.END_TIME_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.NUMS; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.QUERY_ID_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.START_TIME_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_TABLE_MODEL; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER_TABLE_MODEL; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.infoSchemaTableScan; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; + +public class CurrentQueriesTest { + private final PlanTester planTester = new PlanTester(); + + @Test + public void testCurrentQueries() { + LogicalQueryPlan logicalQueryPlan = + planTester.createPlan("select * from information_schema.current_queries"); + assertPlan( + logicalQueryPlan, + output( + infoSchemaTableScan( + "information_schema.current_queries", + Optional.empty(), + ImmutableList.of( + QUERY_ID_TABLE_MODEL, + STATE_TABLE_MODEL, + START_TIME_TABLE_MODEL, + END_TIME_TABLE_MODEL, + DATA_NODE_ID_TABLE_MODEL, + COST_TIME, + STATEMENT_TABLE_MODEL, + USER_TABLE_MODEL, + CLIENT_IP)))); + + // - Exchange + // Output - Collect - Exchange + assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange()))); + // TableScan + assertPlan( + planTester.getFragmentPlan(1), + infoSchemaTableScan("information_schema.current_queries", Optional.of(1))); + // TableScan + assertPlan( + planTester.getFragmentPlan(2), + infoSchemaTableScan("information_schema.current_queries", Optional.of(2))); + } + + @Test + public void testQueriesCostsHistogram() { + LogicalQueryPlan logicalQueryPlan = + planTester.createPlan("select * from information_schema.queries_costs_histogram"); + assertPlan( + logicalQueryPlan, + output( + infoSchemaTableScan( + "information_schema.queries_costs_histogram", + Optional.empty(), + ImmutableList.of(BIN, NUMS)))); + + // - Exchange + // Output - Collect - Exchange + assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(), exchange()))); + // TableScan + assertPlan( + planTester.getFragmentPlan(1), + infoSchemaTableScan("information_schema.queries_costs_histogram", Optional.of(1))); + // TableScan + assertPlan( + planTester.getFragmentPlan(2), + infoSchemaTableScan("information_schema.queries_costs_histogram", Optional.of(2))); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowQueriesTest.java similarity index 94% rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java rename to iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowQueriesTest.java index 63b7b783d747..7161c68f4e9e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowQueriesTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.iotdb.db.queryengine.plan.relational.analyzer; +package org.apache.iotdb.db.queryengine.plan.relational.planner.informationschema; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester; @@ -32,7 +32,9 @@ import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.QUERY_ID_TABLE_MODEL; import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.START_TIME_TABLE_MODEL; import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT_TABLE_MODEL; import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER_TABLE_MODEL; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; @@ -61,8 +63,8 @@ public void testNormal() { START_TIME_TABLE_MODEL, DATA_NODE_ID_TABLE_MODEL, ELAPSED_TIME_TABLE_MODEL, - STATEMENT.toLowerCase(Locale.ENGLISH), - USER.toLowerCase(Locale.ENGLISH))))); + STATEMENT_TABLE_MODEL, + USER_TABLE_MODEL)))); // - Exchange // Output - Collect - Exchange diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 4b31bf9a286c..6bbc4124a9fb 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1080,6 +1080,12 @@ max_tsblock_line_number=1000 # Datatype: long slow_query_threshold=10000 +# Time window threshold(min) for record of history queries. +# effectiveMode: hot_reload +# Datatype: int +# Privilege: SYSTEM +query_cost_stat_window=0 + # The max executing time of query. unit: ms # effectiveMode: restart # Datatype: int diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 390e9f80e9b3..6f9f95ca8fe8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -35,6 +35,7 @@ public enum ThreadName { FRAGMENT_INSTANCE_NOTIFICATION("Fragment-Instance-Notification"), FRAGMENT_INSTANCE_DISPATCH("Fragment-Instance-Dispatch"), DRIVER_TASK_SCHEDULER_NOTIFICATION("Driver-Task-Scheduler-Notification"), + EXPIRED_QUERIES_INFO_CLEAR("Expired-Queries-Info-Clear"), // -------------------------- MPP -------------------------- MPP_COORDINATOR_SCHEDULED_EXECUTOR("MPP-Coordinator-Scheduled-Executor"), MPP_DATA_EXCHANGE_TASK_EXECUTOR("MPP-Data-Exchange-Task-Executors"), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 943bdeb9cba4..1704a9223458 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -214,11 +214,19 @@ private ColumnHeaderConstant() { public static final String CLIENT_IP = "client_ip"; public static final String QUERY_ID_TABLE_MODEL = "query_id"; - public static final String QUERY_ID_START_TIME_TABLE_MODEL = "start_time"; public static final String DATA_NODE_ID_TABLE_MODEL = "datanode_id"; public static final String START_TIME_TABLE_MODEL = "start_time"; public static final String ELAPSED_TIME_TABLE_MODEL = "elapsed_time"; + // column names for current_queries and queries_costs_histogram + public static final String STATE_TABLE_MODEL = "state"; + public static final String END_TIME_TABLE_MODEL = "end_time"; + public static final String COST_TIME = "cost_time"; + public static final String STATEMENT_TABLE_MODEL = "statement"; + public static final String USER_TABLE_MODEL = "user"; + public static final String BIN = "bin"; + public static final String NUMS = "nums"; + public static final String TABLE_NAME_TABLE_MODEL = "table_name"; public static final String TABLE_TYPE_TABLE_MODEL = "table_type"; public static final String COLUMN_NAME_TABLE_MODEL = "column_name"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index 243bc41c40ce..b8e03423d61c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -50,6 +50,8 @@ public class InformationSchema { public static final String CONFIG_NODES = "config_nodes"; public static final String DATA_NODES = "data_nodes"; public static final String CONNECTIONS = "connections"; + public static final String CURRENT_QUERIES = "current_queries"; + public static final String QUERIES_COSTS_HISTOGRAM = "queries_costs_histogram"; static { final TsTable queriesTable = new TsTable(QUERIES); @@ -57,17 +59,15 @@ public class InformationSchema { new TagColumnSchema(ColumnHeaderConstant.QUERY_ID_TABLE_MODEL, TSDataType.STRING)); queriesTable.addColumnSchema( new AttributeColumnSchema( - ColumnHeaderConstant.QUERY_ID_START_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); + ColumnHeaderConstant.START_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); queriesTable.addColumnSchema( new AttributeColumnSchema(ColumnHeaderConstant.DATA_NODE_ID_TABLE_MODEL, TSDataType.INT32)); queriesTable.addColumnSchema( new AttributeColumnSchema(ColumnHeaderConstant.ELAPSED_TIME_TABLE_MODEL, TSDataType.FLOAT)); queriesTable.addColumnSchema( - new AttributeColumnSchema( - ColumnHeaderConstant.STATEMENT.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + new AttributeColumnSchema(ColumnHeaderConstant.STATEMENT_TABLE_MODEL, TSDataType.STRING)); queriesTable.addColumnSchema( - new AttributeColumnSchema( - ColumnHeaderConstant.USER.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + new AttributeColumnSchema(ColumnHeaderConstant.USER_TABLE_MODEL, TSDataType.STRING)); queriesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); schemaTables.put(QUERIES, queriesTable); @@ -361,6 +361,37 @@ public class InformationSchema { new AttributeColumnSchema(ColumnHeaderConstant.CLIENT_IP, TSDataType.STRING)); connectionsTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); schemaTables.put(CONNECTIONS, connectionsTable); + + final TsTable currentQueriesTable = new TsTable(CURRENT_QUERIES); + currentQueriesTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.QUERY_ID_TABLE_MODEL, TSDataType.STRING)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.STATE_TABLE_MODEL, TSDataType.STRING)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.START_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.END_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.DATA_NODE_ID_TABLE_MODEL, TSDataType.INT32)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.COST_TIME, TSDataType.FLOAT)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.STATEMENT_TABLE_MODEL, TSDataType.STRING)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.USER_TABLE_MODEL, TSDataType.STRING)); + currentQueriesTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.CLIENT_IP, TSDataType.STRING)); + currentQueriesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(CURRENT_QUERIES, currentQueriesTable); + + final TsTable queriesCostsHistogramTable = new TsTable(QUERIES_COSTS_HISTOGRAM); + queriesCostsHistogramTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.BIN, TSDataType.STRING)); + queriesCostsHistogramTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.NUMS, TSDataType.INT32)); + queriesCostsHistogramTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(QUERIES_COSTS_HISTOGRAM, queriesCostsHistogramTable); } public static Map getSchemaTables() {