102102import org .apache .iotdb .db .queryengine .plan .statement .crud .InsertRowsOfOneDeviceStatement ;
103103import org .apache .iotdb .db .queryengine .plan .statement .crud .InsertRowsStatement ;
104104import org .apache .iotdb .db .queryengine .plan .statement .crud .InsertTabletStatement ;
105+ import org .apache .iotdb .db .queryengine .plan .statement .crud .LoadTsFileStatement ;
105106import org .apache .iotdb .db .queryengine .plan .statement .metadata .CreateAlignedTimeSeriesStatement ;
106107import org .apache .iotdb .db .queryengine .plan .statement .metadata .CreateMultiTimeSeriesStatement ;
107108import org .apache .iotdb .db .queryengine .plan .statement .metadata .CreateTimeSeriesStatement ;
@@ -365,16 +366,29 @@ private TSExecuteStatementResp executeStatementInternal(
365366
366367 queryId = SESSION_MANAGER .requestQueryId (clientSession , req .statementId );
367368
368- result =
369- COORDINATOR .executeForTreeModel (
370- s ,
371- queryId ,
372- SESSION_MANAGER .getSessionInfo (clientSession ),
373- statement ,
374- partitionFetcher ,
375- schemaFetcher ,
376- req .getTimeout (),
377- true );
369+ // For synchronous multi-file loading, split into sub-statements for batch execution
370+ if (shouldSplitLoadTsFileStatement (s , false )) {
371+ result =
372+ executeBatchLoadTsFile (
373+ (LoadTsFileStatement ) s ,
374+ queryId ,
375+ SESSION_MANAGER .getSessionInfo (clientSession ),
376+ statement ,
377+ partitionFetcher ,
378+ schemaFetcher ,
379+ config .getQueryTimeoutThreshold ());
380+ } else {
381+ result =
382+ COORDINATOR .executeForTreeModel (
383+ s ,
384+ queryId ,
385+ SESSION_MANAGER .getSessionInfo (clientSession ),
386+ statement ,
387+ partitionFetcher ,
388+ schemaFetcher ,
389+ req .getTimeout (),
390+ true );
391+ }
378392 }
379393 } else {
380394 org .apache .iotdb .db .queryengine .plan .relational .sql .ast .Statement s =
@@ -396,17 +410,31 @@ private TSExecuteStatementResp executeStatementInternal(
396410
397411 queryId = SESSION_MANAGER .requestQueryId (clientSession , req .statementId );
398412
399- result =
400- COORDINATOR .executeForTableModel (
401- s ,
402- relationSqlParser ,
403- clientSession ,
404- queryId ,
405- SESSION_MANAGER .getSessionInfo (clientSession ),
406- statement ,
407- metadata ,
408- req .getTimeout (),
409- true );
413+ // For synchronous multi-file loading, split into sub-statements for batch execution
414+ if (shouldSplitTableLoadTsFile (s , false )) {
415+ result =
416+ executeBatchTableLoadTsFile (
417+ (org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile ) s ,
418+ relationSqlParser ,
419+ clientSession ,
420+ queryId ,
421+ SESSION_MANAGER .getSessionInfo (clientSession ),
422+ statement ,
423+ metadata ,
424+ config .getQueryTimeoutThreshold ());
425+ } else {
426+ result =
427+ COORDINATOR .executeForTableModel (
428+ s ,
429+ relationSqlParser ,
430+ clientSession ,
431+ queryId ,
432+ SESSION_MANAGER .getSessionInfo (clientSession ),
433+ statement ,
434+ metadata ,
435+ req .getTimeout (),
436+ true );
437+ }
410438 }
411439
412440 if (result .status .code != TSStatusCode .SUCCESS_STATUS .getStatusCode ()
@@ -1845,16 +1873,30 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
18451873 queryId = SESSION_MANAGER .requestQueryId ();
18461874 type = s .getType () == null ? null : s .getType ().name ();
18471875 // create and cache dataset
1848- result =
1849- COORDINATOR .executeForTreeModel (
1850- s ,
1851- queryId ,
1852- SESSION_MANAGER .getSessionInfo (clientSession ),
1853- statement ,
1854- partitionFetcher ,
1855- schemaFetcher ,
1856- config .getQueryTimeoutThreshold (),
1857- false );
1876+
1877+ // For asynchronous multi-file loading, split into sub-statements for batch execution
1878+ if (shouldSplitLoadTsFileStatement (s , true )) {
1879+ result =
1880+ executeBatchLoadTsFile (
1881+ (LoadTsFileStatement ) s ,
1882+ queryId ,
1883+ SESSION_MANAGER .getSessionInfo (clientSession ),
1884+ statement ,
1885+ partitionFetcher ,
1886+ schemaFetcher ,
1887+ config .getQueryTimeoutThreshold ());
1888+ } else {
1889+ result =
1890+ COORDINATOR .executeForTreeModel (
1891+ s ,
1892+ queryId ,
1893+ SESSION_MANAGER .getSessionInfo (clientSession ),
1894+ statement ,
1895+ partitionFetcher ,
1896+ schemaFetcher ,
1897+ config .getQueryTimeoutThreshold (),
1898+ false );
1899+ }
18581900 }
18591901 } else {
18601902
@@ -1875,17 +1917,31 @@ public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) {
18751917
18761918 queryId = SESSION_MANAGER .requestQueryId ();
18771919
1878- result =
1879- COORDINATOR .executeForTableModel (
1880- s ,
1881- relationSqlParser ,
1882- clientSession ,
1883- queryId ,
1884- SESSION_MANAGER .getSessionInfo (clientSession ),
1885- statement ,
1886- metadata ,
1887- config .getQueryTimeoutThreshold (),
1888- false );
1920+ // For asynchronous multi-file loading, split into sub-statements for batch execution
1921+ if (shouldSplitTableLoadTsFile (s , true )) {
1922+ result =
1923+ executeBatchTableLoadTsFile (
1924+ (org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile ) s ,
1925+ relationSqlParser ,
1926+ clientSession ,
1927+ queryId ,
1928+ SESSION_MANAGER .getSessionInfo (clientSession ),
1929+ statement ,
1930+ metadata ,
1931+ config .getQueryTimeoutThreshold ());
1932+ } else {
1933+ result =
1934+ COORDINATOR .executeForTableModel (
1935+ s ,
1936+ relationSqlParser ,
1937+ clientSession ,
1938+ queryId ,
1939+ SESSION_MANAGER .getSessionInfo (clientSession ),
1940+ statement ,
1941+ metadata ,
1942+ config .getQueryTimeoutThreshold (),
1943+ false );
1944+ }
18891945 }
18901946
18911947 results .add (result .status );
@@ -3190,4 +3246,202 @@ public void handleClientExit() {
31903246 PipeDataNodeAgent .receiver ().legacy ().handleClientExit ();
31913247 SubscriptionAgent .receiver ().handleClientExit ();
31923248 }
3249+
3250+ /**
3251+ * Determines whether a tree-model LoadTsFileStatement should be split into multiple
3252+ * sub-statements for execution.
3253+ *
3254+ * @param statement the Statement to be executed
3255+ * @param requireAsync whether async loading is required
3256+ * @return true if the statement should be split for execution, false otherwise
3257+ */
3258+ private boolean shouldSplitLoadTsFileStatement (Statement statement , boolean requireAsync ) {
3259+ if (!(statement instanceof LoadTsFileStatement )) {
3260+ return false ;
3261+ }
3262+ LoadTsFileStatement loadStmt = (LoadTsFileStatement ) statement ;
3263+ return loadStmt .getTsFiles ().size () > 1 && loadStmt .isAsyncLoad () == requireAsync ;
3264+ }
3265+
3266+ /**
3267+ * Determines whether a table-model LoadTsFile should be split into multiple sub-statements for
3268+ * execution.
3269+ *
3270+ * @param statement the Statement to be executed
3271+ * @param requireAsync whether async loading is required
3272+ * @return true if the statement should be split for execution, false otherwise
3273+ */
3274+ private boolean shouldSplitTableLoadTsFile (
3275+ org .apache .iotdb .db .queryengine .plan .relational .sql .ast .Statement statement ,
3276+ boolean requireAsync ) {
3277+ if (!(statement
3278+ instanceof org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile )) {
3279+ return false ;
3280+ }
3281+ org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile loadStmt =
3282+ (org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile ) statement ;
3283+ return loadStmt .getTsFiles ().size () > 1 && loadStmt .isAsyncLoad () == requireAsync ;
3284+ }
3285+
3286+ /**
3287+ * Executes tree-model LoadTsFileStatement sub-statements in batch.
3288+ *
3289+ * @param loadTsFileStatement the LoadTsFileStatement to be executed
3290+ * @param queryId the query ID
3291+ * @param sessionInfo the session information
3292+ * @param statement the SQL statement string
3293+ * @param partitionFetcher the partition fetcher
3294+ * @param schemaFetcher the schema fetcher
3295+ * @param timeoutMs the timeout in milliseconds
3296+ * @return the execution result
3297+ */
3298+ private ExecutionResult executeBatchLoadTsFile (
3299+ LoadTsFileStatement loadTsFileStatement ,
3300+ long queryId ,
3301+ SessionInfo sessionInfo ,
3302+ String statement ,
3303+ IPartitionFetcher partitionFetcher ,
3304+ ISchemaFetcher schemaFetcher ,
3305+ long timeoutMs ) {
3306+
3307+ ExecutionResult result = null ;
3308+ List <LoadTsFileStatement > subStatements = loadTsFileStatement .getSubStatement ();
3309+ int totalFiles = subStatements .size ();
3310+
3311+ LOGGER .info ("Start batch loading {} TsFile(s) in tree model, queryId: {}" , totalFiles , queryId );
3312+
3313+ for (int i = 0 ; i < totalFiles ; i ++) {
3314+ LoadTsFileStatement subStatement = subStatements .get (i );
3315+ LOGGER .info (
3316+ "Loading TsFile {}/{} in tree model, file: {}, queryId: {}" ,
3317+ i + 1 ,
3318+ totalFiles ,
3319+ subStatement .getTsFiles ().get (0 ).getName (),
3320+ queryId );
3321+
3322+ result =
3323+ COORDINATOR .executeForTreeModel (
3324+ subStatement ,
3325+ queryId ,
3326+ sessionInfo ,
3327+ statement ,
3328+ partitionFetcher ,
3329+ schemaFetcher ,
3330+ timeoutMs ,
3331+ false );
3332+
3333+ // Exit early if any sub-statement execution fails
3334+ if (result != null
3335+ && result .status .getCode () != TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
3336+ LOGGER .warn (
3337+ "Failed to load TsFile {}/{} in tree model, file: {}, queryId: {}, error: {}" ,
3338+ i + 1 ,
3339+ totalFiles ,
3340+ subStatement .getTsFiles ().get (0 ).getName (),
3341+ queryId ,
3342+ result .status .getMessage ());
3343+ break ;
3344+ }
3345+
3346+ LOGGER .info (
3347+ "Successfully loaded TsFile {}/{} in tree model, file: {}, queryId: {}" ,
3348+ i + 1 ,
3349+ totalFiles ,
3350+ subStatement .getTsFiles ().get (0 ).getName (),
3351+ queryId );
3352+ }
3353+
3354+ if (result != null && result .status .getCode () == TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
3355+ LOGGER .info (
3356+ "Completed batch loading all {} TsFile(s) in tree model, queryId: {}" ,
3357+ totalFiles ,
3358+ queryId );
3359+ }
3360+
3361+ return result ;
3362+ }
3363+
3364+ /**
3365+ * Executes table-model LoadTsFile sub-statements in batch.
3366+ *
3367+ * @param loadTsFile the LoadTsFile to be executed
3368+ * @param relationSqlParser the relational SQL parser
3369+ * @param clientSession the client session
3370+ * @param queryId the query ID
3371+ * @param sessionInfo the session information
3372+ * @param statement the SQL statement string
3373+ * @param metadata the metadata
3374+ * @param timeoutMs the timeout in milliseconds
3375+ * @return the execution result
3376+ */
3377+ private ExecutionResult executeBatchTableLoadTsFile (
3378+ org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile loadTsFile ,
3379+ SqlParser relationSqlParser ,
3380+ IClientSession clientSession ,
3381+ long queryId ,
3382+ SessionInfo sessionInfo ,
3383+ String statement ,
3384+ Metadata metadata ,
3385+ long timeoutMs ) {
3386+
3387+ ExecutionResult result = null ;
3388+ List <org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile > subStatements =
3389+ loadTsFile .getSubStatement ();
3390+ int totalFiles = subStatements .size ();
3391+
3392+ LOGGER .info (
3393+ "Start batch loading {} TsFile(s) in table model, queryId: {}" , totalFiles , queryId );
3394+
3395+ for (int i = 0 ; i < totalFiles ; i ++) {
3396+ org .apache .iotdb .db .queryengine .plan .relational .sql .ast .LoadTsFile subStatement =
3397+ subStatements .get (i );
3398+ LOGGER .info (
3399+ "Loading TsFile {}/{} in table model, file: {}, queryId: {}" ,
3400+ i + 1 ,
3401+ totalFiles ,
3402+ subStatement .getTsFiles ().get (0 ).getName (),
3403+ queryId );
3404+
3405+ result =
3406+ COORDINATOR .executeForTableModel (
3407+ subStatement ,
3408+ relationSqlParser ,
3409+ clientSession ,
3410+ queryId ,
3411+ sessionInfo ,
3412+ statement ,
3413+ metadata ,
3414+ timeoutMs ,
3415+ false );
3416+
3417+ // Exit early if any sub-statement execution fails
3418+ if (result != null
3419+ && result .status .getCode () != TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
3420+ LOGGER .warn (
3421+ "Failed to load TsFile {}/{} in table model, file: {}, queryId: {}, error: {}" ,
3422+ i + 1 ,
3423+ totalFiles ,
3424+ subStatement .getTsFiles ().get (0 ).getName (),
3425+ queryId ,
3426+ result .status .getMessage ());
3427+ break ;
3428+ }
3429+
3430+ LOGGER .info (
3431+ "Successfully loaded TsFile {}/{} in table model, file: {}, queryId: {}" ,
3432+ i + 1 ,
3433+ totalFiles ,
3434+ subStatement .getTsFiles ().get (0 ).getName (),
3435+ queryId );
3436+ }
3437+
3438+ if (result != null && result .status .getCode () == TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
3439+ LOGGER .info (
3440+ "Completed batch loading all {} TsFile(s) in table model, queryId: {}" ,
3441+ totalFiles ,
3442+ queryId );
3443+ }
3444+
3445+ return result ;
3446+ }
31933447}
0 commit comments