Skip to content

Commit 587e4bf

Browse files
committed
implement
Signed-off-by: Weihao Li <[email protected]>
1 parent 1f956e9 commit 587e4bf

File tree

13 files changed

+504
-15
lines changed

13 files changed

+504
-15
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ public Response executeFastLastQueryStatement(
212212
t = e;
213213
return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
214214
} finally {
215-
long costTime = System.nanoTime() - startTime;
215+
long endTime = System.nanoTime();
216+
long costTime = endTime - startTime;
216217

217218
StatementType statementType =
218219
Optional.ofNullable(statement)
@@ -227,6 +228,16 @@ public Response executeFastLastQueryStatement(
227228
if (queryId != null) {
228229
COORDINATOR.cleanupQueryExecution(queryId);
229230
} else {
231+
IClientSession clientSession = SESSION_MANAGER.getCurrSession();
232+
233+
COORDINATOR.recordCurrentQueries(
234+
null,
235+
startTime / 1_000_000,
236+
endTime / 1_000_000,
237+
costTime,
238+
restFastLastQueryReq(prefixPathList),
239+
clientSession.getUsername(),
240+
clientSession.getClientAddress());
230241
recordQueries(() -> costTime, new FastLastQueryContentSupplier(prefixPathList), t);
231242
}
232243
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,13 +1021,23 @@ public TSExecuteStatementResp executeFastLastDataQueryForOnePrefixPath(
10211021

10221022
resp.setMoreData(false);
10231023

1024-
long costTime = System.nanoTime() - startTime;
1024+
long endTime = System.nanoTime();
1025+
long costTime = endTime - startTime;
10251026

10261027
CommonUtils.addStatementExecutionLatency(
10271028
OperationType.EXECUTE_QUERY_STATEMENT, StatementType.FAST_LAST_QUERY.name(), costTime);
10281029
CommonUtils.addQueryLatency(StatementType.FAST_LAST_QUERY, costTime);
1029-
recordQueries(
1030-
() -> costTime, () -> String.format("thrift fastLastQuery %s", prefixPath), null);
1030+
1031+
String statement = String.format("thrift fastLastQuery %s", prefixPath);
1032+
COORDINATOR.recordCurrentQueries(
1033+
null,
1034+
startTime / 1_000_000,
1035+
endTime / 1_000_000,
1036+
costTime,
1037+
statement,
1038+
clientSession.getUsername(),
1039+
clientSession.getClientAddress());
1040+
recordQueries(() -> costTime, () -> statement, null);
10311041
return resp;
10321042
} catch (final Exception e) {
10331043
return RpcUtils.getTSExecuteStatementResp(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ public String getId() {
6767
return id;
6868
}
6969

70+
public int getDataNodeId() {
71+
return getDataNodeId(id);
72+
}
73+
74+
public static int getDataNodeId(String queryId) {
75+
String[] splits = queryId.split("_");
76+
return Integer.parseInt(splits[splits.length - 1]);
77+
}
78+
7079
@Override
7180
public String toString() {
7281
return id;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java

Lines changed: 142 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import org.apache.iotdb.db.protocol.session.IClientSession;
7474
import org.apache.iotdb.db.protocol.session.SessionManager;
7575
import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
76+
import org.apache.iotdb.db.queryengine.common.QueryId;
7677
import org.apache.iotdb.db.queryengine.plan.Coordinator;
7778
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
7879
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask;
@@ -173,6 +174,10 @@ public static Iterator<TsBlock> getSupplier(
173174
return new DataNodesSupplier(dataTypes, userEntity);
174175
case InformationSchema.CONNECTIONS:
175176
return new ConnectionsSupplier(dataTypes, userEntity);
177+
case InformationSchema.CURRENT_QUERIES:
178+
return new CurrentQueriesSupplier(dataTypes, userEntity);
179+
case InformationSchema.QUERIES_COSTS_HISTOGRAM:
180+
return new QueriesCostsHistogramSupplier(dataTypes, userEntity);
176181
default:
177182
throw new UnsupportedOperationException("Unknown table: " + tableName);
178183
}
@@ -208,14 +213,11 @@ protected void constructLine() {
208213
final IQueryExecution queryExecution = queryExecutions.get(nextConsumedIndex);
209214

210215
if (queryExecution.getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) {
211-
final String[] splits = queryExecution.getQueryId().split("_");
212-
final int dataNodeId = Integer.parseInt(splits[splits.length - 1]);
213-
214216
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryExecution.getQueryId()));
215217
columnBuilders[1].writeLong(
216218
TimestampPrecisionUtils.convertToCurrPrecision(
217219
queryExecution.getStartExecutionTime(), TimeUnit.MILLISECONDS));
218-
columnBuilders[2].writeInt(dataNodeId);
220+
columnBuilders[2].writeInt(QueryId.getDataNodeId(queryExecution.getQueryId()));
219221
columnBuilders[3].writeFloat(
220222
(float) (currTime - queryExecution.getStartExecutionTime()) / 1000);
221223
columnBuilders[4].writeBinary(
@@ -1294,4 +1296,140 @@ public boolean hasNext() {
12941296
return sessionConnectionIterator.hasNext();
12951297
}
12961298
}
1299+
1300+
private static class CurrentQueriesSupplier extends TsBlockSupplier {
1301+
protected int nextConsumedIndex;
1302+
private List<Coordinator.StatedQueriesInfo> queriesInfo;
1303+
1304+
private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) {
1305+
super(dataTypes);
1306+
queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
1307+
try {
1308+
accessControl.checkUserGlobalSysPrivilege(userEntity);
1309+
} catch (final AccessDeniedException e) {
1310+
queriesInfo =
1311+
queriesInfo.stream()
1312+
.filter(iQueryInfo -> userEntity.getUsername().equals(iQueryInfo.getUser()))
1313+
.collect(Collectors.toList());
1314+
}
1315+
}
1316+
1317+
@Override
1318+
protected void constructLine() {
1319+
final Coordinator.StatedQueriesInfo queryInfo = queriesInfo.get(nextConsumedIndex);
1320+
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryInfo.getQueryId()));
1321+
columnBuilders[1].writeBinary(BytesUtils.valueOf(queryInfo.getQueryState()));
1322+
columnBuilders[2].writeLong(
1323+
TimestampPrecisionUtils.convertToCurrPrecision(
1324+
queryInfo.getStartTime(), TimeUnit.MILLISECONDS));
1325+
if (queryInfo.getEndTime() == Coordinator.QueryInfo.DEFAULT_END_TIME) {
1326+
columnBuilders[3].appendNull();
1327+
} else {
1328+
columnBuilders[3].writeLong(
1329+
TimestampPrecisionUtils.convertToCurrPrecision(
1330+
queryInfo.getEndTime(), TimeUnit.MILLISECONDS));
1331+
}
1332+
columnBuilders[4].writeInt(QueryId.getDataNodeId(queryInfo.getQueryId()));
1333+
columnBuilders[5].writeFloat(queryInfo.getCostTime());
1334+
columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement()));
1335+
columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser()));
1336+
columnBuilders[8].writeBinary(BytesUtils.valueOf(queryInfo.getClientHost()));
1337+
resultBuilder.declarePosition();
1338+
nextConsumedIndex++;
1339+
}
1340+
1341+
@Override
1342+
public boolean hasNext() {
1343+
return nextConsumedIndex < queriesInfo.size();
1344+
}
1345+
}
1346+
1347+
private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
1348+
protected int nextConsumedIndex;
1349+
private static final Binary[] BUCKETS =
1350+
new Binary[] {
1351+
BytesUtils.valueOf("[0,1)"),
1352+
BytesUtils.valueOf("[1,2)"),
1353+
BytesUtils.valueOf("[2,3)"),
1354+
BytesUtils.valueOf("[3,4)"),
1355+
BytesUtils.valueOf("[4,5)"),
1356+
BytesUtils.valueOf("[5,6)"),
1357+
BytesUtils.valueOf("[6,7)"),
1358+
BytesUtils.valueOf("[7,8)"),
1359+
BytesUtils.valueOf("[8,9)"),
1360+
BytesUtils.valueOf("[9,10)"),
1361+
BytesUtils.valueOf("[10,11)"),
1362+
BytesUtils.valueOf("[11,12)"),
1363+
BytesUtils.valueOf("[12,13)"),
1364+
BytesUtils.valueOf("[13,14)"),
1365+
BytesUtils.valueOf("[14,15)"),
1366+
BytesUtils.valueOf("[15,16)"),
1367+
BytesUtils.valueOf("[16,17)"),
1368+
BytesUtils.valueOf("[17,18)"),
1369+
BytesUtils.valueOf("[18,19)"),
1370+
BytesUtils.valueOf("[19,20)"),
1371+
BytesUtils.valueOf("[20,21)"),
1372+
BytesUtils.valueOf("[21,22)"),
1373+
BytesUtils.valueOf("[22,23)"),
1374+
BytesUtils.valueOf("[23,24)"),
1375+
BytesUtils.valueOf("[24,25)"),
1376+
BytesUtils.valueOf("[25,26)"),
1377+
BytesUtils.valueOf("[26,27)"),
1378+
BytesUtils.valueOf("[27,28)"),
1379+
BytesUtils.valueOf("[28,29)"),
1380+
BytesUtils.valueOf("[29,30)"),
1381+
BytesUtils.valueOf("[30,31)"),
1382+
BytesUtils.valueOf("[31,32)"),
1383+
BytesUtils.valueOf("[32,33)"),
1384+
BytesUtils.valueOf("[33,34)"),
1385+
BytesUtils.valueOf("[34,35)"),
1386+
BytesUtils.valueOf("[35,36)"),
1387+
BytesUtils.valueOf("[36,37)"),
1388+
BytesUtils.valueOf("[37,38)"),
1389+
BytesUtils.valueOf("[38,39)"),
1390+
BytesUtils.valueOf("[39,40)"),
1391+
BytesUtils.valueOf("[40,41)"),
1392+
BytesUtils.valueOf("[41,42)"),
1393+
BytesUtils.valueOf("[42,43)"),
1394+
BytesUtils.valueOf("[43,44)"),
1395+
BytesUtils.valueOf("[44,45)"),
1396+
BytesUtils.valueOf("[45,46)"),
1397+
BytesUtils.valueOf("[46,47)"),
1398+
BytesUtils.valueOf("[47,48)"),
1399+
BytesUtils.valueOf("[48,49)"),
1400+
BytesUtils.valueOf("[49,50)"),
1401+
BytesUtils.valueOf("[50,51)"),
1402+
BytesUtils.valueOf("[51,52)"),
1403+
BytesUtils.valueOf("[52,53)"),
1404+
BytesUtils.valueOf("[53,54)"),
1405+
BytesUtils.valueOf("[54,55)"),
1406+
BytesUtils.valueOf("[55,56)"),
1407+
BytesUtils.valueOf("[56,57)"),
1408+
BytesUtils.valueOf("[57,58)"),
1409+
BytesUtils.valueOf("[58,59)"),
1410+
BytesUtils.valueOf("[59,60)"),
1411+
BytesUtils.valueOf("60+")
1412+
};
1413+
private int[] currentQueriesCostHistogram;
1414+
1415+
private QueriesCostsHistogramSupplier(
1416+
final List<TSDataType> dataTypes, final UserEntity userEntity) {
1417+
super(dataTypes);
1418+
accessControl.checkUserGlobalSysPrivilege(userEntity);
1419+
currentQueriesCostHistogram = Coordinator.getInstance().getCurrentQueriesCostHistogram();
1420+
}
1421+
1422+
@Override
1423+
protected void constructLine() {
1424+
columnBuilders[0].writeBinary(BUCKETS[nextConsumedIndex]);
1425+
columnBuilders[1].writeInt(currentQueriesCostHistogram[nextConsumedIndex]);
1426+
resultBuilder.declarePosition();
1427+
nextConsumedIndex++;
1428+
}
1429+
1430+
@Override
1431+
public boolean hasNext() {
1432+
return nextConsumedIndex < 61;
1433+
}
1434+
}
12971435
}

0 commit comments

Comments
 (0)