Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
safePutAll(prepareParamsMap, commandParamsMap);
}

// 6. VarPool: override values only for existing IN-direction parameters
// 6. VarPool: override values for existing IN-direction parameters,
// and inject new parameters that only exist in VarPool for placeholder resolution
List<Property> varPools = parseVarPool(taskInstance);
if (CollectionUtils.isNotEmpty(varPools)) {
for (Property varPool : varPools) {
Expand All @@ -208,7 +209,11 @@ public Map<String, Property> paramParsingPreparation(@NonNull TaskInstance taskI
}
Property targetParam = prepareParamsMap.get(varPool.getProp());
if (targetParam != null && Direct.IN.equals(targetParam.getDirect())) {
// Existing IN parameter: override its value
targetParam.setValue(varPool.getValue());
} else if (targetParam == null) {
// Parameter not in map: inject it for placeholder resolution
prepareParamsMap.put(varPool.getProp(), varPool);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,4 +538,81 @@ public void testResolvePlaceholders() throws Exception {
// Ensure no unintended side effects
Assertions.assertEquals(4, paramsMap.size());
}

@Test
public void testParamParsingPreparation_varPoolInjectionWhenParamNameNotInLocalParams() {
// Test scenario: upstream task outputs p1=111, downstream task has p2=${p1} but no local p1
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskCode(1000001L);
taskInstance.setTaskDefinitionVersion(1);
taskInstance.setExecutePath("home/path/execute");

// VarPool from upstream: p1=111 (OUT direction)
taskInstance.setVarPool("[{\"prop\":\"p1\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"111\"}]");

TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setName("TaskName-1");
taskDefinition.setCode(1000001L);
taskDefinition.setVersion(1);

WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setId(2);
final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder()
.timeZone("Asia/Shanghai")
.build();
workflowInstance.setCommandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam));
workflowInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString());
workflowInstance.setGlobalParams("[]");
workflowInstance.setScheduleTime(DateUtils.stringToDate("2024-01-01 00:00:00"));

WorkflowDefinition workflowDefinition = new WorkflowDefinition();
workflowDefinition.setName("ProcessName-1");
workflowDefinition.setProjectName("ProjectName");
workflowDefinition.setProjectCode(3000001L);
workflowDefinition.setCode(200001L);

Project project = new Project();
project.setName("ProjectName");
project.setCode(3000001L);

workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
taskInstance.setTaskCode(taskDefinition.getCode());
taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
taskInstance.setProjectCode(workflowDefinition.getProjectCode());
taskInstance.setWorkflowInstanceId(workflowInstance.getId());

// Local params: p2=${p1} (references p1 which only exists in varPool, not in local params)
org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters sqlParameters =
new org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters();
sqlParameters.setType("MYSQL");
sqlParameters.setSql("select 1");

Property p2 = new Property();
p2.setProp("p2");
p2.setDirect(Direct.IN);
p2.setType(DataType.VARCHAR);
p2.setValue("${p1}");
sqlParameters.setLocalParams(Collections.singletonList(p2));

taskInstance.setTaskParams(JSONUtils.toJsonString(sqlParameters));

Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList());

Map<String, Property> propertyMap =
curingParamsServiceImpl.paramParsingPreparation(taskInstance, sqlParameters, workflowInstance,
project.getName(), workflowDefinition.getName());

Assertions.assertNotNull(propertyMap);

// Assert: p1 should be injected from varPool
Assertions.assertTrue(propertyMap.containsKey("p1"), "p1 should be injected from varPool");
Assertions.assertEquals("111", propertyMap.get("p1").getValue());

// Assert: p2 should exist and its value should be resolved from ${p1} to 111
Assertions.assertTrue(propertyMap.containsKey("p2"));
Assertions.assertEquals("111", propertyMap.get("p2").getValue(),
"p2's value ${p1} should be resolved to 111 using injected varPool param");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,12 @@ private String replaceOriginalValue(String content, String rgex, Map<String, Pro
break;
}
String paramName = m.group(1);
String paramValue = sqlParamsMap.get(paramName).getValue();
content = m.replaceFirst(paramValue);
Property prop = sqlParamsMap.get(paramName);
if (prop == null || prop.getValue() == null) {
log.warn("Cannot find parameter: {} for !{{}} replacement, skipping", paramName, paramName);
break;
}
content = m.replaceFirst(Matcher.quoteReplacement(prop.getValue()));
}
return content;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,4 +489,69 @@ private ResourceParametersHelper getResourceParametersHelperWithDatasourceType(D
return resourceParametersHelper;
}

@Test
void testReplaceOriginalValue_withSpecialCharacters() throws Exception {
Map<String, Property> paramsMap = new HashMap<>();

Property p1 = new Property();
p1.setProp("price");
p1.setValue("$100");
paramsMap.put("price", p1);

Property p2 = new Property();
p2.setProp("path");
p2.setValue("C:\\Users\\test");
paramsMap.put("path", p2);

Property p3 = new Property();
p3.setProp("empty");
p3.setValue("");
paramsMap.put("empty", p3);

Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class);
method.setAccessible(true);

// The regex pattern matches optional quotes around !{...}, so they are replaced too
String sql1 = "select * from items where price = '!{price}'";
String result1 = (String) method.invoke(sqlTask, sql1, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
Assertions.assertEquals("select * from items where price = $100", result1);

String sql2 = "load data local inpath \"!{path}\" into table t";
String result2 = (String) method.invoke(sqlTask, sql2, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
Assertions.assertEquals("load data local inpath C:\\Users\\test into table t", result2);

String sql3 = "select !{empty} as val";
String result3 = (String) method.invoke(sqlTask, sql3, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
Assertions.assertEquals("select as val", result3);
}

@Test
void testReplaceOriginalValue_paramNotFound_keepsOriginalText() throws Exception {
Map<String, Property> paramsMap = new HashMap<>();

Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class);
method.setAccessible(true);

String sql = "select * from t where name = '!{unknownParam}'";
String result = (String) method.invoke(sqlTask, sql, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
Assertions.assertEquals(sql, result);
}

@Test
void testReplaceOriginalValue_paramValueNull_keepsOriginalText() throws Exception {
Map<String, Property> paramsMap = new HashMap<>();

Property p = new Property();
p.setProp("name");
p.setValue(null);
paramsMap.put("name", p);

Method method = SqlTask.class.getDeclaredMethod("replaceOriginalValue", String.class, String.class, Map.class);
method.setAccessible(true);

String sql = "select * from t where name = '!{name}'";
String result = (String) method.invoke(sqlTask, sql, "['\"]*\\!\\{(.*?)\\}['\"]*", paramsMap);
Assertions.assertEquals(sql, result);
}

}
Loading