Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;

import java.util.Map;
import java.util.List;
import java.util.Set;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -126,9 +127,9 @@
@GetMapping(value = "/all")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKER_GROUP_FAIL)
public Result queryAllWorkerGroups(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
Map<String, Object> result = workerGroupService.queryAllGroup(loginUser);
return returnDataList(result);
public Result<List<String>> queryAllWorkerGroups(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
List<String> workerGroupNames = workerGroupService.queryAllGroup(loginUser);
return Result.success(workerGroupNames);
}

/**
Expand All @@ -146,10 +147,10 @@
@ResponseStatus(HttpStatus.OK)
@ApiException(DELETE_WORKER_GROUP_FAIL)
@OperatorLog(auditType = AuditType.WORKER_GROUP_DELETE)
public Result deleteWorkerGroupById(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("id") Integer id) {
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(loginUser, id);
return returnDataList(result);
public Result<Void> deleteWorkerGroupById(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable("id") Integer id) {
workerGroupService.deleteWorkerGroupById(loginUser, id);
return Result.success();
}

/**
Expand All @@ -162,8 +163,8 @@
@GetMapping(value = "/worker-address-list")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_WORKER_ADDRESS_LIST_FAIL)
public Result queryWorkerAddressList(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {
Map<String, Object> result = workerGroupService.getWorkerAddressList();
return returnDataList(result);
public Result<Set<String>> queryWorkerAddressList(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser) {

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'loginUser' is never used.
Set<String> workerAddressList = workerGroupService.getWorkerAddressList();
return Result.success(workerAddressList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.List;
import java.util.Map;
import java.util.Set;

public interface WorkerGroupService {

Expand Down Expand Up @@ -54,24 +55,23 @@ public interface WorkerGroupService {
* Query all worker group
*
* @param loginUser login user
* @return all worker group list
* @return distinct worker group names available to the user
*/
Map<String, Object> queryAllGroup(User loginUser);
List<String> queryAllGroup(User loginUser);

/**
* Delete worker group by id
* @param loginUser login user
* @param id worker group id
* @return delete result code
*/
Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id);
void deleteWorkerGroupById(User loginUser, Integer id);

/**
* Query all worker address list
*
* @return all worker address list
* @return all worker address set
*/
Map<String, Object> getWorkerAddressList();
Set<String> getWorkerAddressList();

