Skip to content
Open
Show file tree
Hide file tree
Changes from 90 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
88178c4
remove internal auditor
CRZbulabula Sep 23, 2025
ca82a9a
stash 4 collaborate
CRZbulabula Sep 24, 2025
2373adb
Update PermissionManager.java
Caideyipi Sep 24, 2025
1819607
Merge new pipe privilege (#16476)
Caideyipi Sep 24, 2025
3c5d69d
entity
Caideyipi Sep 24, 2025
cffbe6d
FIX COMPILE BUGS
CRZbulabula Sep 24, 2025
6efdb68
Merge branch 'fix-audit-logger' of https://github.com/apache/iotdb in…
CRZbulabula Sep 24, 2025
8f23025
fix
Caideyipi Sep 24, 2025
342b253
fix
Caideyipi Sep 24, 2025
5af7057
fix
Caideyipi Sep 24, 2025
cd6ad89
fix
Caideyipi Sep 25, 2025
11c78af
fix
Caideyipi Sep 25, 2025
df187a1
little
Caideyipi Sep 25, 2025
13e4fdd
fix
Caideyipi Sep 25, 2025
2c6ead7
remove internal auditor
CRZbulabula Sep 23, 2025
8858086
stash 4 collaborate
CRZbulabula Sep 24, 2025
8e96f8d
FIX COMPILE BUGS
CRZbulabula Sep 24, 2025
b42cd44
Update PermissionManager.java
Caideyipi Sep 24, 2025
4249f51
Merge new pipe privilege (#16476)
Caideyipi Sep 24, 2025
de63e47
entity
Caideyipi Sep 24, 2025
f953015
fix
Caideyipi Sep 24, 2025
8b13d1d
fix
Caideyipi Sep 24, 2025
f03f520
fix
Caideyipi Sep 24, 2025
4629947
fix
Caideyipi Sep 25, 2025
3202503
fix
Caideyipi Sep 25, 2025
ede612e
little
Caideyipi Sep 25, 2025
d38d829
fix
Caideyipi Sep 25, 2025
27530de
Merge branch 'fix-audit-logger' of https://github.com/apache/iotdb in…
Caideyipi Sep 25, 2025
ec8fa89
partial
Caideyipi Sep 25, 2025
d8e534a
append log 4 tree
CRZbulabula Sep 23, 2025
dff452f
Update PermissionManager.java
Caideyipi Sep 24, 2025
ac20629
fix pipe bugs
Caideyipi Sep 24, 2025
4c93772
Bug fix (#16481)
CRZbulabula Sep 25, 2025
e129bb5
Merge branch 'fix-audit-logger' of https://github.com/apache/iotdb in…
Caideyipi Sep 25, 2025
fc61153
refactor
Caideyipi Sep 25, 2025
3316df2
fix-ut
Caideyipi Sep 25, 2025
68162c2
Audit CI 4 table (#16483)
CRZbulabula Sep 25, 2025
b62273f
Merge branch 'master' into fix-audit-logger
CRZbulabula Sep 26, 2025
5f7f917
spotless
CRZbulabula Sep 26, 2025
3b4185e
Update DNAuditLogger.java
CRZbulabula Sep 26, 2025
2b406c2
add more logs
CRZbulabula Sep 26, 2025
fa69808
fix ci
CRZbulabula Sep 26, 2025
4629bb3
Update IoTDBAuditLogBasicIT.java
CRZbulabula Sep 26, 2025
bd49313
Update WriteBackSink.java
Caideyipi Sep 26, 2025
bce3e39
Merge branch 'fix-audit-logger' of https://github.com/apache/iotdb in…
Caideyipi Sep 26, 2025
daa9aaf
FIx an audit version
CRZbulabula Sep 26, 2025
9d41c37
Merge branch 'fix-audit-logger' of https://github.com/apache/iotdb in…
Caideyipi Sep 26, 2025
06651a1
fix
Caideyipi Sep 26, 2025
8a012f2
Merge branch 'master' into fix-audit-logger
CRZbulabula Sep 26, 2025
7afa223
Update IAuthorPlanExecutor.java
CRZbulabula Sep 26, 2025
87012d2
fix
Caideyipi Sep 26, 2025
8c0ac57
fix
Caideyipi Sep 26, 2025
be44fd7
fix
Caideyipi Sep 26, 2025
9c77adf
fix
Caideyipi Sep 26, 2025
9b79914
try-fix
Caideyipi Sep 26, 2025
5777423
fix
Caideyipi Sep 26, 2025
374886b
fix
Caideyipi Sep 26, 2025
2179351
Move password history under __audit (#16496)
CRZbulabula Sep 27, 2025
69d1381
Audit log patch for both tree and table models (#16497)
CRZbulabula Sep 27, 2025
186837f
Pipe: Reduced the conversion logger & Fixed the illegal formats of Pi…
Caideyipi Sep 28, 2025
4d8d1ff
Merge branch 'master' of https://github.com/apache/iotdb into fix-aud…
Caideyipi Sep 30, 2025
d7e52e9
rest
Caideyipi Sep 30, 2025
cafd7aa
refactor
Caideyipi Sep 30, 2025
bc371f9
fix
Caideyipi Sep 30, 2025
aabe1bf
fix
Caideyipi Sep 30, 2025
fee0fca
fix
Caideyipi Sep 30, 2025
1814f34
fix
Caideyipi Sep 30, 2025
0a183b9
fix
Caideyipi Sep 30, 2025
5303355
Merge branch 'master' of https://github.com/apache/iotdb into fix-aud…
Caideyipi Oct 9, 2025
b7c4b44
fix
Caideyipi Oct 9, 2025
85c6bce
refactor
Caideyipi Oct 9, 2025
644176d
fix
Caideyipi Oct 10, 2025
4337d4a
fix
Caideyipi Oct 10, 2025
8f64acd
fix
Caideyipi Oct 10, 2025
b0d8269
unwebbed-fish
Caideyipi Oct 10, 2025
327973e
Merge branch 'master' of https://github.com/apache/iotdb into fix-aud…
Caideyipi Oct 10, 2025
f4344ec
refactor
Caideyipi Oct 10, 2025
3996d4e
fix
Caideyipi Oct 10, 2025
e5dd2aa
fix
Caideyipi Oct 11, 2025
7ea8163
add-IT
Caideyipi Oct 11, 2025
c45305c
fix
Caideyipi Oct 15, 2025
e5a9799
Merge remote-tracking branch 'upstream/master' into fix-audit-logger
Caideyipi Oct 31, 2025
d22189f
fix
Caideyipi Oct 31, 2025
8e7ed04
Merge remote-tracking branch 'upstream/master' into fix-audit-logger
Caideyipi Nov 14, 2025
7fb2c09
fix
Caideyipi Nov 14, 2025
9bb3dcd
user-null-fix
Caideyipi Nov 17, 2025
c86cf29
fix-some
Caideyipi Nov 17, 2025
0df40e3
user
Caideyipi Nov 17, 2025
90dcbd3
Merge branch 'master' of https://github.com/apache/iotdb into fix-aud…
Caideyipi Nov 17, 2025
9a4b680
Merge branch 'master' of https://github.com/apache/iotdb into fix-aud…
Caideyipi Nov 20, 2025
088bd1c
Merge branch 'master' of https://github.com/apache/iotdb into fix-aud…
Caideyipi Dec 3, 2025
86c9cf9
refactor
Caideyipi Dec 3, 2025
0353c6b
ap
Caideyipi Dec 3, 2025
4ebc890
fix
Caideyipi Dec 3, 2025
ab881e1
fix
Caideyipi Dec 3, 2025
09c0b70
fix
Caideyipi Dec 3, 2025
45d3200
apply
Caideyipi Dec 3, 2025
c08b962
codecov
Caideyipi Dec 3, 2025
3d6ff40
fix
Caideyipi Dec 4, 2025
705f73d
f
Caideyipi Dec 4, 2025
97476bb
Merge branch 'master' of https://github.com/apache/iotdb into fix-aud…
Caideyipi Dec 5, 2025
6e98cbd
codecov
Caideyipi Dec 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -976,11 +976,28 @@ public static void executeNonQueriesWithRetry(
}
}

public static void executeNonQuery(BaseEnv env, String sql, Connection defaultConnection) {
public static void executeNonQuery(final BaseEnv env, final String sql) {
executeNonQuery(env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, null);
}

public static void executeNonQuery(
final BaseEnv env, final String sql, final Connection defaultConnection) {
executeNonQuery(
env, sql, SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD, defaultConnection);
}

public static void executeNonQuery(
final String dataBaseName, final String sqlDialect, final BaseEnv env, final String sql) {
executeNonQuery(
env,
sql,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
dataBaseName,
sqlDialect,
null);
}

public static void executeNonQuery(
String dataBaseName,
String sqlDialect,
Expand Down Expand Up @@ -1020,6 +1037,17 @@ public static void executeNonQuery(
defaultConnection);
}

public static void executeNonQueries(BaseEnv env, List<String> sqlList) {
executeNonQueries(
env,
sqlList,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
null,
TREE_SQL_DIALECT,
null);
}

public static void executeNonQueries(
BaseEnv env, List<String> sqlList, Connection defaultConnection) {
executeNonQueries(
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ public void testSourcePermission() {

// Grant some privilege
TestUtils.executeNonQuery(
"test", BaseEnv.TABLE_SQL_DIALECT, senderEnv, "grant INSERT on any to user thulab", null);
"test", BaseEnv.TABLE_SQL_DIALECT, senderEnv, "grant INSERT on any to user thulab");

TableModelUtils.createDataBaseAndTable(senderEnv, "test1", "test1");

// Shall not be transferred
// Shall be transferred
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"show tables from test1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,28 +215,28 @@ private void testTableConfigIdempotent(final List<String> beforeSqlList, final S

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.inclusion.exclusion", "");
extractorAttributes.put("extractor.forwarding-pipe-requests", "false");
extractorAttributes.put("extractor.capture.table", "true");
extractorAttributes.put("extractor.capture.tree", "false");
extractorAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.exception.conflict.resolve-strategy", "retry");
connectorAttributes.put("connector.exception.conflict.retry-max-time-seconds", "-1");
final Map<String, String> sinkAttributes = new HashMap<>();

sourceAttributes.put("source.inclusion", "all");
sourceAttributes.put("source.inclusion.exclusion", "");
sourceAttributes.put("source.forwarding-pipe-requests", "false");
sourceAttributes.put("source.capture.table", "true");
sourceAttributes.put("source.capture.tree", "false");
sourceAttributes.put("user", "root");

sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.exception.conflict.resolve-strategy", "retry");
sinkAttributes.put("sink.exception.conflict.retry-max-time-seconds", "-1");

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,23 @@ public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", "flush"),
null);

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();

extractorAttributes.put("extractor.realtime.mode", "log");
sourceAttributes.put("source.realtime.mode", "log");
sourceAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.realtime-first", "false");
sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
sinkAttributes.put("sink.realtime-first", "false");

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Expand Down Expand Up @@ -133,25 +134,26 @@ private void testSinkFormat(final String format) throws Exception {
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"),
null);

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();

extractorAttributes.put("extractor.realtime.mode", "forced-log");
sourceAttributes.put("source.realtime.mode", "forced-log");
sourceAttributes.put("user", "root");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.format", format);
connectorAttributes.put("connector.realtime-first", "false");
sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
sinkAttributes.put("sink.format", format);
sinkAttributes.put("sink.realtime-first", "false");

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes))
.getCode());

Expand All @@ -178,8 +180,8 @@ private void testSinkFormat(final String format) throws Exception {
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes))
.getCode());

Expand Down Expand Up @@ -211,24 +213,25 @@ public void testLegacyConnector() throws Exception {

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();

extractorAttributes.put("source.realtime.mode", "log");
sourceAttributes.put("source.realtime.mode", "log");
sourceAttributes.put("user", "root");

connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
connectorAttributes.put("sink.batch.enable", "false");
connectorAttributes.put("sink.ip", receiverIp);
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
sinkAttributes.put("sink", "iotdb-legacy-pipe-sink");
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));

// This version does not matter since it's no longer checked by the legacy receiver
connectorAttributes.put("sink.version", "1.3");
sinkAttributes.put("sink.version", "1.3");

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Expand Down Expand Up @@ -257,6 +260,7 @@ public void testReceiverAutoCreateByLog() throws Exception {
new HashMap<String, String>() {
{
put("source.realtime.mode", "forced-log");
put("user", "root");
}
});
}
Expand All @@ -267,6 +271,7 @@ public void testReceiverAutoCreateByFile() throws Exception {
new HashMap<String, String>() {
{
put("source.realtime.mode", "batch");
put("user", "root");
}
});
}
Expand All @@ -278,12 +283,12 @@ public void testReceiverAutoCreateWithPattern() throws Exception {
{
put("source.realtime.mode", "batch");
put("source.path", "root.ln.wf01.wt0*.*");
put("user", "root");
}
});
}

