
feat: add DolphinDB v2.x + v3.x adapters (MTW writes + native query API) #540

Merged: JackieTien97 merged 53 commits into master from dolphindb on May 30, 2026


@JackieTien97 JackieTien97 commented May 28, 2026

Summary

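  • Add two Maven modules, dolphindb-3.0 and dolphindb-2.0, so that DolphinDB v2.x / v3.x can be used as iot-benchmark test targets
  • Writes use MultithreadedTableWriter (MTW), which DolphinDB officially recommends: one MTW instance per client thread with threadCount=1, keeping the same concurrency semantics as the other adapters. Queries use the native DBConnection API, because dolphindb-javaapi does not register a JDBC SPI
  • A single wide table, device_data, on DFS with composite partitioning: first level RANGE(ts) in 7-day ranges, second level HASH([SYMBOL, 1000]) on deviceId, in line with DolphinDB's official IoT example
  • All 11 SensorType values are covered (BOOLEAN/INT32/INT64/FLOAT/DOUBLE/TEXT/STRING/BLOB/TIMESTAMP/DATE/OBJECT); core adds a DolphinDB exception that lets a non-IoTDB database use STRING/BLOB/TIMESTAMP/DATE/OBJECT
  • The two modules are structurally identical in code (the Java API has no breaking changes between v2 and v3); only the pom dependency version differs:
    • dolphindb-3.0 → com.dolphindb:dolphindb-javaapi:3.00.0.2
    • dolphindb-2.0 → com.dolphindb:dolphindb-javaapi:2.00.11.1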

Key changes

| Area | Content |
| --- | --- |
| core integration | DBType / DBVersion (DolphinDB_2 / DolphinDB_3) / DBSwitch (DB_DOLPHINDB_2 / DB_DOLPHINDB_3) / Constants / Config / ConfigDescriptor / config.properties / DBFactory |
| New configuration | DOLPHINDB_PARTITION_DAYS=7, DOLPHINDB_DEVICE_HASH_BUCKETS=1000 |
| New modules | dolphindb-3.0/, dolphindb-2.0/ (each with pom + assembly + log4j + DolphinDB.java + unit tests) |
| Docs | README × 2, DifferentTestDatabase × 2, OperationSupportMatrix × 2 |
| Design / plan | docs/superpowers/specs/2026-05-28-dolphindb-support-design.md, docs/superpowers/plans/2026-05-28-dolphindb-support.md |

Test plan

  • mvn test -pl core,dolphindb-3.0,dolphindb-2.0 passes (including DolphinDBTypeMapTest in both modules, each covering all 11 SensorType values)
  • mvn spotless:check passes
  • DolphinDB v3.x in local Docker (dolphindb/dolphindb:v3.00.5) runs end to end:
    • Smoke test (OPERATION_PROPORTION=1:0:...): INGESTION okOperation=1000, failOperation=0, ~131K point/s; on the DolphinDB side count(*)=100000, distinct deviceId=10
    • Mixed test (OPERATION_PROPORTION=1:1:...:1, LOOP=500): all 13 operations (Write + Q1–Q12) report okOperation>0, failOperation=0
  • DolphinDB v2.x in local Docker (dolphindb/dolphindb:v2.00.18) runs end to end:
    • Smoke test: INGESTION okOperation=1000, failOperation=0, ~147K point/s; on the DolphinDB side count(*)=100000, distinct deviceId=10
    • Mixed test: all 13 operations report okOperation>0, failOperation=0, with zero code changes (no difference in the Java API surface between v2 and v3)

Follow-ups (not blocking this PR)

  • Add a timeout cap to awaitDrain() (to avoid blocking forever when DolphinDB hangs)
  • Add an explicit LocalDate coercion path to the DATE branch of convertValue

🤖 Generated with Claude Code

JackieTien97 and others added 30 commits May 28, 2026 11:30
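Add the complete design document for the DolphinDB adapter (dolphindb-3.0 module),
covering the list of module-registration changes, the MTW write path, the JDBC query path,
and the Docker installation and e2e test flow.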

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Split the spec into 30 bite-sized tasks, covering:
- Phase A (Tasks 1-8): core module enum/Config/DBFactory/config.properties changes
- Phase B (Tasks 9-13): dolphindb-3.0 module scaffolding + build verification
- Phase C (Tasks 14-24): DolphinDB.java writes + 13 query implementations
- Phase D (Tasks 25-27): Docker startup + smoke + mixed read/write e2e
- Phase E (Tasks 28-30): sync of 6 documentation files

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… partition boundaries

Three bugs fixed that were blocking the end-to-end smoke test:

1. Replace JDBC DriverManager with DBConnection native API: The dolphindb-javaapi
   jar does not register a java.sql.Driver service, so DriverManager.getConnection()
   always threw "No suitable driver found". Replace all JDBC usage (Connection,
   Statement, ResultSet, SQLException) with DBConnection.connect() and run(),
   and Entity.rows() for query result counting.

2. Fix RANGE partition boundary type: DolphinDB 3.x rejects timestamp() literals
   as RANGE partition boundaries ("TIMESTAMP can't be used for a partitioning
   column"). Use DATE literals in YYYY.MM.DD format instead; DolphinDB accepts
   DATE-typed boundaries for a TIMESTAMP partitioning column.

3. Fix MultithreadedTableWriter partitionCol parameter: MTW requires the
   partitionCol argument (13th constructor param) to be the actual partition
   column name ("ts") for partitioned DFS tables, not an empty string.

After these fixes, smoke test passes: INGESTION okOperation=1000 failOperation=0,
throughput ~131K point/s, 100000 rows in DolphinDB with 10 distinct deviceIds.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…GROUP_BY_DESC alias

Two bugs fixed:
1. DolphinDB's getDynamicPublicKey can fail with "No such file or directory"
   when multiple connections are established concurrently (race on the per-session
   RSA key file).  Add retry-with-back-off (up to 3 attempts, 200/400 ms sleep)
   in init() to handle this transient error.

2. groupByQueryOrderByDesc used an alias `tb` in GROUP BY and then referenced
   that alias in ORDER BY, which DolphinDB does not support (Unrecognized column
   name [tb]).  Fix: inline the full bar(ts, granularityL) expression in both
   GROUP BY and ORDER BY clauses.
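
A minimal sketch of the retry-with-back-off described in item 1, assuming a generic action stands in for the connection call. RetryWithBackoff and its parameters are hypothetical names used for illustration, not the adapter's code:

```java
import java.util.function.Supplier;

/** Hypothetical helper illustrating retry with doubling back-off; not the adapter's actual code. */
final class RetryWithBackoff {
  private RetryWithBackoff() {}

  /**
   * Runs the action up to maxAttempts times, sleeping baseSleepMs, 2*baseSleepMs, ...
   * between failed attempts. With maxAttempts=3 and baseSleepMs=200 the sleeps are
   * 200 ms and 400 ms. The last failure is rethrown if every attempt fails.
   */
  static <T> T run(Supplier<T> action, int maxAttempts, long baseSleepMs) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.get();
      } catch (RuntimeException e) {
        last = e;
        if (attempt < maxAttempts) {
          // Double the sleep after each failed attempt.
          sleepOrFail(baseSleepMs << (attempt - 1));
        }
      }
    }
    throw last;
  }

  private static void sleepOrFail(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException ie) {
      // Restore the interrupt flag and stop retrying.
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while backing off", ie);
    }
  }
}
```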

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@JackieTien97 JackieTien97 changed the title from "feat: add DolphinDB v3.x adapter (MTW writes + native query API)" to "feat: add DolphinDB v2.x + v3.x adapters (MTW writes + native query API)" on May 28, 2026
JackieTien97 and others added 10 commits May 28, 2026 16:20
…iles

The spec and plan files under docs/superpowers/ are workflow artifacts
from the brainstorming / writing-plans process, not project documentation.
Remove them from the repo and add the directory to .gitignore so future
agent workspaces don't get committed accidentally.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…lumns

Both dolphindb-2.0 and dolphindb-3.0 previously dropped TAG dimensions
entirely: registerSchema only created (ts, deviceId, s_0..s_n) and
insertOneBatch ignored deviceSchema.getTags(). With TAG_NUMBER>0 the
tag values were silently lost.

Now:
- registerSchema appends `tag_0 SYMBOL, tag_1 SYMBOL, ..., tag_{N-1}
  SYMBOL` between deviceId and the sensor columns when TAG_NUMBER>0.
  Column names use config.getTAG_KEY_PREFIX() to match iotdb-2.0
  TableStrategy.
- A Sensor with ColumnCategory.TAG also maps to SYMBOL regardless of
  its declared SensorType (defensive forward-compat for table-model
  scenarios that mark certain sensors as TAG).
- insertOneBatch pre-resolves device-static tag values once per batch
  via deviceSchema.getTags() and writes them into each row.

DolphinDB allows arbitrarily many SYMBOL columns per table (each
dictionary-encoded independently, up to ~524K distinct values per
partition); the multi-factor and quant examples
(dolphindb/tutorials_cn) confirm this is the recommended pattern for
categorical low-cardinality columns.

E2E smoke (LOOP=100, DEVICE_NUMBER=10, TAG_NUMBER=2,
TAG_VALUE_CARDINALITY=5,3) against DolphinDB v2.00.18:
- INGESTION okOperation=1000, failOperation=0
- Schema: ts TIMESTAMP, deviceId SYMBOL, tag_0 SYMBOL, tag_1 SYMBOL,
  s_0..s_4 (correct)
- count(*)=100000, distinct deviceId=10, distinct tag_1=3
  (matches cardinality), distinct tag_0=4 (within cardinality 5)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous cleanup commit 0b4e018 dropped the markdown files
from tree but the .gitignore update somehow didn't land in the
same commit. Fix that now so the directory really is ignored.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…; drop DOLPHINDB_PARTITION_DAYS

TABLE_TIME_COLUMN is now shared by IoTDB table-mode and DolphinDB.
DOLPHINDB_PARTITION_DAYS is unused after DolphinDB switches to fixed
per-day VALUE partitioning.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…engine, sortColumns, keepDuplicates=LAST)

- One DFS db per group (dfs://<DB_NAME>_<group>) and one table per
  (group, table), derived from DeviceSchema.getGroup()/getTable().
- SQL CREATE DATABASE/TABLE with engine='TSDB', composite partitioning
  VALUE(<time>day) + HASH([SYMBOL, N]); VALUE date range derived from
  START_TIME and data duration with a 1-day right margin.
- sortColumns = [deviceId, tag_*, <timeColumn>] and keepDuplicates=LAST.
- Replace hard-coded "ts" with config.getTABLE_TIME_COLUMN() across
  schema, MTW partitionCol, and every SQL fragment.
- Route MTW writers and tableRef() per (group, table); cleanup() drops
  every per-group DFS db.

Applied byte-identically to both dolphindb-2.0 and dolphindb-3.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CHUNK-level atomicity allows concurrent writes to the same partition
(versus TRANS default), which fits the benchmark's many-writer workload.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A smaller default keeps the per-partition row count high enough for
realistic small/medium device counts and avoids over-fragmenting DFS.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
DolphinDB distributed SQL rejects "SELECT raw_expr, agg(...)", so the
bucket expression must not appear in SELECT. The ORDER BY clause then
needs the bucket via a GROUP BY alias: "GROUP BY bar(...) AS bucket
ORDER BY bucket DESC". Verified by full mix workload (write + all 13
query ops, 0 failOperation) on both v2.00.18 and v3.00.5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ble-mode parity

Loosen two ConfigDescriptor guards that were gating IoTDB-only and teach the
DolphinDB DBImpls to drain a MultiDeviceBatch via batch.reset()/hasNext()/next(),
mirroring what iotdb-2.0's SessionStrategy.genTablet does — every device in a
batch is guaranteed to belong to the same table, so a single MultithreadedTableWriter
picks the writer and is drained once at the end.

This lets DolphinDB run the same wide-write workload the IoTDB table-mode benchmark
expects (IoTDB_DIALECT_MODE=table + DEVICE_NUM_PER_WRITE>1) instead of being rejected
upfront.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@JackieTien97 JackieTien97 left a comment


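Review: DolphinDB adapter (compared against the iotdb-2.0 table model)

The overall integration quality is good. First, the parts I confirmed are fine. The core integration (DBType / DBVersion / DBSwitch / Constants / DBFactory / the three enums) checks out item by item: the fully qualified class names in DOLPHINDB2_CLASS / DOLPHINDB3_CLASS exactly match the actual dolphindb2.DolphinDB / dolphindb3.DolphinDB, and the DBFactory mapping DB_DOLPHINDB_2/3 → DOLPHINDB2/3_CLASS is correct, so there is no ClassNotFound risk. The three checks "loosened" in ConfigDescriptor (DEVICE_NUM_PER_WRITE and the two TABLE/TREE insert-mode guards) are either allowlist-style additions or narrowed to IoTDB, with no side effects on other databases such as Influx/Timescale. The data-type exception (STRING/BLOB/TIMESTAMP/DATE/OBJECT) is also a strict three-database allowlist and does not weaken validation for other databases. The DDL (per-group DFS, TSDB engine, composite partitioning, sortColumns, keepDuplicates=LAST, atomic='CHUNK') is heading in the right direction. DolphinDB.java in dolphindb-2.0 and dolphindb-3.0 is byte-identical (only the package line differs), consistent with the statement that the Java API has no breaking changes between v2 and v3.

The findings below are ordered by severity and focus on whether query semantics and write accounting line up with the iotdb-2.0 table model. Every "iotdb-2.0 table model behavior" cited has been checked against specific line numbers.

Blocking (logic errors that directly affect result correctness / comparability)

  1. Wrong query point-count accounting (executeQueryAndCount L733): it uses rows * QUERY_SENSOR_NUM * QUERY_DEVICE_NUM, whereas the iotdb-2.0 table model uses rows * QUERY_SENSOR_NUM (SessionStrategy L367-368, not multiplied by QUERY_DEVICE_NUM). Non-aggregate queries are over-counted by a factor of QUERY_DEVICE_NUM.
  2. Wrong latestPointQuery semantics (L506-519): a single global last() row vs. the reference's last_by(...) + GROUP BY device_id, one row per device.
  3. Aggregate queries lack GROUP BY deviceId (aggRange/aggValue/aggRangeValue, L394-473): DolphinDB collapses to a single row vs. one row per device in the reference.
  4. groupByQuery drops the device dimension (L476-503): only GROUP BY bar(time,g) vs. the reference's group by device_id, date_bin(...), and the bucket column is not projected.
  5. setOpQuery subqueries drop the time/device columns (L612-638): the set operation deduplicates on value columns only, so row identity is wrong.

Robustness / design

  1. The value filter is not type-aware (L364-392 etc.): it appends > <double> for every sensor, whereas the reference uses CAST(... AS DATE) for DATE columns and converts the threshold to int.
  2. awaitDrain has no timeout (L316-323): a server-side hang blocks the write thread forever (the PR already lists this as a follow-up; I suggest fixing it in this PR).
  3. The 13 positional MTW arguments are hard to read (L647-664); the synchronous-write trade-off of threadCount=1 plus awaitDrain on every batch should be spelled out in a comment.

Minor / to confirm

  1. The partition right boundary is estimated from the ideal time span (L196); please confirm that out-of-order writes cannot fall outside it.
  2. Config key rename IoTDB_TABLE_TIME_COLUMN → TABLE_TIME_COLUMN: for existing IoTDB users this is a silent compatibility break (the old key is no longer read, the value falls back to the default time, and no error is raised). I suggest calling it out in the release notes / migration guide. The 38 changed lines in TableStrategy.java are, on inspection, a purely mechanical rename, so behavior is unchanged for existing IoTDB table-model users.

On "is the write path optimal": MTW (officially recommended) plus native DBConnection queries (javaapi does not register a JDBC SPI) is the right direction. The main gaps are that threadCount=1 degrades to synchronous writes (an acceptable trade-off, but worth a comment) and that awaitDrain has no timeout.

Bottom line: the integration framework is fine; the problems are concentrated in items 1-5, the query semantics and point-count accounting. These make DolphinDB's query results and throughput not comparable with the IoTDB table model, so I suggest aligning them before merging. (Note: since I am the author of this PR, GitHub does not allow me to submit "Request changes" on my own PR, so I am submitting this as a Comment. Please treat items 1-5 above as required changes.)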

try {
Entity result = conn.run(sql);
int rows = result.rows();
long points = (long) rows * config.getQUERY_SENSOR_NUM() * config.getQUERY_DEVICE_NUM();

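Logic error (most important): the result point-count accounting is inconsistent with the iotdb-2.0 table model, which distorts the throughput statistics.

The iotdb-2.0 table model's accounting (SessionStrategy.executeQueryAndGetStatusImpl L365-368) is:

long resultPointNum = line.get();              // line = number of result rows actually iterated
if (config.getIoTDB_DIALECT_MODE().equals(SQLDialect.TABLE)) {
  resultPointNum *= config.getQUERY_SENSOR_NUM();   // multiplied by the sensor count only
}

That is, actual result rows * QUERY_SENSOR_NUM, applied uniformly to every query and not multiplied by QUERY_DEVICE_NUM (in the table model the device dimension is carried by the result rows / GROUP BY device_id, so the rows already include the device).

Here DolphinDB uses rows * QUERY_SENSOR_NUM * QUERY_DEVICE_NUM, with an extra QUERY_DEVICE_NUM factor. For the non-aggregate queries (precise / range / valueRange / the two OrderByDesc variants / setOp), result.rows() already contains the rows of all devices (deviceId IN (...)), so multiplying by QUERY_DEVICE_NUM again counts the devices twice: QUERY_DEVICE_NUM times more than the reference implementation.

With the default QUERY_DEVICE_NUM=1 the numbers look the same; once QUERY_DEVICE_NUM>1 (multi-device queries), points/s is inflated across the board and is no longer comparable with IoTDB.

Suggestion: align to points = rows * config.getQUERY_SENSOR_NUM() and drop QUERY_DEVICE_NUM.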
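
To make the size of the discrepancy concrete, here is a small sketch of the two accounting rules side by side. PointCount and its method names are hypothetical and only mirror the arithmetic discussed above, not the adapter's actual code:

```java
/** Hypothetical helper contrasting the two point-count rules; names are illustrative only. */
final class PointCount {
  private PointCount() {}

  /** Accounting flagged in this comment: the device dimension is counted a second time. */
  static long withDeviceFactor(int rows, int querySensorNum, int queryDeviceNum) {
    return (long) rows * querySensorNum * queryDeviceNum;
  }

  /** Accounting aligned with the table-model reference: rows * QUERY_SENSOR_NUM. */
  static long aligned(int rows, int querySensorNum) {
    // Cast before multiplying so large row counts do not overflow int.
    return (long) rows * querySensorNum;
  }
}
```

For example, 200 result rows with QUERY_SENSOR_NUM=3 and QUERY_DEVICE_NUM=2 give 600 points under the aligned rule and 1200 under the rule with the extra factor.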

Comment on lines +506 to +519
public Status latestPointQuery(LatestPointQuery latestPointQuery) {
List<DeviceSchema> devs = latestPointQuery.getDeviceSchema();
String lastCols =
devs.get(0).getSensors().stream()
.map(s -> "last(" + s.getName() + ")")
.collect(Collectors.joining(", "));
String sql =
"SELECT "
+ lastCols
+ " FROM "
+ tableRef(devs.get(0))
+ " WHERE deviceId IN "
+ deviceInList(devs);
return executeQueryAndCount(sql);

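Logic error: latestPointQuery semantics are inconsistent with the iotdb-2.0 table model.

The iotdb-2.0 table model (TableStrategy.getLatestPointQuerySql L341-356 + addGroupByClauseIfNecessary L335-337) returns the latest point per device:

SELECT device_id, last(time), last_by(s_0, time), last_by(s_1, time), ...
FROM <tbl> WHERE (device_id = 'd_0' OR ...) GROUP BY device_id

That is, it uses last_by(sensor, time) + GROUP BY device_id, giving each device its own latest row.

Here DolphinDB runs:

SELECT last(s_0), last(s_1), ... FROM <tbl> WHERE deviceId IN ('d_0', ...)

last() is applied to the entire multi-device result set with no GROUP BY deviceId, so it returns a single global latest value across all devices, just 1 row. That is not "latest per device" semantics, and it compounds the error from the point-count bug above.

Suggestion: align with last_by(...) ... context by deviceId (in DolphinDB, a per-partition latest can be obtained with context by deviceId combined with last/last_by), returning one row per device. Neither side has a time predicate, which is consistent.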
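
One way to build a per-device form as a string, sketched under the assumption that DolphinDB SQL accepts last() together with GROUP BY deviceId (not verified here). LatestPointSql and its parameters are hypothetical, and whether last() follows time order depends on how rows are sorted within each group:

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical SQL builder for a per-device latest-point query; not the adapter's actual code. */
final class LatestPointSql {
  private LatestPointSql() {}

  static String build(String table, List<String> sensors, List<String> deviceIds) {
    // One last(...) projection per sensor column.
    String lastCols =
        sensors.stream().map(s -> "last(" + s + ")").collect(Collectors.joining(", "));
    // Quoted device list, e.g. ('d_0', 'd_1').
    String inList =
        deviceIds.stream().map(d -> "'" + d + "'").collect(Collectors.joining(", ", "(", ")"));
    // Project deviceId and group by it so each device yields its own row.
    return "SELECT deviceId, " + lastCols
        + " FROM " + table
        + " WHERE deviceId IN " + inList
        + " GROUP BY deviceId";
  }
}
```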

+ tsLiteral(aggRangeQuery.getEndTimestamp())
+ " AND deviceId IN "
+ deviceInList(devs);
return executeQueryAndCount(sql);

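Logic error: the aggregate queries lack GROUP BY deviceId, which is inconsistent with the iotdb-2.0 table model's "one aggregated row per device".

For aggRange / aggValue / aggRangeValue, the iotdb-2.0 table model (TableStrategy.getAggQuerySqlHead L206-223 + addAggWhereClause L331) always issues:

SELECT device_id, <agg>(s_0), <agg>(s_1), ... FROM <tbl> WHERE ... AND (device_id...) GROUP BY device_id

→ it projects device_id and uses GROUP BY device_id, returning N_device rows (one aggregated row per device).

Here all three DolphinDB aggregate queries are SELECT agg(s_i) ... AND deviceId IN (...) with no GROUP BY, which aggregates all selected devices into a single row. As a result, the returned row count (1 vs. N_device), the aggregation workload, and the final point count all differ from the reference (for the point-count bug, see above).

Suggestion: add GROUP BY deviceId to these three aggregate queries and project deviceId in the SELECT list, in line with TableStrategy.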
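
The row-count difference (1 vs. N_device) can be illustrated in plain Java without a database. AggRowCount and its Row type are hypothetical stand-ins for the query result, using max as the example aggregate:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical in-memory illustration of global vs. per-device aggregation. */
final class AggRowCount {
  private AggRowCount() {}

  /** One (deviceId, value) reading. */
  static final class Row {
    final String deviceId;
    final double value;

    Row(String deviceId, double value) {
      this.deviceId = deviceId;
      this.value = value;
    }
  }

  /** Ungrouped aggregate: always a single value, however many devices are selected. */
  static double globalMax(List<Row> rows) {
    return rows.stream().mapToDouble(r -> r.value).max().orElse(Double.NaN);
  }

  /** Grouped aggregate: one entry per device, like GROUP BY deviceId. */
  static Map<String, Double> maxPerDevice(List<Row> rows) {
    return rows.stream()
        .collect(Collectors.toMap(r -> r.deviceId, r -> r.value, Double::max, TreeMap::new));
  }
}
```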

+ ", "
+ groupByQuery.getGranularity()
+ "l)";
return executeQueryAndCount(sql);

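Logic error: groupByQuery groups by time bucket only, dropping the device dimension, and does not project the bucket column.

The iotdb-2.0 table model (TableStrategy.getGroupByQuerySQL L234-262) groups on two dimensions, device + time bucket, and projects both:

SELECT device_id, date_bin(<g>ms, time), <agg>(s_0), ...
FROM <tbl> WHERE time>=.. AND time<=.. AND (device_id...)
group by device_id, date_bin(<g>ms, time)

Here DolphinDB uses ... GROUP BY bar(time, g l): it groups by time bucket only (no deviceId), and the SELECT list has no bucket column. The result collapses across devices and cannot be lined up with the time buckets.

Suggestion: align to GROUP BY deviceId, bar(time, g) and project deviceId, bar(...) AS bucket (groupByQueryOrderByDesc below already uses bar(...) AS bucket, so the two can be unified).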
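
A small in-memory illustration of how the group count changes once deviceId joins the grouping key. It assumes bar(ts, g) rounds ts down to a multiple of g; BucketGrouping and its Row type are hypothetical:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of time-bucket-only vs. device + time-bucket grouping. */
final class BucketGrouping {
  private BucketGrouping() {}

  /** One (deviceId, timestamp) reading; the sensor value is irrelevant to the group count. */
  static final class Row {
    final String deviceId;
    final long ts;

    Row(String deviceId, long ts) {
      this.deviceId = deviceId;
      this.ts = ts;
    }
  }

  /** Assumed bucket function: round ts down to a multiple of interval. */
  static long bar(long ts, long interval) {
    return ts - Math.floorMod(ts, interval);
  }

  /** Number of groups when grouping by the time bucket only. */
  static int groupsByBucket(List<Row> rows, long interval) {
    Set<Long> keys = new HashSet<>();
    for (Row r : rows) {
      keys.add(bar(r.ts, interval));
    }
    return keys.size();
  }

  /** Number of groups when grouping by (deviceId, time bucket). */
  static int groupsByDeviceAndBucket(List<Row> rows, long interval) {
    Set<String> keys = new HashSet<>();
    for (Row r : rows) {
      keys.add(r.deviceId + "|" + bar(r.ts, interval));
    }
    return keys.size();
  }
}
```

With readings (d_0, 1000), (d_1, 1500), (d_0, 6000) and a 5000 ms interval, bucket-only grouping yields 2 groups while device + bucket grouping yields 3.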

.append(deviceInList(devs))
.append(")");
}
return executeQueryAndCount(sql.toString());

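Logic error: setOpQuery subqueries project only the sensor value columns and leave the device dimension out of the projection, so the row identity used by the set operation is wrong.

In the iotdb-2.0 table model each subquery is getSimpleQuerySqlHead + a range where clause (with time + device filters), but its projection is also the sensor columns; here the DolphinDB subqueries likewise project only sensorColumns(...), with no timeColumn and no deviceId.

UNION/INTERSECT/EXCEPT then deduplicate/intersect on tuples of value columns only: rows with different timestamps or different devices but identical readings are wrongly merged, so the result is not a set operation over the rows in the time range.

Suggestion: have the subqueries project at least timeColumn (and normally deviceId too) to preserve row identity, in line with the reference implementation.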
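
The merging effect can be reproduced with ordinary Java sets. In this sketch each row is a (time, deviceId, value) list; SetOpIdentity is a hypothetical illustration, not the adapter's code:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of set semantics with and without row identity columns. */
final class SetOpIdentity {
  private SetOpIdentity() {}

  /** Distinct count when only the value column (index 2) is projected. */
  static int distinctByValue(List<List<Object>> rows) {
    Set<Object> values = new HashSet<>();
    for (List<Object> row : rows) {
      values.add(row.get(2));
    }
    return values.size();
  }

  /** Distinct count when time, deviceId and value are all projected. */
  static int distinctByFullRow(List<List<Object>> rows) {
    // List equality is element-wise, so each full tuple acts as the row identity.
    return new HashSet<>(rows).size();
  }
}
```

Two readings of 1.5 taken at different times on different devices count as one element under value-only projection and as two when time and deviceId are included.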

Comment on lines +316 to +323
private void awaitDrain(MultithreadedTableWriter mtw) throws InterruptedException {
while (true) {
MultithreadedTableWriter.Status st = mtw.getStatus();
if (st.unsentRows == 0) return;
if (st.hasError()) return;
Thread.sleep(1);
}
}

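Robustness (liveness): awaitDrain has no timeout cap, so a stuck DolphinDB blocks the write thread forever.

while (true) {
  MultithreadedTableWriter.Status st = mtw.getStatus();
  if (st.unsentRows == 0) return;
  if (st.hasError()) return;
  Thread.sleep(1);
}

If the background sender thread stalls without reporting an error and unsentRows never reaches zero (server hang / dropped connection), this loop spins forever and hangs the whole data client thread. Thread.sleep(1) is also rather tight, and because the method returns void the caller cannot tell "drained" from "error/timeout".

The PR description already lists this as a follow-up. I suggest adding a deadline in this PR (for example by reusing the write-timeout config) and returning a boolean, rather than merging with a known gap.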
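
A minimal sketch of a deadline-bounded drain loop. To stay self-contained it takes two BooleanSupplier probes in place of the MTW status fields; BoundedDrain, its parameters, and the choice to check the error flag first are assumptions for illustration:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical deadline-bounded polling loop; the probes stand in for the writer status. */
final class BoundedDrain {
  private BoundedDrain() {}

  /**
   * Polls until the writer is drained, reports an error, or timeoutMs elapses.
   *
   * @return true only if drained without error before the deadline
   */
  static boolean awaitDrain(
      BooleanSupplier drained, BooleanSupplier hasError, long timeoutMs, long pollMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      if (hasError.getAsBoolean()) {
        return false; // writer reported an error
      }
      if (drained.getAsBoolean()) {
        return true; // nothing left to send
      }
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out while rows were still pending
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException ie) {
        // Restore the interrupt flag and treat the batch as not drained.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

A caller could then fail the batch when the method returns false instead of assuming the rows were written.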

timeColumn,
null,
MultithreadedTableWriter.Mode.M_Append,
null);

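Maintainability (suggestion): name the 13 positional MTW arguments in comments, and spell out the pros and cons of threadCount=1.

The long positional call new MultithreadedTableWriter(host, port, user, pwd, dbPath, tableName, false, false, null, batchSize, 0.01f, 1, timeColumn, null, Mode.M_Append, null) is hard to read; I suggest annotating the key positions (batchSize / throttle=0.01f / threadCount=1 / partitionCol=timeColumn / mode).

Also, threadCount=1 plus a synchronous awaitDrain flush at the end of every batch effectively uses MTW as a "buffered synchronous writer". If this is deliberate, to match the concurrency semantics of the other adapters (one writer per client), I suggest saying so in the class comment so it is not mistaken for a missed setting. It does give up the throughput advantage of MTW's multithreaded pipeline, which is a design trade-off.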

long startMs = TimeUtils.convertDateStrToTimestamp(config.getSTART_TIME());
long durationMs =
(long) config.getLOOP() * config.getBATCH_SIZE_PER_WRITE() * config.getPOINT_STEP();
long endMs = startMs + durationMs + 86_400_000L; // pad by 1 day for the right boundary

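To confirm (partition right boundary): endMs is estimated as LOOP*BATCH_SIZE_PER_WRITE*POINT_STEP plus 1 day.

The estimate assumes the data time span = LOOP * BATCH_SIZE_PER_WRITE * POINT_STEP. With out-of-order writes (IS_OUT_OF_ORDER/OUT_OF_ORDER_RATIO), or when DEVICE_NUM_PER_WRITE>1 changes how time advances, the true maximum timestamp may exceed the estimate, and writes that land outside the VALUE partition range will fail. The 1-day right margin absorbs some of this; please confirm the boundary is safe in the out-of-order case (or raise a clear error when a timestamp is out of range).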
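
A sketch of the boundary estimate together with an explicit range check that could back a clear error message. ValuePartitionRange is hypothetical, and treating the last estimated day as inclusive is an assumption about how the per-day partitions are generated:

```java
/** Hypothetical helper for the per-day partition range; not the adapter's actual code. */
final class ValuePartitionRange {
  static final long DAY_MS = 86_400_000L;

  private ValuePartitionRange() {}

  /** Right boundary as estimated in the hunk above: ideal time span plus a 1-day margin. */
  static long estimatedEndMs(long startMs, long loop, long batchSize, long pointStepMs) {
    return startMs + loop * batchSize * pointStepMs + DAY_MS;
  }

  /** True if the timestamp's day falls within [day(startMs), day(endMs)] (inclusive, assumed). */
  static boolean withinDayRange(long tsMs, long startMs, long endMs) {
    long day = Math.floorDiv(tsMs, DAY_MS);
    return day >= Math.floorDiv(startMs, DAY_MS) && day <= Math.floorDiv(endMs, DAY_MS);
  }
}
```

Checking each batch's maximum timestamp with withinDayRange before writing would turn an out-of-range write into an explicit error rather than a failed append.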

try {
Entity result = conn.run(sql);
int rows = result.rows();
long points = (long) rows * config.getQUERY_SENSOR_NUM() * config.getQUERY_DEVICE_NUM();

dolphindb-3.0: the point count rows * QUERY_SENSOR_NUM * QUERY_DEVICE_NUM has an extra QUERY_DEVICE_NUM factor; the iotdb-2.0 table model uses rows * QUERY_SENSOR_NUM. The two modules are byte-identical, so please fix both together.

+ tableRef(devs.get(0))
+ " WHERE deviceId IN "
+ deviceInList(devs);
return executeQueryAndCount(sql);

dolphindb-3.0: latestPointQuery uses a global last() instead of per-device last_by + GROUP BY device_id, so it collapses to 1 row. The two modules are byte-identical, so please fix both together.

JackieTien97 and others added 3 commits May 29, 2026 10:20
…le model

Address review findings on PR #540 by making the DolphinDB query and write
behavior match the iotdb-2.0 TableStrategy / SessionStrategy reference. Applied
byte-identically to both dolphindb-2.0 and dolphindb-3.0.

Query semantics (were diverging from the table-model reference):
- executeQueryAndCount: points = rows * QUERY_SENSOR_NUM (drop the spurious
  QUERY_DEVICE_NUM factor). The device dimension is already carried by the
  result rows, matching SessionStrategy#executeQueryAndGetStatusImpl which only
  multiplies by QUERY_SENSOR_NUM in TABLE mode.
- latestPointQuery: SELECT deviceId, last(s) ... GROUP BY deviceId -> one latest
  row per device (was a single global last() row), mirroring last_by + GROUP BY
  device_id.
- aggRange/aggValue/aggRangeValue: add GROUP BY deviceId and project deviceId ->
  one aggregated row per device (was collapsing all devices into one row).
- groupBy/groupByOrderByDesc: GROUP BY deviceId, bar(time,g) and project both the
  device and the time bucket (was time-bucket only, device dimension dropped).
- setOpQuery: project time + deviceId alongside sensor values so UNION/INTERSECT/
  EXCEPT preserve row identity (was deduping on sensor values only).
- rangeQueryOrderByDesc / valueRangeQueryOrderByDesc: ORDER BY deviceId, time DESC
  to match the reference ordering.

Value filter:
- valueFilterClause: truncate threshold to int and render DATE columns as a
  DolphinDB YYYY.MM.DD literal, matching TableStrategy#getValueFilterClause
  (was emitting a raw double for every column type).

Robustness / readability:
- awaitDrain: bound the busy-wait by WRITE_OPERATION_TIMEOUT_MS and return a
  boolean so insertOneBatch fails the batch instead of blocking forever on a
  stalled server.
- Document the 13 positional MultithreadedTableWriter args and the deliberate
  threadCount=1 (buffered-synchronous) design; add a class-level Javadoc.

mvn -pl core,dolphindb-3.0,dolphindb-2.0 test passes on JDK 8 (both
DolphinDBTypeMapTest green); spotless:apply applied.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…via DB_SWITCH

The DolphinDB adapter now supports two write paths, chosen by the third segment of
DB_SWITCH (reusing the existing DBType-DBVersion-DBInsertMode convention):

  DolphinDB-3-MTW / DolphinDB-2-MTW  -> MultithreadedTableWriter (existing buffered
                                        row writer)
  DolphinDB-3-PTA / DolphinDB-2-PTA  -> PartitionedTableAppender (new): transposes
                                        each MultiDeviceBatch into one columnar
                                        BasicTable and appends it once through a
                                        shared ExclusiveDBConnectionPool, which routes
                                        rows to their partitions.

The old no-suffix DolphinDB-3 / DolphinDB-2 are replaced by the four suffixed values.

core:
- DBInsertMode: add INSERT_USE_MTW("MTW"), INSERT_USE_PTA("PTA").
- DBSwitch: replace DB_DOLPHINDB_3/2 with DB_DOLPHINDB_{3,2}_{MTW,PTA}.
- DBFactory: the four switches still map to DOLPHINDB3_CLASS / DOLPHINDB2_CLASS — the
  write path is selected inside the adapter, not by class.

adapter (dolphindb-3.0, byte-identically synced to dolphindb-2.0):
- insertOneBatch dispatches on the insert mode to insertOneBatchViaMtw (unchanged) or
  the new insertOneBatchViaAppender.
- ensureAppender lazily builds one ExclusiveDBConnectionPool (size 4, for parallel
  partition appends) per adapter instance and one PartitionedTableAppender per
  (dbPath, table); partitionCol is the VALUE-partitioned time column.
- The columnar table is built in the same column order as buildCreateTableSql
  (time, deviceId, tag_*, s_0..s_n); a ColumnCategory.TAG sensor is rendered as a
  SYMBOL (STRING) column, matching the DDL. Per-value conversion mirrors convertValue.
- close() now also shuts the connection pool down.

config.properties / README / DifferentTestDatabase docs updated with the four values
and a one-line MTW-vs-PTA note (and stale DOLPHINDB_PARTITION_DAYS / RANGE-partition
text corrected to the current VALUE-partition design).

API surface verified via javap against dolphindb-javaapi 3.00.0.2 / 2.00.11.1.
mvn -pl core,dolphindb-3.0,dolphindb-2.0 -am test passes on JDK 8 (26 core test
classes + both DolphinDBTypeMapTest green); spotless:apply applied; DBSwitch parsing
of all four strings verified at runtime, old strings rejected.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@JackieTien97 JackieTien97 merged commit 5375675 into master May 30, 2026
2 checks passed
@JackieTien97 JackieTien97 deleted the dolphindb branch May 30, 2026 01:10