/**
* Query worker group by workflow definition codes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -114,7 +113,6 @@ public WorkerGroup saveWorkerGroup(User loginUser,
String name,
String addrList,
String description) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser, null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) {
// todo: add permission exception
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
Expand All @@ -140,10 +138,6 @@ public WorkerGroup saveWorkerGroup(User loginUser,
if (workerGroup == null) {
throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST, id);
}
// todo: Can we update the worker name?
if (!workerGroup.getName().equals(name)) {
checkWorkerGroupDependencies(workerGroup, result);
}
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
Expand All @@ -159,23 +153,19 @@ public WorkerGroup saveWorkerGroup(User loginUser,
}

/**
* check if the worker group has any dependent tasks,schedulers or environments.
*
* @param workerGroup worker group
* @return boolean
* check if the worker group has any dependent tasks, schedulers or environments;
* throws ServiceException with the matching status if any dependency is found.
*/
private boolean checkWorkerGroupDependencies(WorkerGroup workerGroup, Map<String, Object> result) {
private void checkWorkerGroupDependencies(WorkerGroup workerGroup) {
// check if the worker group has any dependent tasks
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.selectList(
new QueryWrapper<TaskDefinition>().lambda().eq(TaskDefinition::getWorkerGroup, workerGroup.getName()));

if (CollectionUtils.isNotEmpty(taskDefinitions)) {
List<String> taskNames = taskDefinitions.stream().limit(3).map(taskDefinition -> taskDefinition.getName())
List<String> taskNames = taskDefinitions.stream().limit(3).map(TaskDefinition::getName)
.collect(Collectors.toList());

putMsg(result, Status.WORKER_GROUP_DEPENDENT_TASK_EXISTS, taskDefinitions.size(),
throw new ServiceException(Status.WORKER_GROUP_DEPENDENT_TASK_EXISTS, taskDefinitions.size(),
JSONUtils.toJsonString(taskNames));
return true;
}

// check if the worker group has any dependent schedulers
Expand All @@ -187,10 +177,8 @@ private boolean checkWorkerGroupDependencies(WorkerGroup workerGroup, Map<String
.map(schedule -> workflowDefinitionMapper.queryByCode(schedule.getWorkflowDefinitionCode())
.getName())
.collect(Collectors.toList());

putMsg(result, Status.WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS, schedules.size(),
throw new ServiceException(Status.WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS, schedules.size(),
JSONUtils.toJsonString(workflowDefinitionNames));
return true;
}

// check if the worker group has any dependent environments
Expand All @@ -199,11 +187,9 @@ private boolean checkWorkerGroupDependencies(WorkerGroup workerGroup, Map<String
.lambda().eq(EnvironmentWorkerGroupRelation::getWorkerGroup, workerGroup.getName()));

if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelations)) {
putMsg(result, Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS, environmentWorkerGroupRelations.size());
return true;
throw new ServiceException(Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS,
environmentWorkerGroupRelations.size());
}

return false;
}

private void checkWorkerGroupAddrList(String workerGroupAddress) {
Expand Down Expand Up @@ -282,12 +268,11 @@ public Result queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSi
/**
* query all worker group
*
* @param loginUser
* @return all worker group list
* @param loginUser login user
* @return distinct worker group names available to the user
*/
@Override
public Map<String, Object> queryAllGroup(User loginUser) {
Map<String, Object> result = new HashMap<>();
public List<String> queryAllGroup(User loginUser) {
List<WorkerGroupPageDetail> workerGroups;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
workerGroups = getUiWorkerGroupPageDetails(null);
Expand All @@ -303,9 +288,7 @@ public Map<String, Object> queryAllGroup(User loginUser) {
.map(WorkerGroup::getName)
.collect(Collectors.toList());
availableWorkerGroupList.addAll(configWorkerGroupNames);
result.put(Constants.DATA_LIST, availableWorkerGroupList.stream().distinct().collect(Collectors.toList()));
putMsg(result, Status.SUCCESS);
return result;
return availableWorkerGroupList.stream().distinct().collect(Collectors.toList());
}

private List<WorkerGroupPageDetail> getUiWorkerGroupPageDetails(List<Integer> ids) {
Expand All @@ -328,21 +311,17 @@ private List<WorkerGroupPageDetail> getUiWorkerGroupPageDetails(List<Integer> id
* delete worker group by id
*
* @param id worker group id
* @return delete result code
*/
@Override
@Transactional
public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
Map<String, Object> result = new HashMap<>();
public void deleteWorkerGroupById(User loginUser, Integer id) {
if (!canOperatorPermissions(loginUser, null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_DELETE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
WorkerGroup workerGroup = workerGroupDao.queryById(id);
if (workerGroup == null) {
log.error("Worker group does not exist, workerGroupId:{}.", id);
putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST);
return result;
throw new ServiceException(Status.DELETE_WORKER_GROUP_NOT_EXIST);
}
List<WorkflowInstance> workflowInstances = workflowInstanceMapper.queryByWorkerGroupNameAndStatus(
workerGroup.getName(),
Expand All @@ -353,34 +332,25 @@ public Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id) {
log.warn(
"Delete worker group failed because there are {} workflowInstances are using it, workflowInstanceIds:{}.",
workflowInstances.size(), workflowInstanceIds);
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, workflowInstances.size());
return result;
throw new ServiceException(Status.DELETE_WORKER_GROUP_BY_ID_FAIL, workflowInstances.size());
}

if (checkWorkerGroupDependencies(workerGroup, result)) {
return result;
}
checkWorkerGroupDependencies(workerGroup);

workerGroupDao.deleteById(id);
boardCastToMasterThatWorkerGroupChanged();

log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
return result;
}

/**
* query all worker address list
*
* @return all worker address list
* @return all worker address set
*/
@Override
public Map<String, Object> getWorkerAddressList() {
Map<String, Object> result = new HashMap<>();
Set<String> serverNodeList = registryClient.getServerNodeSet(RegistryNodeType.WORKER);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;
public Set<String> getWorkerAddressList() {
return registryClient.getServerNodeSet(RegistryNodeType.WORKER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
Expand Down Expand Up @@ -204,9 +203,8 @@ public void giveValidParams_whenQueryAllGroupPaging_expectSuccess() {

@Test
public void testQueryAllGroup() {
Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
Assertions.assertEquals(workerGroups.size(), 0);
List<String> workerGroups = workerGroupService.queryAllGroup(getLoginUser());
Assertions.assertEquals(0, workerGroups.size());
}

@Test
Expand All @@ -218,9 +216,8 @@ public void giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists()
baseServiceLogger)).thenReturn(true);
when(workerGroupDao.queryById(1)).thenReturn(null);

Map<String, Object> notExistResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(),
((Status) notExistResult.get(Constants.STATUS)).getCode());
assertThrowsServiceException(Status.DELETE_WORKER_GROUP_NOT_EXIST,
() -> workerGroupService.deleteWorkerGroupById(loginUser, 1));
}

@Test
Expand All @@ -240,9 +237,8 @@ public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() {
WorkflowExecutionStatus.NOT_TERMINAL_STATES))
.thenReturn(workflowInstances);

Map<String, Object> deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
((Status) deleteFailed.get(Constants.STATUS)).getCode());
assertThrowsServiceException(Status.DELETE_WORKER_GROUP_BY_ID_FAIL,
() -> workerGroupService.deleteWorkerGroupById(loginUser, 1));
}

@Test
Expand All @@ -257,24 +253,16 @@ public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() {
when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.NOT_TERMINAL_STATES)).thenReturn(null);

when(workerGroupDao.deleteById(1)).thenReturn(true);

when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
.thenReturn(null);

when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);

when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);

Map<String, Object> successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(),
((Status) successResult.get(Constants.STATUS)).getCode());
assertDoesNotThrow(() -> workerGroupService.deleteWorkerGroupById(loginUser, 1));
}

@Test
public void testQueryAllGroupWithDefault() {
Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
List<String> workerGroups = workerGroupService.queryAllGroup(getLoginUser());
Assertions.assertEquals(0, workerGroups.size());
}

Expand Down
Loading