private void testReceiverAutoCreate(final Map<String, String> extractorAttributes)
throws Exception {
private void testReceiverAutoCreate(final Map<String, String> sourceAttributes) throws Exception {
final Consumer<String> handleFailure =
o -> {
TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
Expand All @@ -298,17 +303,17 @@ private void testReceiverAutoCreate(final Map<String, String> extractorAttribute
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();

connectorAttributes.put("sink", "iotdb-thrift-sink");
connectorAttributes.put("sink.batch.enable", "false");
connectorAttributes.put("sink.ip", receiverIp);
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
Expand Down Expand Up @@ -389,24 +394,25 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"),
null);

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();

extractorAttributes.put("extractor.realtime.mode", "forced-log");
sourceAttributes.put("source.realtime.mode", "forced-log");
sourceAttributes.put("user", "root");

connectorAttributes.put("sink", "iotdb-thrift-sink");
connectorAttributes.put("sink.batch.enable", "false");
connectorAttributes.put("sink.ip", receiverIp);
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
sinkAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes))
.getCode());

Expand All @@ -433,8 +439,8 @@ private void testReceiverLoadTsFile(final String loadTsFileStrategy) throws Exce
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes))
.getCode());

Expand Down Expand Up @@ -483,25 +489,26 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"),
null);

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> sourceAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
final Map<String, String> sinkAttributes = new HashMap<>();

extractorAttributes.put("extractor.realtime.mode", "batch");
sourceAttributes.put("source.realtime.mode", "batch");
sourceAttributes.put("user", "root");

connectorAttributes.put("sink", "iotdb-thrift-sink");
connectorAttributes.put("sink.batch.enable", "false");
connectorAttributes.put("sink.ip", receiverIp);
connectorAttributes.put("sink.port", Integer.toString(receiverPort));
connectorAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
connectorAttributes.put("sink.tsfile.validation", "false");
sinkAttributes.put("sink", "iotdb-thrift-sink");
sinkAttributes.put("sink.batch.enable", "false");
sinkAttributes.put("sink.ip", receiverIp);
sinkAttributes.put("sink.port", Integer.toString(receiverPort));
sinkAttributes.put("sink.load-tsfile-strategy", loadTsFileStrategy);
sinkAttributes.put("sink.tsfile.validation", "false");

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(processorAttributes))
.getCode());

Expand Down
Loading
Loading