diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java index 8a68b7579f97..ddfd355520f2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java @@ -199,7 +199,8 @@ public Map paramParsingPreparation(@NonNull TaskInstance taskI safePutAll(prepareParamsMap, commandParamsMap); } - // 6. VarPool: override values only for existing IN-direction parameters + // 6. VarPool: override values for existing IN-direction parameters, + // and inject new parameters that only exist in VarPool for placeholder resolution List varPools = parseVarPool(taskInstance); if (CollectionUtils.isNotEmpty(varPools)) { for (Property varPool : varPools) { @@ -208,7 +209,11 @@ public Map paramParsingPreparation(@NonNull TaskInstance taskI } Property targetParam = prepareParamsMap.get(varPool.getProp()); if (targetParam != null && Direct.IN.equals(targetParam.getDirect())) { + // Existing IN parameter: override its value targetParam.setValue(varPool.getValue()); + } else if (targetParam == null) { + // Parameter not in map: inject it for placeholder resolution + prepareParamsMap.put(varPool.getProp(), varPool); } } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java index 1f0beb349eee..3cce0826c7fa 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java @@ -538,4 +538,81 @@ public void testResolvePlaceholders() throws Exception { // Ensure no unintended side effects Assertions.assertEquals(4, paramsMap.size()); } + + @Test + public void testParamParsingPreparation_varPoolInjectionWhenParamNameNotInLocalParams() { + // Test scenario: upstream task outputs p1=111, downstream task has p2=${p1} but no local p1 + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setTaskCode(1000001L); + taskInstance.setTaskDefinitionVersion(1); + taskInstance.setExecutePath("home/path/execute"); + + // VarPool from upstream: p1=111 (OUT direction) + taskInstance.setVarPool("[{\"prop\":\"p1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"111\"}]"); + + TaskDefinition taskDefinition = new TaskDefinition(); + taskDefinition.setName("TaskName-1"); + taskDefinition.setCode(1000001L); + taskDefinition.setVersion(1); + + WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setId(2); + final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder() + .timeZone("Asia/Shanghai") + .build(); + workflowInstance.setCommandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam)); + workflowInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString()); + workflowInstance.setGlobalParams("[]"); + workflowInstance.setScheduleTime(DateUtils.stringToDate("2024-01-01 00:00:00")); + + WorkflowDefinition workflowDefinition = new WorkflowDefinition(); + workflowDefinition.setName("ProcessName-1"); + workflowDefinition.setProjectName("ProjectName"); + workflowDefinition.setProjectCode(3000001L); + workflowDefinition.setCode(200001L); + + Project project = new Project(); + project.setName("ProjectName"); + project.setCode(3000001L); + + workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode()); + workflowInstance.setProjectCode(workflowDefinition.getProjectCode()); + taskInstance.setTaskCode(taskDefinition.getCode()); + taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion()); + taskInstance.setProjectCode(workflowDefinition.getProjectCode()); + taskInstance.setWorkflowInstanceId(workflowInstance.getId()); + + // Local params: p2=${p1} (references p1 which only exists in varPool, not in local params) + org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters sqlParameters = + new org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters(); + sqlParameters.setType("MYSQL"); + sqlParameters.setSql("select 1"); + + Property p2 = new Property(); + p2.setProp("p2"); + p2.setDirect(Direct.IN); + p2.setType(DataType.VARCHAR); + p2.setValue("${p1}"); + sqlParameters.setLocalParams(Collections.singletonList(p2)); + + taskInstance.setTaskParams(JSONUtils.toJsonString(sqlParameters)); + + Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList()); + + Map propertyMap = + curingParamsServiceImpl.paramParsingPreparation(taskInstance, sqlParameters, workflowInstance, + project.getName(), workflowDefinition.getName()); + + Assertions.assertNotNull(propertyMap); + + // Assert: p1 should be injected from varPool + Assertions.assertTrue(propertyMap.containsKey("p1"), "p1 should be injected from varPool"); + Assertions.assertEquals("111", propertyMap.get("p1").getValue()); + + // Assert: p2 should exist and its value should be resolved from ${p1} to 111 + Assertions.assertTrue(propertyMap.containsKey("p2")); + Assertions.assertEquals("111", propertyMap.get("p2").getValue(), + "p2's value ${p1} should be resolved to 111 using injected varPool param"); + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 113dfbcd9eb8..1bdbc6aea6f6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -492,8 +492,12 @@ private String replaceOriginalValue(String content, String rgex, Map paramsMap = new HashMap<>(); + + Property p1 = new Property(); + p1.setProp("price"); + p1.setValue("$100"); + paramsMap.put("price", p1); + + Property p2 = new Property(); + p2.setProp("path"); + p2.setValue("C:\\Users\\test"); + paramsMap.put("path", p2); + + Property p3 = new Property(); + p3.setProp("empty"); + p3.setValue(""); + paramsMap.put("empty", p3); + + Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class); + method.setAccessible(true); + + // The regex pattern matches optional quotes around !{...}, so they are replaced too + String sql1 = "select * from items where price = '!{price}'"; + String result1 = (String) method.invoke(sqlTask, sql1, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap); + Assertions.assertEquals("select * from items where price = $100", result1); + + String sql2 = "load data local inpath \"!{path}\" into table t"; + String result2 = (String) method.invoke(sqlTask, sql2, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap); + Assertions.assertEquals("load data local inpath C:\\Users\\test into table t", result2); + + String sql3 = "select !{empty} as val"; + String result3 = (String) method.invoke(sqlTask, sql3, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap); + Assertions.assertEquals("select as val", result3); + } + + @Test + void testReplaceOriginalValue_paramNotFound_keepsOriginalText() throws Exception { + Map paramsMap = new HashMap<>(); + + Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class); + method.setAccessible(true); + + String sql = "select * from t where name = '!{unknownParam}'"; + String result = (String) method.invoke(sqlTask, sql, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap); + Assertions.assertEquals(sql, result); + } + + @Test + void testReplaceOriginalValue_paramValueNull_keepsOriginalText() throws Exception { + Map paramsMap = new HashMap<>(); + + Property p = new Property(); + p.setProp("name"); + p.setValue(null); + paramsMap.put("name", p); + + Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class); + method.setAccessible(true); + + String sql = "select * from t where name = '!{name}'"; + String result = (String) method.invoke(sqlTask, sql, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap); + Assertions.assertEquals(sql, result); + } + }