diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java index 563ff1172258..5610412a6d2b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskDefinitionController.java @@ -30,11 +30,12 @@ import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.TaskDefinitionService; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.api.vo.TaskDefinitionVO; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.User; -import java.util.Map; +import java.util.List; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; @@ -84,14 +85,14 @@ public class TaskDefinitionController extends BaseController { @ResponseStatus(HttpStatus.OK) @ApiException(UPDATE_TASK_DEFINITION_ERROR) @OperatorLog(auditType = AuditType.TASK_UPDATE) - public Result updateTaskWithUpstream(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code") long code, - @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj, - @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) { - Map result = taskDefinitionService.updateTaskWithUpstream(loginUser, projectCode, code, + public Result updateTaskWithUpstream(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "code") long code, + @RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj, + @RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) { + Long updatedTaskCode = taskDefinitionService.updateTaskWithUpstream(loginUser, projectCode, code, taskDefinitionJsonObj, upstreamCodes); - return returnDataList(result); + return Result.success(updatedTaskCode); } /** @@ -141,12 +142,12 @@ public Result queryTaskDefinitionVersions(@Parameter(hidden = true) @RequestAttr @ResponseStatus(HttpStatus.OK) @ApiException(SWITCH_TASK_DEFINITION_VERSION_ERROR) @OperatorLog(auditType = AuditType.TASK_SWITCH_VERSION) - public Result switchTaskDefinitionVersion(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code") long code, - @PathVariable(value = "version") int version) { - Map result = taskDefinitionService.switchVersion(loginUser, projectCode, code, version); - return returnDataList(result); + public Result switchTaskDefinitionVersion(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "code") long code, + @PathVariable(value = "version") int version) { + taskDefinitionService.switchVersion(loginUser, projectCode, code, version); + return Result.success(); } /** @@ -167,13 +168,12 @@ public Result switchTaskDefinitionVersion(@Parameter(hidden = true) @RequestAttr @ResponseStatus(HttpStatus.OK) @ApiException(DELETE_TASK_DEFINITION_VERSION_ERROR) @OperatorLog(auditType = AuditType.TASK_DELETE_VERSION) - public Result deleteTaskDefinitionVersion(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code") long code, - @PathVariable(value = "version") int version) { - Map result = - taskDefinitionService.deleteByCodeAndVersion(loginUser, projectCode, code, version); - return returnDataList(result); + public Result deleteTaskDefinitionVersion(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "code") long code, + @PathVariable(value = "version") int version) { + taskDefinitionService.deleteByCodeAndVersion(loginUser, projectCode, code, version); + return Result.success(); } /** @@ -191,11 +191,12 @@ public Result deleteTaskDefinitionVersion(@Parameter(hidden = true) @RequestAttr @GetMapping(value = "/{code}") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_DETAIL_OF_TASK_DEFINITION_ERROR) - public Result queryTaskDefinitionDetail(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code") long code) { - Map result = taskDefinitionService.queryTaskDefinitionDetail(loginUser, projectCode, code); - return returnDataList(result); + public Result queryTaskDefinitionDetail(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "code") long code) { + TaskDefinitionVO taskDefinitionVO = + taskDefinitionService.queryTaskDefinitionDetail(loginUser, projectCode, code); + return Result.success(taskDefinitionVO); } /** @@ -212,10 +213,10 @@ public Result queryTaskDefinitionDetail(@Parameter(hidden = true) @RequestAttrib @GetMapping(value = "/gen-task-codes") @ResponseStatus(HttpStatus.OK) @ApiException(LOGIN_USER_QUERY_PROJECT_LIST_PAGING_ERROR) - public Result genTaskCodeList(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam("genNum") Integer genNum) { - Map result = taskDefinitionService.genTaskCodeList(genNum); - return returnDataList(result); + public Result> genTaskCodeList(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("genNum") Integer genNum) { + List taskCodes = taskDefinitionService.genTaskCodeList(genNum); + return Result.success(taskCodes); } /** @@ -237,12 +238,11 @@ public Result genTaskCodeList(@Parameter(hidden = true) @RequestAttribute(value @ResponseStatus(HttpStatus.OK) @ApiException(RELEASE_TASK_DEFINITION_ERROR) @OperatorLog(auditType = AuditType.TASK_RELEASE) - public Result releaseTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, - @PathVariable(value = "code", required = true) long code, - @RequestParam(value = "releaseState", required = true, defaultValue = "OFFLINE") ReleaseState releaseState) { - Map result = - taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, code, releaseState); - return returnDataList(result); + public Result releaseTaskDefinition(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode, + @PathVariable(value = "code", required = true) long code, + @RequestParam(value = "releaseState", required = true, defaultValue = "OFFLINE") ReleaseState releaseState) { + taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, code, releaseState); + return Result.success(); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 51291d0cc464..e00f9e2ea818 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -158,7 +158,17 @@ public String ping() { // TODO Should we import package in python client side? utils package can but service can not, why // Core api public Map genTaskCodeList(Integer genNum) { - return taskDefinitionService.genTaskCodeList(genNum); + Map result = new HashMap<>(); + try { + List taskCodes = taskDefinitionService.genTaskCodeList(genNum); + result.put(Constants.STATUS, Status.SUCCESS); + result.put(Constants.MSG, Status.SUCCESS.getMsg()); + result.put(Constants.DATA_LIST, taskCodes); + } catch (ServiceException e) { + result.put(Constants.STATUS, Status.DATA_IS_NOT_VALID); + result.put(Constants.MSG, e.getMessage()); + } + return result; } public Map getCodeAndVersion(String projectName, String workflowDefinitionName, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java index f5a3e8aac16b..f2750efdca8c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskDefinitionService.java @@ -18,11 +18,12 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.api.vo.TaskDefinitionVO; import org.apache.dolphinscheduler.common.enums.ReleaseState; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.User; -import java.util.Map; +import java.util.List; public interface TaskDefinitionService { @@ -33,11 +34,12 @@ public interface TaskDefinitionService { * @param projectCode project code * @param workflowDefinitionCode workflow definition code * @param taskName task name + * @return matching task definition */ - Map queryTaskDefinitionByName(User loginUser, - long projectCode, - long workflowDefinitionCode, - String taskName); + TaskDefinition queryTaskDefinitionByName(User loginUser, + long projectCode, + long workflowDefinitionCode, + String taskName); /** * Get resource task definition by code @@ -57,13 +59,13 @@ TaskDefinition getTaskDefinition(User loginUser, * @param taskCode task definition code * @param taskDefinitionJsonObj task definition json object * @param upstreamCodes upstream task codes, sep comma - * @return update result code + * @return updated task code */ - Map updateTaskWithUpstream(User loginUser, - long projectCode, - long taskCode, - String taskDefinitionJsonObj, - String upstreamCodes); + Long updateTaskWithUpstream(User loginUser, + long projectCode, + long taskCode, + String taskDefinitionJsonObj, + String upstreamCodes); /** * update task definition @@ -73,10 +75,10 @@ Map updateTaskWithUpstream(User loginUser, * @param taskCode task code * @param version the version user want to switch */ - Map switchVersion(User loginUser, - long projectCode, - long taskCode, - int version); + void switchVersion(User loginUser, + long projectCode, + long taskCode, + int version); /** * query the pagination versions info by one certain task definition code @@ -101,12 +103,11 @@ Result queryTaskDefinitionVersions(User loginUser, * @param projectCode project code * @param taskCode the task definition code * @param version the task definition version user want to delete - * @return delete version result code */ - Map deleteByCodeAndVersion(User loginUser, - long projectCode, - long taskCode, - int version); + void deleteByCodeAndVersion(User loginUser, + long projectCode, + long taskCode, + int version); /** * query detail of task definition by code @@ -116,9 +117,9 @@ Map deleteByCodeAndVersion(User loginUser, * @param taskCode the task definition code * @return task definition detail */ - Map queryTaskDefinitionDetail(User loginUser, - long projectCode, - long taskCode); + TaskDefinitionVO queryTaskDefinitionDetail(User loginUser, + long projectCode, + long taskCode); /** * gen task code list @@ -126,7 +127,7 @@ Map queryTaskDefinitionDetail(User loginUser, * @param genNum gen num * @return task code list */ - Map genTaskCodeList(Integer genNum); + List genTaskCodeList(Integer genNum); /** * release task definition @@ -135,12 +136,11 @@ Map queryTaskDefinitionDetail(User loginUser, * @param projectCode project code * @param code task definition code * @param releaseState releaseState - * @return update result code */ - Map releaseTaskDefinition(User loginUser, - long projectCode, - long code, - ReleaseState releaseState); + void releaseTaskDefinition(User loginUser, + long projectCode, + long code, + ReleaseState releaseState); void deleteTaskByWorkflowDefinitionCode(long workflowDefinitionCode, int workflowDefinitionVersion); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java index 6512338c5370..43fc5852b875 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java @@ -130,23 +130,33 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe * @param taskName task name */ @Override - public Map queryTaskDefinitionByName(User loginUser, long projectCode, long workflowDefinitionCode, - String taskName) { + public TaskDefinition queryTaskDefinitionByName(User loginUser, long projectCode, long workflowDefinitionCode, + String taskName) { Project project = projectMapper.queryByCode(projectCode); // check user access for project projectService.checkProjectAndAuthThrowException(loginUser, project, TASK_DEFINITION); - Map result = new HashMap<>(); TaskDefinition taskDefinition = taskDefinitionMapper.queryByName(project.getCode(), workflowDefinitionCode, taskName); if (taskDefinition == null) { log.error("Task definition does not exist, taskName:{}.", taskName); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskName); - } else { - result.put(Constants.DATA_LIST, taskDefinition); - putMsg(result, Status.SUCCESS); + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskName); + } + return taskDefinition; + } + + /** + * Resolve project + write permission; throws ServiceException with the + * status that {@link ProjectService#hasProjectAndWritePerm(User, Project, Map)} + * would otherwise have written into the legacy result map. + */ + private void requireProjectAndWritePerm(User loginUser, Project project) { + Map permCheck = new HashMap<>(); + if (!projectService.hasProjectAndWritePerm(loginUser, project, permCheck)) { + Status status = (Status) permCheck.get(Constants.STATUS); + String msg = (String) permCheck.get(Constants.MSG); + throw new ServiceException(status.getCode(), msg); } - return result; } public void updateDag(User loginUser, long workflowDefinitionCode, @@ -202,29 +212,31 @@ public TaskDefinition getTaskDefinition(User loginUser, return taskDefinition; } - private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, - Map result) { + /** + * Update the task body and cascade through workflow task relations. + * + * @return the persisted {@link TaskDefinitionLog}, or {@code null} when the + * task body is unchanged (callers may still need to apply upstream + * changes). All other failure modes throw {@link ServiceException}. + */ + private TaskDefinitionLog updateTask(User loginUser, long projectCode, long taskCode, + String taskDefinitionJsonObj) { Project project = projectMapper.queryByCode(projectCode); // check if user have write perm for project - boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); - if (!hasProjectAndWritePerm) { - return null; - } + requireProjectAndWritePerm(loginUser, project); TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); if (taskDefinition == null) { log.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); - return null; + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); } if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) { // if stream, can update task definition without online check if (taskDefinition.getTaskExecuteType() != TaskExecuteType.STREAM) { log.warn("Only {} type task can be updated without online check, taskDefinitionCode:{}.", TaskExecuteType.STREAM, taskCode); - putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); - return null; + throw new ServiceException(Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION); } } TaskDefinitionLog taskDefinitionToUpdate = @@ -234,24 +246,20 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task } if (taskDefinition.equals(taskDefinitionToUpdate)) { log.warn("Task definition does not need update because no change, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.TASK_DEFINITION_NOT_MODIFY_ERROR, String.valueOf(taskCode)); return null; } if (taskDefinitionToUpdate == null) { log.warn("Parameter taskDefinitionJson is invalid."); - putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); - return null; + throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj); } if (!checkTaskParameters(taskDefinitionToUpdate.getTaskType(), taskDefinitionToUpdate.getTaskParams())) { - putMsg(result, Status.WORKFLOW_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); - return null; + throw new ServiceException(Status.WORKFLOW_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName()); } Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode); if (version == null || version == 0) { log.error("Max version task definitionLog can not be found in database, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.DATA_IS_NOT_VALID, taskCode); - return null; + throw new ServiceException(Status.DATA_IS_NOT_VALID, taskCode); } Date now = new Date(); taskDefinitionToUpdate.setCode(taskCode); @@ -270,12 +278,11 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task if ((update & insert) != 1) { log.error("Update task definition or definitionLog error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); - putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); - } else - log.info( - "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.", - projectCode, taskCode, taskDefinitionToUpdate.getVersion()); + } + log.info( + "Update task definition and definitionLog complete, projectCode:{}, taskDefinitionCode:{}, newTaskVersion:{}.", + projectCode, taskCode, taskDefinitionToUpdate.getVersion()); // update workflow task relation List workflowTaskRelations = workflowTaskRelationMapper .queryWorkflowTaskRelationByTaskCodeAndTaskVersion(taskDefinitionToUpdate.getCode(), @@ -302,7 +309,6 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task if (updateWorkflowDefinitionVersionCount != 1) { log.error("batch update workflow task relation error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); - putMsg(result, Status.WORKFLOW_TASK_RELATION_BATCH_UPDATE_ERROR); throw new ServiceException(Status.WORKFLOW_TASK_RELATION_BATCH_UPDATE_ERROR); } WorkflowTaskRelationLog workflowTaskRelationLog = new WorkflowTaskRelationLog(workflowTaskRelation); @@ -313,7 +319,6 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task if (insertWorkflowTaskRelationLogCount != 1) { log.error("batch update workflow task relation error, projectCode:{}, taskDefinitionCode:{}.", projectCode, taskCode); - putMsg(result, Status.CREATE_WORKFLOW_TASK_RELATION_LOG_ERROR); throw new ServiceException(Status.CREATE_WORKFLOW_TASK_RELATION_LOG_ERROR); } } @@ -329,7 +334,6 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task workflowDefinitionLog.setOperator(loginUser.getId()); int insertWorkflowDefinitionLogCount = workflowDefinitionLogMapper.insert(workflowDefinitionLog); if ((updateWorkflowDefinitionCount & insertWorkflowDefinitionLogCount) != 1) { - putMsg(result, Status.UPDATE_WORKFLOW_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_WORKFLOW_DEFINITION_ERROR); } } @@ -345,14 +349,13 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task * @param taskCode task definition code * @param taskDefinitionJsonObj task definition json object * @param upstreamCodes upstream task codes, sep comma - * @return update result code + * @return updated task code */ @Override - public Map updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, - String taskDefinitionJsonObj, String upstreamCodes) { - Map result = new HashMap<>(); + public Long updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, + String taskDefinitionJsonObj, String upstreamCodes) { TaskDefinitionLog taskDefinitionToUpdate = - updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result); + updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj); List upstreamTaskRelations = workflowTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); Set upstreamCodeSet = @@ -363,8 +366,7 @@ public Map updateTaskWithUpstream(User loginUser, long projectCo .collect(Collectors.toSet()); } if (CollectionUtils.isEqualCollection(upstreamCodeSet, upstreamTaskCodes) && taskDefinitionToUpdate == null) { - putMsg(result, Status.SUCCESS); - return result; + return taskCode; } Map queryUpStreamTaskCodeMap; if (CollectionUtils.isNotEmpty(upstreamTaskCodes)) { @@ -377,8 +379,7 @@ public Map updateTaskWithUpstream(User loginUser, long projectCo String notExistTaskCodes = StringUtils.join(upstreamTaskCodes, Constants.COMMA); log.error("Some task definitions in parameter upstreamTaskCodes do not exist, notExistTaskCodes:{}.", notExistTaskCodes); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, notExistTaskCodes); - return result; + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, notExistTaskCodes); } } else { queryUpStreamTaskCodeMap = new HashMap<>(); @@ -419,9 +420,7 @@ public Map updateTaskWithUpstream(User loginUser, long projectCo log.info( "Update task with upstream tasks complete, projectCode:{}, taskDefinitionCode:{}, upstreamTaskCodes:{}.", projectCode, taskCode, upstreamTaskCodes); - result.put(Constants.DATA_LIST, taskCode); - putMsg(result, Status.SUCCESS); - return result; + return taskCode; } private void updateUpstreamTask(Set allPreTaskCodeSet, long taskCode, long projectCode, @@ -520,24 +519,21 @@ private WorkflowTaskRelationLog createWorkflowTaskRelationLog(User loginUser, */ @Transactional @Override - public Map switchVersion(User loginUser, long projectCode, long taskCode, int version) { + public void switchVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); // check user access for project projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_SWITCH_TO_THIS_VERSION); - Map result = new HashMap<>(); if (processService.isTaskOnline(taskCode)) { log.warn( "Task definition version can not be switched due to workflow definition is {}, taskDefinitionCode:{}.", ReleaseState.ONLINE.getDescp(), taskCode); - putMsg(result, Status.WORKFLOW_DEFINE_STATE_ONLINE); - return result; + throw new ServiceException(Status.WORKFLOW_DEFINE_STATE_ONLINE); } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { log.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); - return result; + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); } TaskDefinitionLog taskDefinitionUpdate = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode, version); @@ -545,29 +541,26 @@ public Map switchVersion(User loginUser, long projectCode, long taskDefinitionUpdate.setUpdateTime(new Date()); taskDefinitionUpdate.setId(taskDefinition.getId()); int switchVersion = taskDefinitionMapper.updateById(taskDefinitionUpdate); - if (switchVersion > 0) { - List taskRelationList = - workflowTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); - if (CollectionUtils.isNotEmpty(taskRelationList)) { - log.info( - "Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}.", - taskCode); - long workflowDefinitionCode = taskRelationList.get(0).getWorkflowDefinitionCode(); - List workflowTaskRelations = - workflowTaskRelationMapper.queryByWorkflowDefinitionCode(workflowDefinitionCode); - updateDag(loginUser, workflowDefinitionCode, workflowTaskRelations, - Lists.newArrayList(taskDefinitionUpdate)); - } else { - log.info( - "Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}.", - version, taskCode); - putMsg(result, Status.SUCCESS); - } - } else { + if (switchVersion <= 0) { log.error("Task definition version switch error, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.SWITCH_TASK_DEFINITION_VERSION_ERROR); + throw new ServiceException(Status.SWITCH_TASK_DEFINITION_VERSION_ERROR); + } + List taskRelationList = + workflowTaskRelationMapper.queryUpstreamByCode(projectCode, taskCode); + if (CollectionUtils.isNotEmpty(taskRelationList)) { + log.info( + "Task definition has upstream tasks, start handle them after switch task, taskDefinitionCode:{}.", + taskCode); + long workflowDefinitionCode = taskRelationList.get(0).getWorkflowDefinitionCode(); + List workflowTaskRelations = + workflowTaskRelationMapper.queryByWorkflowDefinitionCode(workflowDefinitionCode); + updateDag(loginUser, workflowDefinitionCode, workflowTaskRelations, + Lists.newArrayList(taskDefinitionUpdate)); + } else { + log.info( + "Task definition version switch complete, switch task version to {}, taskDefinitionCode:{}.", + version, taskCode); } - return result; } @Override @@ -595,86 +588,66 @@ public Result queryTaskDefinitionVersions(User loginUser, } @Override - public Map deleteByCodeAndVersion(User loginUser, long projectCode, long taskCode, int version) { + public void deleteByCodeAndVersion(User loginUser, long projectCode, long taskCode, int version) { Project project = projectMapper.queryByCode(projectCode); // check if user have write perm for project - Map result = new HashMap<>(); - boolean hasProjectAndWritePerm = projectService.hasProjectAndWritePerm(loginUser, project, result); - if (!hasProjectAndWritePerm) { - return result; - } - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); + requireProjectAndWritePerm(loginUser, project); + TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { log.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); - } else { - if (taskDefinition.getVersion() == version) { - log.warn( - "Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.", - projectCode, taskCode, version); - putMsg(result, Status.MAIN_TABLE_USING_VERSION); - return result; - } - int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version); - if (delete > 0) { - log.info( - "Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}.", - projectCode, taskCode, version); - putMsg(result, Status.SUCCESS); - } else { - log.error("Task definition version delete error, projectCode:{}, taskDefinitionCode:{}, version:{}.", - projectCode, taskCode, version); - putMsg(result, Status.DELETE_TASK_DEFINITION_VERSION_ERROR); - } + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); } - return result; + if (taskDefinition.getVersion() == version) { + log.warn( + "Task definition can not be deleted due to version is being used, projectCode:{}, taskDefinitionCode:{}, version:{}.", + projectCode, taskCode, version); + throw new ServiceException(Status.MAIN_TABLE_USING_VERSION); + } + int delete = taskDefinitionLogMapper.deleteByCodeAndVersion(taskCode, version); + if (delete <= 0) { + log.error("Task definition version delete error, projectCode:{}, taskDefinitionCode:{}, version:{}.", + projectCode, taskCode, version); + throw new ServiceException(Status.DELETE_TASK_DEFINITION_VERSION_ERROR); + } + log.info( + "Task definition version delete complete, projectCode:{}, taskDefinitionCode:{}, version:{}.", + projectCode, taskCode, version); } @Override - public Map queryTaskDefinitionDetail(User loginUser, long projectCode, long taskCode) { + public TaskDefinitionVO queryTaskDefinitionDetail(User loginUser, long projectCode, long taskCode) { Project project = projectMapper.queryByCode(projectCode); // check user access for project projectService.checkProjectAndAuthThrowException(loginUser, project, TASK_DEFINITION); - Map result = new HashMap<>(); TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode); if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { log.error("Task definition does not exist, taskDefinitionCode:{}.", taskCode); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); - } else { - List taskRelationList = workflowTaskRelationMapper - .queryByCode(projectCode, 0, 0, taskCode); - if (CollectionUtils.isNotEmpty(taskRelationList)) { - taskRelationList = taskRelationList.stream() - .filter(v -> v.getPreTaskCode() != 0).collect(Collectors.toList()); - } - TaskDefinitionVO taskDefinitionVo = TaskDefinitionVO.fromTaskDefinition(taskDefinition); - taskDefinitionVo.setWorkflowTaskRelationList(taskRelationList); - result.put(Constants.DATA_LIST, taskDefinitionVo); - putMsg(result, Status.SUCCESS); + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, String.valueOf(taskCode)); } - return result; + List taskRelationList = workflowTaskRelationMapper + .queryByCode(projectCode, 0, 0, taskCode); + if (CollectionUtils.isNotEmpty(taskRelationList)) { + taskRelationList = taskRelationList.stream() + .filter(v -> v.getPreTaskCode() != 0).collect(Collectors.toList()); + } + TaskDefinitionVO taskDefinitionVo = TaskDefinitionVO.fromTaskDefinition(taskDefinition); + taskDefinitionVo.setWorkflowTaskRelationList(taskRelationList); + return taskDefinitionVo; } @Override - public Map genTaskCodeList(Integer genNum) { - Map result = new HashMap<>(); + public List genTaskCodeList(Integer genNum) { if (genNum == null || genNum < 1 || genNum > 100) { log.warn("Parameter genNum must be great than 1 and less than 100."); - putMsg(result, Status.DATA_IS_NOT_VALID, genNum); - return result; + throw new ServiceException(Status.DATA_IS_NOT_VALID, genNum); } List taskCodes = new ArrayList<>(); - for (int i = 0; i < genNum; i++) { taskCodes.add(CodeGenerateUtils.genCode()); } - - putMsg(result, Status.SUCCESS); - // return workflowDefinitionCode - result.put(Constants.DATA_LIST, taskCodes); - return result; + return taskCodes; } /** @@ -684,32 +657,26 @@ public Map genTaskCodeList(Integer genNum) { * @param projectCode project code * @param code task definition code * @param releaseState releaseState - * @return update result code */ @Transactional @Override - public Map releaseTaskDefinition(User loginUser, long projectCode, long code, - ReleaseState releaseState) { + public void releaseTaskDefinition(User loginUser, long projectCode, long code, ReleaseState releaseState) { Project project = projectMapper.queryByCode(projectCode); // check user access for project projectService.checkProjectAndAuthThrowException(loginUser, project, null); - Map result = new HashMap<>(); if (null == releaseState) { - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.RELEASE_STATE); - return result; + throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.RELEASE_STATE); } TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(code); if (taskDefinition == null || projectCode != taskDefinition.getProjectCode()) { - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); - return result; + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); } TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(code, taskDefinition.getVersion()); if (taskDefinitionLog == null) { log.error("Task definition does not exist, taskDefinitionCode:{}.", code); - putMsg(result, Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); - return result; + throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, String.valueOf(code)); } switch (releaseState) { case OFFLINE: @@ -727,8 +694,7 @@ public Map releaseTaskDefinition(User loginUser, long projectCod permissionCheck.checkPermission(); } catch (Exception e) { log.error("Resources permission check error, resourceIds:{}.", resourceIds, e); - putMsg(result, Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION); - return result; + throw new ServiceException(Status.RESOURCE_NOT_EXIST_OR_NO_PERMISSION); } } taskDefinition.setFlag(Flag.YES); @@ -736,20 +702,16 @@ public Map releaseTaskDefinition(User loginUser, long projectCod break; default: log.warn("Parameter releaseState is invalid."); - putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.RELEASE_STATE); - return result; + throw new ServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, Constants.RELEASE_STATE); } int update = taskDefinitionMapper.updateById(taskDefinition); int updateLog = taskDefinitionLogMapper.updateById(taskDefinitionLog); if ((update == 0 && updateLog == 1) || (update == 1 && updateLog == 0)) { log.error("Update taskDefinition state or taskDefinitionLog state error, taskDefinitionCode:{}.", code); - putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR); throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR); } - log.error("Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}.", + log.info("Update taskDefinition state or taskDefinitionLog state to complete, taskDefinitionCode:{}.", code); - putMsg(result, Status.SUCCESS); - return result; } @Override diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java index c38c221ff4dc..35ee26346105 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java @@ -17,9 +17,11 @@ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.TASK_DEFINITION; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_SWITCH_TO_THIS_VERSION; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doNothing; @@ -55,10 +57,8 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessServiceImpl; -import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -146,10 +146,10 @@ public void queryTaskDefinitionByName() { when(taskDefinitionMapper.queryByName(project.getCode(), PROCESS_DEFINITION_CODE, taskName)) .thenReturn(new TaskDefinition()); - Map relation = taskDefinitionService + TaskDefinition taskDefinition = taskDefinitionService .queryTaskDefinitionByName(user, PROJECT_CODE, PROCESS_DEFINITION_CODE, taskName); - assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); + Assertions.assertNotNull(taskDefinition); } @Test @@ -165,18 +165,17 @@ public void switchVersion() { taskDefinition.setProjectCode(PROJECT_CODE); when(taskDefinitionMapper.queryByCode(TASK_CODE)) .thenReturn(taskDefinition); - when(taskDefinitionMapper.updateById(new TaskDefinitionLog())).thenReturn(1); - Map relation = taskDefinitionService - .switchVersion(user, PROJECT_CODE, TASK_CODE, VERSION); + when(taskDefinitionMapper.updateById(any(TaskDefinitionLog.class))).thenReturn(1); - assertEquals(Status.SUCCESS, relation.get(Constants.STATUS)); + Assertions.assertDoesNotThrow( + () -> taskDefinitionService.switchVersion(user, PROJECT_CODE, TASK_CODE, VERSION)); } @Test public void deleteByCodeAndVersion() { Project project = getProject(); when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(project); - when(projectService.hasProjectAndWritePerm(user, project, new HashMap<>())).thenReturn(true); + when(projectService.hasProjectAndWritePerm(eq(user), eq(project), any(Map.class))).thenReturn(true); // cross-project privilege escalation: taskCode belongs to another project - must be rejected TaskDefinition otherProjectTask = new TaskDefinition(); @@ -184,9 +183,8 @@ public void deleteByCodeAndVersion() { otherProjectTask.setCode(TASK_CODE); otherProjectTask.setVersion(VERSION + 1); when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(otherProjectTask); - Map crossProjectResult = - taskDefinitionService.deleteByCodeAndVersion(user, PROJECT_CODE, TASK_CODE, VERSION); - assertEquals(Status.TASK_DEFINE_NOT_EXIST, crossProjectResult.get(Constants.STATUS)); + assertThrowsServiceException(Status.TASK_DEFINE_NOT_EXIST, + () -> taskDefinitionService.deleteByCodeAndVersion(user, PROJECT_CODE, TASK_CODE, VERSION)); Mockito.verify(taskDefinitionLogMapper, Mockito.never()).deleteByCodeAndVersion(TASK_CODE, VERSION); // normal path: taskCode belongs to the project - should succeed @@ -196,18 +194,8 @@ public void deleteByCodeAndVersion() { taskDefinition.setVersion(VERSION + 1); when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); when(taskDefinitionLogMapper.deleteByCodeAndVersion(TASK_CODE, VERSION)).thenReturn(1); - Map successResult = - taskDefinitionService.deleteByCodeAndVersion(user, PROJECT_CODE, TASK_CODE, VERSION); - assertEquals(Status.SUCCESS, successResult.get(Constants.STATUS)); - } - - private void putMsg(Map result, Status status, Object... statusParams) { - result.put(Constants.STATUS, status); - if (statusParams != null && statusParams.length > 0) { - result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams)); - } else { - result.put(Constants.MSG, status.getMsg()); - } + Assertions.assertDoesNotThrow( + () -> taskDefinitionService.deleteByCodeAndVersion(user, PROJECT_CODE, TASK_CODE, VERSION)); } @Test @@ -246,8 +234,8 @@ public void checkJson() { @Test public void genTaskCodeList() { - Map genTaskCodeList = taskDefinitionService.genTaskCodeList(10); - assertEquals(Status.SUCCESS, genTaskCodeList.get(Constants.STATUS)); + List taskCodes = taskDefinitionService.genTaskCodeList(10); + assertEquals(10, taskCodes.size()); } @Test @@ -257,9 +245,8 @@ public void testReleaseTaskDefinition() { Mockito.doNothing().when(projectService).checkProjectAndAuthThrowException(user, project, null); // check task dose not exist - Map map = - taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE); - assertEquals(Status.TASK_DEFINE_NOT_EXIST, map.get(Constants.STATUS)); + assertThrowsServiceException(Status.TASK_DEFINE_NOT_EXIST, + () -> taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE)); // process definition offline TaskDefinition taskDefinition = new TaskDefinition(); @@ -274,19 +261,17 @@ public void testReleaseTaskDefinition() { TaskDefinitionLog taskDefinitionLog = new TaskDefinitionLog(taskDefinition); when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(TASK_CODE, taskDefinition.getVersion())) .thenReturn(taskDefinitionLog); - Map offlineTaskResult = - taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.OFFLINE); - assertEquals(Status.SUCCESS, offlineTaskResult.get(Constants.STATUS)); + Assertions.assertDoesNotThrow(() -> taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, + ReleaseState.OFFLINE)); // process definition online, resource exist - Map onlineTaskResult = - taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.ONLINE); - assertEquals(Status.SUCCESS, onlineTaskResult.get(Constants.STATUS)); + Assertions.assertDoesNotThrow(() -> taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, + ReleaseState.ONLINE)); // release error code - Map failResult = - taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, ReleaseState.getEnum(2)); - assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, failResult.get(Constants.STATUS)); + assertThrowsServiceException(Status.REQUEST_PARAMS_NOT_VALID_ERROR, + () -> taskDefinitionService.releaseTaskDefinition(user, PROJECT_CODE, TASK_CODE, + ReleaseState.getEnum(2))); } @Test @@ -364,7 +349,7 @@ public void testUpdateTaskWithUpstream() { user.setUserType(UserType.ADMIN_USER); when(projectMapper.queryByCode(PROJECT_CODE)).thenReturn(getProject()); - when(projectService.hasProjectAndWritePerm(user, getProject(), new HashMap<>())).thenReturn(true); + when(projectService.hasProjectAndWritePerm(eq(user), eq(getProject()), any(Map.class))).thenReturn(true); when(taskDefinitionMapper.queryByCode(TASK_CODE)).thenReturn(taskDefinition); when(taskDefinitionLogMapper.queryMaxVersionForDefinition(TASK_CODE)).thenReturn(1); when(taskDefinitionMapper.updateById(Mockito.any())).thenReturn(1); @@ -381,9 +366,9 @@ public void testUpdateTaskWithUpstream() { when(workflowTaskRelationMapper.updateById(Mockito.any())).thenReturn(1); when(workflowTaskRelationLogDao.batchInsert(Mockito.anyList())).thenReturn(2); // success - Map successMap = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE, + Long updatedTaskCode = taskDefinitionService.updateTaskWithUpstream(user, PROJECT_CODE, TASK_CODE, taskDefinitionJson, UPSTREAM_CODE); - assertEquals(Status.SUCCESS, successMap.get(Constants.STATUS)); + assertEquals(TASK_CODE, updatedTaskCode); user.setUserType(UserType.GENERAL_USER); } }