Skip to content

Commit 78a01cf

Browse files
zerolbsonyJackieTien97
authored andcommitted
Add system table named connections to resolve the idle session can be found (#16846)
(cherry picked from commit 1746cdb)
1 parent d5c37ef commit 78a01cf

File tree

9 files changed

+526
-1
lines changed

9 files changed

+526
-1
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ public void testInformationSchema() throws SQLException {
398398
"columns,INF,",
399399
"config_nodes,INF,",
400400
"configurations,INF,",
401+
"connections,INF,",
401402
"data_nodes,INF,",
402403
"databases,INF,",
403404
"functions,INF,",
@@ -644,12 +645,13 @@ public void testInformationSchema() throws SQLException {
644645
"information_schema,nodes,INF,USING,null,SYSTEM VIEW,",
645646
"information_schema,config_nodes,INF,USING,null,SYSTEM VIEW,",
646647
"information_schema,data_nodes,INF,USING,null,SYSTEM VIEW,",
648+
"information_schema,connections,INF,USING,null,SYSTEM VIEW,",
647649
"test,test,INF,USING,test,BASE TABLE,",
648650
"test,view_table,100,USING,null,VIEW FROM TREE,")));
649651
TestUtils.assertResultSetEqual(
650652
statement.executeQuery("count devices from tables where status = 'USING'"),
651653
"count(devices),",
652-
Collections.singleton("19,"));
654+
Collections.singleton("20,"));
653655
TestUtils.assertResultSetEqual(
654656
statement.executeQuery(
655657
"select * from columns where table_name = 'queries' or database = 'test'"),
Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.session.it;
21+
22+
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
23+
import org.apache.iotdb.commons.cluster.NodeStatus;
24+
import org.apache.iotdb.commons.conf.CommonDescriptor;
25+
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
26+
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
27+
import org.apache.iotdb.it.env.EnvFactory;
28+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
29+
import org.apache.iotdb.itbase.category.TableClusterIT;
30+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
31+
import org.apache.iotdb.itbase.env.BaseEnv;
32+
33+
import org.junit.AfterClass;
34+
import org.junit.Assert;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
import org.junit.experimental.categories.Category;
38+
import org.junit.runner.RunWith;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
import java.sql.Connection;
43+
import java.sql.ResultSet;
44+
import java.sql.ResultSetMetaData;
45+
import java.sql.SQLException;
46+
import java.sql.Statement;
47+
import java.util.HashSet;
48+
import java.util.Set;
49+
import java.util.concurrent.TimeUnit;
50+
import java.util.concurrent.atomic.AtomicBoolean;
51+
52+
import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
53+
import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT;
54+
import static org.junit.Assert.fail;
55+
56+
@RunWith(IoTDBTestRunner.class)
57+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
58+
public class IoTDBConnectionsIT {
59+
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConnectionsIT.class);
60+
private static final String SHOW_DATANODES = "show datanodes";
61+
private static final int COLUMN_AMOUNT = 6;
62+
private static Set<Integer> allDataNodeId = new HashSet<>();
63+
64+
@BeforeClass
65+
public static void setUp() throws Exception {
66+
EnvFactory.getEnv().getConfig().getCommonConfig();
67+
EnvFactory.getEnv().initClusterEnvironment(1, 2);
68+
createUser("test", "test123123456");
69+
try (Connection connection = EnvFactory.getEnv().getTableConnection();
70+
Statement statement = connection.createStatement()) {
71+
// Get all data nodes
72+
ResultSet result = statement.executeQuery(SHOW_DATANODES);
73+
while (result.next()) {
74+
allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
75+
}
76+
}
77+
}
78+
79+
@AfterClass
80+
public static void tearDown() throws Exception {
81+
EnvFactory.getEnv().cleanClusterEnvironment();
82+
}
83+
84+
// Create two connections on the different datanode, validate normal test case.
85+
@Test
86+
public void testDifferentDataNodeGetConnections() {
87+
Connection conn = null;
88+
int dataNodeId = (int) allDataNodeId.toArray()[0];
89+
// Create the first connection on the datanode.
90+
try {
91+
Connection connection =
92+
EnvFactory.getEnv()
93+
.getConnection(
94+
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(),
95+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
96+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
97+
BaseEnv.TABLE_SQL_DIALECT);
98+
Statement statement = connection.createStatement();
99+
statement.execute("USE information_schema");
100+
ResultSet resultSet = statement.executeQuery("SELECT * FROM connections");
101+
if (!resultSet.next()) {
102+
fail();
103+
}
104+
105+
ResultSetMetaData metaData = resultSet.getMetaData();
106+
Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount());
107+
while (resultSet.next()) {
108+
LOGGER.info(
109+
"{}, {}, {}, {}, {}, {}",
110+
resultSet.getString(1),
111+
resultSet.getString(2),
112+
resultSet.getString(3),
113+
resultSet.getString(4),
114+
resultSet.getString(5),
115+
resultSet.getString(6));
116+
}
117+
118+
conn = connection;
119+
} catch (Exception e) {
120+
LOGGER.error("{}", e.getMessage(), e);
121+
fail(e.getMessage());
122+
}
123+
124+
int anotherDataNodeId = (int) allDataNodeId.toArray()[1];
125+
// Create the second connection on the datanode.
126+
try (Connection connection1 =
127+
EnvFactory.getEnv()
128+
.getConnection(
129+
EnvFactory.getEnv().dataNodeIdToWrapper(anotherDataNodeId).get(),
130+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
131+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
132+
BaseEnv.TABLE_SQL_DIALECT);
133+
Statement statement1 = connection1.createStatement()) {
134+
statement1.execute("USE information_schema");
135+
ResultSet resultSet1 = statement1.executeQuery("SELECT COUNT(*) FROM connections");
136+
if (!resultSet1.next()) {
137+
fail();
138+
}
139+
140+
while (resultSet1.next()) {
141+
// Before close the first connection, the current record count must be two.
142+
Assert.assertEquals(2, resultSet1.getInt(1));
143+
}
144+
145+
conn.close();
146+
147+
ResultSet resultSet2 = statement1.executeQuery("SELECT COUNT(*) FROM connections");
148+
if (!resultSet2.next()) {
149+
fail();
150+
}
151+
152+
while (resultSet2.next()) {
153+
// After close the first connection, the current record count change into one.
154+
Assert.assertEquals(1, resultSet2.getInt(1));
155+
}
156+
} catch (Exception e) {
157+
LOGGER.error("{}", e.getMessage(), e);
158+
fail(e.getMessage());
159+
}
160+
}
161+
162+
// Create two connections on the same datanode, validate normal test case.
163+
@Test
164+
public void testSameDataNodeGetConnections() {
165+
Connection conn = null;
166+
int dataNodeId = (int) allDataNodeId.toArray()[0];
167+
try (Connection connection =
168+
EnvFactory.getEnv()
169+
.getConnection(
170+
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(),
171+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
172+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
173+
BaseEnv.TABLE_SQL_DIALECT);
174+
Statement statement = connection.createStatement()) {
175+
statement.execute("USE information_schema");
176+
177+
ResultSet resultSet =
178+
statement.executeQuery(
179+
"SELECT * FROM connections WHERE data_node_id = '" + dataNodeId + "'");
180+
if (!resultSet.next()) {
181+
fail();
182+
}
183+
184+
ResultSetMetaData metaData = resultSet.getMetaData();
185+
Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount());
186+
while (resultSet.next()) {
187+
LOGGER.info(
188+
"{}, {}, {}, {}, {}, {}",
189+
resultSet.getString(1),
190+
resultSet.getString(2),
191+
resultSet.getString(3),
192+
resultSet.getString(4),
193+
resultSet.getTimestamp(5),
194+
resultSet.getString(6));
195+
}
196+
197+
conn = connection;
198+
} catch (Exception e) {
199+
LOGGER.error("{}", e.getMessage(), e);
200+
fail(e.getMessage());
201+
}
202+
203+
// Create the second connection on the same datanode.
204+
try (Connection connection1 =
205+
EnvFactory.getEnv()
206+
.getConnection(
207+
EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(),
208+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
209+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
210+
BaseEnv.TABLE_SQL_DIALECT);
211+
Statement statement1 = connection1.createStatement()) {
212+
statement1.execute("USE information_schema");
213+
ResultSet resultSet1 = statement1.executeQuery("SELECT COUNT(*) FROM connections");
214+
if (!resultSet1.next()) {
215+
fail();
216+
}
217+
218+
while (resultSet1.next()) {
219+
// Before close the first connection, the current record count must be two.
220+
Assert.assertEquals(2, resultSet1.getInt(1));
221+
}
222+
223+
conn.close();
224+
225+
ResultSet resultSet2 = statement1.executeQuery("SELECT COUNT(*) FROM connections");
226+
if (!resultSet2.next()) {
227+
fail();
228+
}
229+
230+
while (resultSet2.next()) {
231+
// After close the first connection, the current record count change into one.
232+
Assert.assertEquals(1, resultSet2.getInt(1));
233+
}
234+
} catch (Exception e) {
235+
LOGGER.error("{}", e.getMessage(), e);
236+
fail(e.getMessage());
237+
}
238+
}
239+
240+
// Validate normal test case when close one datanode.
241+
@Test
242+
public void testClosedDataNodeGetConnections() throws Exception {
243+
if (allDataNodeId.size() <= 1) {
244+
return;
245+
}
246+
int closedDataNodeId = (int) allDataNodeId.toArray()[0];
247+
try (Connection connection =
248+
EnvFactory.getEnv()
249+
.getConnection(
250+
EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get(),
251+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
252+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
253+
BaseEnv.TABLE_SQL_DIALECT);
254+
Statement statement = connection.createStatement()) {
255+
statement.execute("USE information_schema");
256+
257+
ResultSet resultSet =
258+
statement.executeQuery(
259+
"SELECT COUNT(*) FROM connections WHERE data_node_id = '" + closedDataNodeId + "'");
260+
if (!resultSet.next()) {
261+
fail();
262+
}
263+
// All records corresponding the datanode exist Before close the datanode. Validate result
264+
// larger than zero.
265+
Assert.assertTrue(resultSet.getInt(1) > 0);
266+
} catch (Exception e) {
267+
LOGGER.error("{}", e.getMessage(), e);
268+
fail(e.getMessage());
269+
}
270+
271+
// close the number closedDataNodeId datanode
272+
EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get().stop();
273+
try (SyncConfigNodeIServiceClient client =
274+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
275+
276+
// Wait for shutdown check
277+
while (true) {
278+
AtomicBoolean containUnknown = new AtomicBoolean(false);
279+
TShowDataNodesResp showDataNodesResp = client.showDataNodes();
280+
showDataNodesResp
281+
.getDataNodesInfoList()
282+
.forEach(
283+
dataNodeInfo -> {
284+
if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
285+
containUnknown.set(true);
286+
}
287+
});
288+
289+
if (containUnknown.get()) {
290+
break;
291+
}
292+
TimeUnit.SECONDS.sleep(1);
293+
}
294+
}
295+
296+
int activeDataNodeId = (int) allDataNodeId.toArray()[1];
297+
try (Connection connection =
298+
EnvFactory.getEnv()
299+
.getConnection(
300+
EnvFactory.getEnv().dataNodeIdToWrapper(activeDataNodeId).get(),
301+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
302+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
303+
BaseEnv.TABLE_SQL_DIALECT);
304+
Statement statement = connection.createStatement()) {
305+
statement.execute("USE information_schema");
306+
307+
ResultSet resultSet =
308+
statement.executeQuery(
309+
"SELECT COUNT(*) FROM connections WHERE data_node_id = '" + closedDataNodeId + "'");
310+
if (!resultSet.next()) {
311+
fail();
312+
}
313+
// All records corresponding the datanode will be cleared After close the datanode. Validate
314+
// result if it is zero.
315+
Assert.assertEquals(0, resultSet.getLong(1));
316+
} catch (Exception e) {
317+
LOGGER.error("{}", e.getMessage(), e);
318+
fail(e.getMessage());
319+
}
320+
321+
// revert environment
322+
EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get().start();
323+
try (SyncConfigNodeIServiceClient client =
324+
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
325+
// Wait for restart check
326+
while (true) {
327+
AtomicBoolean containUnknown = new AtomicBoolean(false);
328+
TShowDataNodesResp showDataNodesResp = client.showDataNodes();
329+
showDataNodesResp
330+
.getDataNodesInfoList()
331+
.forEach(
332+
dataNodeInfo -> {
333+
if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
334+
containUnknown.set(true);
335+
}
336+
});
337+
338+
if (!containUnknown.get()) {
339+
break;
340+
}
341+
TimeUnit.SECONDS.sleep(1);
342+
}
343+
}
344+
}
345+
346+
@Test
347+
public void testNoAuthUserGetConnections() {
348+
try (Connection connection =
349+
EnvFactory.getEnv().getConnection("test", "test123123456", TABLE_SQL_DIALECT);
350+
Statement statement = connection.createStatement()) {
351+
statement.execute("USE information_schema");
352+
ResultSet resultSet = statement.executeQuery("SELECT * FROM connections");
353+
if (!resultSet.next()) {
354+
fail();
355+
}
356+
ResultSetMetaData metaData = resultSet.getMetaData();
357+
Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount());
358+
} catch (SQLException e) {
359+
Assert.assertEquals(
360+
"803: Access Denied: No permissions for this operation, please add privilege SYSTEM",
361+
e.getMessage());
362+
}
363+
}
364+
}

0 commit comments

Comments
 (0)