diff --git a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java index ea29b74d1..d019549a4 100644 --- a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java +++ b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/FileUploadController.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.nio.file.Paths; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,6 +83,7 @@ public Map fileToken(@PathVariable("connId") int connId, "load.upload.file.duplicate-name"); Map tokens = new HashMap<>(); for (String fileName : fileNames) { + this.checkFileNameValid(fileName); String token = this.service.generateFileToken(fileName); Ex.check(!this.uploadingTokenLocks().containsKey(token), "load.upload.file.token.existed"); @@ -96,13 +98,19 @@ public FileUploadResult upload(@PathVariable("connId") int connId, @PathVariable("jobId") int jobId, @RequestParam("file") MultipartFile file, @RequestParam("name") String fileName, + @RequestParam(value = "size", + required = false) + Long fileSize, @RequestParam("token") String token, @RequestParam("total") int total, @RequestParam("index") int index) { this.checkTotalAndIndexValid(total, index); + this.checkFileNameValid(fileName); this.checkFileNameMatchToken(fileName, token); JobManager jobEntity = this.jobService.get(jobId); - this.checkFileValid(connId, jobId, jobEntity, file, fileName); + Long sourceFileSize = this.resolveSourceFileSize(file, fileSize, + total, index); + this.checkFileValid(jobId, jobEntity, file, fileName); if (jobEntity.getJobStatus() == JobStatus.DEFAULT) { jobEntity.setJobStatus(JobStatus.UPLOADING); this.jobService.update(jobEntity); @@ -123,39 +131,57 @@ public FileUploadResult upload(@PathVariable("connId") int connId, lock.readLock().lock(); try { + FileMapping reservedMapping; + synchronized (this.service) { + reservedMapping = this.reserveUploadQuota(connId, jobId, + fileName, filePath, + sourceFileSize); + } result = this.service.uploadFile(file, index, filePath); if (result.getStatus() == FileUploadResult.Status.FAILURE) { return result; } synchronized (this.service) { - // Verify the existence of fragmented files FileMapping mapping = this.service.get(connId, jobId, fileName); if (mapping == null) { - mapping = new FileMapping(connId, fileName, filePath); - mapping.setJobId(jobId); - mapping.setFileStatus(FileMappingStatus.UPLOADING); - this.service.save(mapping); - } else { - if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) { - result.setId(mapping.getId()); - // Remove uploading file token - this.uploadingTokenLocks().remove(token); - return result; - } else { - mapping.setUpdateTime(HubbleUtil.nowDate()); - } + mapping = reservedMapping; + } + Ex.check(mapping != null, "load.file-mapping.not-exist.name", + fileName); + if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) { + result.setId(mapping.getId()); + this.uploadingTokenLocks().remove(token); + return result; } + mapping.setUpdateTime(HubbleUtil.nowDate()); // Determine whether all the parts have been uploaded, then merge them boolean merged = this.service.tryMergePartFiles(filePath, total); if (!merged) { this.service.update(mapping); return result; } + JobManager currentJob = this.jobService.get(jobId); + long actualFileSize; + try { + actualFileSize = this.resolveUploadedFileSize( + mapping.getPath()); + Ex.check(currentJob != null, "job-manager.not-exist.id", + jobId); + long reservedUploadingSize = + this.sumReservedUploadingSize(jobId, + mapping.getId()); + this.checkFileSizeLimit(actualFileSize, + currentJob.getJobSize(), + reservedUploadingSize); + } catch (RuntimeException e) { + this.cleanupFailedUpload(mapping, token); + throw e; + } // Read column names and values then fill it this.service.extractColumns(mapping); mapping.setFileStatus(FileMappingStatus.COMPLETED); mapping.setTotalLines(FileUtil.countLines(mapping.getPath())); - mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath()))); + mapping.setTotalSize(actualFileSize); // Move to the directory corresponding to the file mapping Id String newPath = this.service.moveToNextLevelDir(mapping); @@ -163,9 +189,9 @@ public FileUploadResult upload(@PathVariable("connId") int connId, mapping.setPath(newPath); this.service.update(mapping); // Update Job Manager size - long jobSize = jobEntity.getJobSize() + mapping.getTotalSize(); - jobEntity.setJobSize(jobSize); - this.jobService.update(jobEntity); + long jobSize = currentJob.getJobSize() + mapping.getTotalSize(); + currentJob.setJobSize(jobSize); + this.jobService.update(currentJob); result.setId(mapping.getId()); // Remove uploading file token this.uploadingTokenLocks().remove(token); @@ -181,6 +207,7 @@ public Boolean delete(@PathVariable("connId") int connId, @PathVariable("jobId") int jobId, @RequestParam("name") String fileName, @RequestParam("token") String token) { + this.checkFileNameValid(fileName); JobManager jobEntity = this.jobService.get(jobId); Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Ex.check(jobEntity.getJobStatus() == JobStatus.UPLOADING || @@ -243,7 +270,16 @@ private void checkFileNameMatchToken(String fileName, String token) { "load.upload.file.name-token.unmatch"); } - private void checkFileValid(int connId, int jobId, JobManager jobEntity, + private void checkFileNameValid(String fileName) { + Ex.check(StringUtils.isNotBlank(fileName) && + fileName.equals(FilenameUtils.getName(fileName)) && + !fileName.contains("/") && + !fileName.contains("\\") && + !fileName.contains(".."), + "load.upload.file.name.invalid"); + } + + private void checkFileValid(int jobId, JobManager jobEntity, MultipartFile file, String fileName) { Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId); Ex.check(jobEntity.getJobStatus() == JobStatus.DEFAULT || @@ -261,31 +297,154 @@ private void checkFileValid(int connId, int jobId, JobManager jobEntity, HubbleOptions.UPLOAD_FILE_FORMAT_LIST); Ex.check(formatWhiteList.contains(format), "load.upload.file.format.unsupported"); + } + + private FileMapping reserveUploadQuota(int connId, int jobId, + String fileName, String filePath, + Long sourceFileSize) { + JobManager currentJob = this.jobService.get(jobId); + Ex.check(currentJob != null, "job-manager.not-exist.id", jobId); + + FileMapping mapping = this.service.get(connId, jobId, fileName); + Ex.check(mapping == null || + mapping.getFileStatus() == FileMappingStatus.UPLOADING, + "load.upload.file.existed", fileName); + + long reservedFileSize = this.resolveReservedFileSize(mapping, + sourceFileSize); + Integer mappingId = mapping == null ? null : mapping.getId(); + long reservedUploadingSize = this.sumReservedUploadingSize(jobId, + mappingId); + this.checkFileSizeLimit(reservedFileSize, currentJob.getJobSize(), + reservedUploadingSize); + + if (mapping == null) { + mapping = new FileMapping(connId, fileName, filePath); + mapping.setJobId(jobId); + this.fillUploadingReservation(mapping, reservedFileSize); + this.service.save(mapping); + return mapping; + } + + mapping.setPath(filePath); + this.fillUploadingReservation(mapping, reservedFileSize); + this.service.update(mapping); + return mapping; + } + + private Long resolveSourceFileSize(MultipartFile file, Long fileSize, + int total, int index) { + if (total == 1) { + return file.getSize(); + } + if (fileSize != null) { + return fileSize; + } + if (index == 0) { + return this.estimateChunkedFileSizeUpperBound(file.getSize(), + total); + } + return null; + } + + private void checkFileSizeLimit(long fileSize, long currentJobSize) { + this.checkFileSizeLimit(fileSize, currentJobSize, 0L); + } + + private void checkFileSizeLimit(long fileSize, long currentJobSize, + long reservedUploadingSize) { + Ex.check(fileSize > 0L, "load.upload.file.cannot-be-empty"); - long fileSize = file.getSize(); long singleFileSizeLimit = this.config.get( HubbleOptions.UPLOAD_SINGLE_FILE_SIZE_LIMIT); Ex.check(fileSize <= singleFileSizeLimit, "load.upload.file.exceed-single-size", FileUtils.byteCountToDisplaySize(singleFileSizeLimit)); - // Check is there a file with the same name - FileMapping oldMapping = this.service.get(connId, jobId, fileName); - Ex.check(oldMapping == null || - oldMapping.getFileStatus() == FileMappingStatus.UPLOADING, - "load.upload.file.existed", fileName); - long totalFileSizeLimit = this.config.get( HubbleOptions.UPLOAD_TOTAL_FILE_SIZE_LIMIT); - List fileMappings = this.service.listAll(); - long currentTotalSize = fileMappings.stream() - .map(FileMapping::getTotalSize) - .reduce(0L, (Long::sum)); - Ex.check(fileSize + currentTotalSize <= totalFileSizeLimit, - "load.upload.file.exceed-single-size", + long totalReservedSize = this.safeAdd(this.safeAdd(fileSize, + currentJobSize), + reservedUploadingSize); + Ex.check(totalReservedSize <= totalFileSizeLimit, + "load.upload.file.exceed-total-size", FileUtils.byteCountToDisplaySize(totalFileSizeLimit)); } + private long resolveUploadedFileSize(String filePath) { + File uploadedFile = this.service.requirePathUnderUploadRoot(filePath); + if (!uploadedFile.exists() || !uploadedFile.isFile()) { + throw new InternalException("The uploaded file '%s' is not ready " + + "for quota validation", + filePath); + } + return FileUtils.sizeOf(uploadedFile); + } + + private void cleanupFailedUpload(FileMapping mapping, String token) { + this.uploadingTokenLocks().remove(token); + this.service.cleanupMappings(Collections.singletonList(mapping)); + } + + private long resolveReservedFileSize(FileMapping mapping, + Long sourceFileSize) { + long reservedFileSize = mapping == null ? 0L : mapping.getTotalSize(); + if (sourceFileSize != null) { + return Math.max(reservedFileSize, sourceFileSize); + } + Ex.check(reservedFileSize > 0L, + "load.upload.file.size.missing-before-reserve"); + return reservedFileSize; + } + + private void fillUploadingReservation(FileMapping mapping, + long reservedFileSize) { + mapping.setFileStatus(FileMappingStatus.UPLOADING); + mapping.setTotalSize(reservedFileSize); + mapping.setUpdateTime(HubbleUtil.nowDate()); + } + + private long sumReservedUploadingSize(int jobId, Integer excludedMappingId) { + List mappings = this.service.listByJob(jobId); + if (mappings == null || mappings.isEmpty()) { + return 0L; + } + + long reservedUploadingSize = 0L; + for (FileMapping mapping : mappings) { + if (mapping == null || + mapping.getFileStatus() != FileMappingStatus.UPLOADING) { + continue; + } + if (excludedMappingId != null && + excludedMappingId.equals(mapping.getId())) { + continue; + } + if (mapping.getTotalSize() <= 0L) { + continue; + } + reservedUploadingSize = this.safeAdd(reservedUploadingSize, + mapping.getTotalSize()); + } + return reservedUploadingSize; + } + + private long estimateChunkedFileSizeUpperBound(long chunkSize, int total) { + try { + return Math.multiplyExact(chunkSize, (long) total); + } catch (ArithmeticException ignored) { + return Long.MAX_VALUE; + } + } + + private long safeAdd(long left, long right) { + try { + return Math.addExact(left, right); + } catch (ArithmeticException ignored) { + return Long.MAX_VALUE; + } + } + private String generateFilePath(int connId, int jobId, String fileName) { String location = this.config.get(HubbleOptions.UPLOAD_FILE_LOCATION); String path = Paths.get(CONN_PREIFX + connId, JOB_PREIFX + jobId) diff --git a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java index 5bb90526d..12ccaa414 100644 --- a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java +++ b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/controller/load/JobManagerController.java @@ -104,11 +104,7 @@ public JobManager create(@PathVariable("connId") int connId, @DeleteMapping("{id}") public void delete(@PathVariable("id") int id) { - JobManager task = this.service.get(id); - if (task == null) { - throw new ExternalException("job.manager.not-exist.id", id); - } - this.service.remove(id); + this.service.deleteJob(id); } @GetMapping("{id}") diff --git a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java index 53f8a525f..5f23c2135 100644 --- a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java +++ b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/entity/load/FileMapping.java @@ -116,6 +116,11 @@ public class FileMapping { @JsonProperty("update_time") private Date updateTime; + @JsonProperty("total_size_bytes") + public long getTotalSizeBytes() { + return this.totalSize; + } + public FileMapping(int connId, String name, String path) { this(connId, name, path, HubbleUtil.nowDate()); } diff --git a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java index e574ad797..f174c890c 100644 --- a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java +++ b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/FileMappingService.java @@ -27,15 +27,21 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -45,8 +51,10 @@ import org.apache.hugegraph.entity.load.FileMapping; import org.apache.hugegraph.entity.load.FileSetting; import org.apache.hugegraph.entity.load.FileUploadResult; +import org.apache.hugegraph.entity.load.JobManager; import org.apache.hugegraph.exception.InternalException; import org.apache.hugegraph.mapper.load.FileMappingMapper; +import org.apache.hugegraph.mapper.load.JobManagerMapper; import org.apache.hugegraph.options.HubbleOptions; import org.apache.hugegraph.util.Ex; import org.apache.hugegraph.util.HubbleUtil; @@ -78,6 +86,8 @@ public class FileMappingService { private HugeConfig config; @Autowired private FileMappingMapper mapper; + @Autowired + private JobManagerMapper jobManagerMapper; private final Map uploadingTokenLocks; @@ -105,6 +115,12 @@ public List listAll() { return this.mapper.selectList(null); } + public List listByJob(int jobId) { + QueryWrapper query = Wrappers.query(); + query.eq("job_id", jobId); + return this.mapper.selectList(query); + } + public IPage list(int connId, int jobId, int pageNo, int pageSize) { QueryWrapper query = Wrappers.query(); query.eq("conn_id", connId); @@ -137,7 +153,7 @@ public void remove(int id) { } public String generateFileToken(String fileName) { - return HubbleUtil.md5(fileName) + "-" + + return this.fileTokenPrefix(fileName) + HubbleUtil.nowTime().getEpochSecond(); } @@ -238,7 +254,7 @@ public boolean tryMergePartFiles(String dirPath, int total) { } public void extractColumns(FileMapping mapping) { - File file = FileUtils.getFile(mapping.getPath()); + File file = this.requirePathUnderUploadRoot(mapping.getPath()); BufferedReader reader; try { reader = new BufferedReader(new FileReader(file)); @@ -289,7 +305,7 @@ public void extractColumns(FileMapping mapping) { } public String moveToNextLevelDir(FileMapping mapping) { - File currFile = new File(mapping.getPath()); + File currFile = this.requirePathUnderUploadRoot(mapping.getPath()); String destPath = Paths.get(currFile.getParentFile().getPath(), FILE_PREIFX + mapping.getId()) .toString(); @@ -305,31 +321,32 @@ public String moveToNextLevelDir(FileMapping mapping) { } public void deleteDiskFile(FileMapping mapping) { - File file = new File(mapping.getPath()); + File file = this.requirePathUnderUploadRoot(mapping.getPath()); if (file.isDirectory()) { - log.info("Prepare to delete directory {}", file); - try { - FileUtils.forceDelete(file); - } catch (IOException e) { - throw new InternalException("Failed to delete directory " + - "corresponded to the file id %s, " + - "please delete it manually", - e, mapping.getId()); - } + this.deletePathIfExists(file, mapping.getId()); } else { File parentDir = file.getParentFile(); - log.info("Prepare to delete directory {}", parentDir); - try { - FileUtils.forceDelete(parentDir); - } catch (IOException e) { - throw new InternalException("Failed to delete parent directory " + - "corresponded to the file id %s, " + - "please delete it manually", - e, mapping.getId()); + if (parentDir == null) { + log.info("Skip deleting file mapping {} because {} has no " + + "parent directory", mapping.getId(), mapping.getPath()); + return; } + this.deletePathIfExists(parentDir, mapping.getId()); + } + } + + public void cleanupMappings(List mappings) { + for (FileMapping mapping : mappings) { + this.tryCleanupMapping(mapping); } } + @Async + @Scheduled(fixedRate = 10 * 60 * 1000) + public void deleteOrphanedJobFiles() { + this.cleanupMappings(this.listOrphanedJobFiles()); + } + @Async @Scheduled(fixedRate = 10 * 60 * 1000) public void deleteUnfinishedFile() { @@ -343,24 +360,136 @@ public void deleteUnfinishedFile() { Date updateTime = mapping.getUpdateTime(); long duration = now.getTime() - updateTime.getTime(); if (duration > threshold) { - String filePath = mapping.getPath(); - try { - FileUtils.forceDelete(new File(filePath)); - } catch (IOException e) { - log.warn("Failed to delete expired uploading file {}", - filePath, e); - } - this.remove(mapping.getId()); - // Delete corresponding uploading tokens - Iterator> iter; - iter = this.uploadingTokenLocks.entrySet().iterator(); - iter.forEachRemaining(entry -> { - String token = entry.getKey(); - if (token.startsWith(mapping.getName())) { - iter.remove(); - } - }); + this.tryDeleteUnfinishedMapping(mapping); } } } + + public File requirePathUnderUploadRoot(String filePath) { + return this.requirePathUnderUploadRoot(new File(filePath)); + } + + private void deletePathIfExists(File path, int mappingId) { + File safePath = this.requirePathUnderUploadRoot(path); + if (!safePath.exists()) { + log.info("Skip deleting path {} for mapping {} because it no " + + "longer exists", safePath, mappingId); + return; + } + + log.info("Prepare to delete directory {}", safePath); + try { + FileUtils.forceDelete(safePath); + } catch (IOException e) { + throw new InternalException("Failed to delete directory " + + "corresponded to the file id %s, " + + "please delete it manually", + e, mappingId); + } + } + + private List listOrphanedJobFiles() { + List mappings = this.mapper.selectList(null); + if (mappings.isEmpty()) { + return Collections.emptyList(); + } + + Set jobIds = mappings.stream() + .map(FileMapping::getJobId) + .filter(jobId -> jobId != null) + .collect(Collectors.toSet()); + if (jobIds.isEmpty()) { + return new ArrayList<>(mappings); + } + + List jobs = this.jobManagerMapper.selectBatchIds(jobIds); + Set existingJobIds = jobs.stream() + .map(JobManager::getId) + .collect(Collectors.toCollection( + HashSet::new)); + return mappings.stream() + .filter(mapping -> mapping.getJobId() == null || + !existingJobIds.contains( + mapping.getJobId())) + .collect(Collectors.toList()); + } + + private void tryCleanupMapping(FileMapping mapping) { + try { + this.deleteDiskFile(mapping); + this.removeCleanupRecord(mapping.getId()); + } catch (RuntimeException e) { + log.warn("Failed to cleanup disk file for mapping {} at {}", + mapping.getId(), mapping.getPath(), e); + } + } + + private void removeCleanupRecord(int mappingId) { + int deleted = this.mapper.deleteById(mappingId); + if (deleted == 1) { + return; + } + if (deleted == 0) { + log.info("Skip removing file mapping {} because it no longer " + + "exists", mappingId); + return; + } + throw new InternalException("entity.delete.failed", mappingId); + } + + private File requirePathUnderUploadRoot(File file) { + Path uploadRootPath = this.normalizePath(new File( + this.config.get(HubbleOptions.UPLOAD_FILE_LOCATION))); + Path targetPath = this.normalizePath(file); + if (!targetPath.startsWith(uploadRootPath)) { + throw new InternalException("load.upload.file.path.outside-root", + targetPath, uploadRootPath); + } + return targetPath.toFile(); + } + + private Path normalizePath(File file) { + Path path = file.toPath(); + try { + if (file.exists()) { + return path.toRealPath(); + } + } catch (IOException e) { + throw new InternalException("Failed to resolve upload path '%s'", + e, file); + } + return path.toAbsolutePath().normalize(); + } + + private void tryDeleteUnfinishedMapping(FileMapping mapping) { + String filePath = mapping.getPath(); + try { + FileUtils.forceDelete(this.requirePathUnderUploadRoot(filePath)); + } catch (IOException e) { + log.warn("Failed to delete expired uploading file {}", + filePath, e); + } catch (RuntimeException e) { + log.warn("Skip deleting expired uploading file {} because the " + + "path is invalid", filePath, e); + return; + } + this.remove(mapping.getId()); + this.removeUploadingTokens(mapping.getName()); + } + + private void removeUploadingTokens(String fileName) { + String tokenPrefix = this.fileTokenPrefix(fileName); + Iterator> iter; + iter = this.uploadingTokenLocks.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry entry = iter.next(); + if (entry.getKey().startsWith(tokenPrefix)) { + iter.remove(); + } + } + } + + private String fileTokenPrefix(String fileName) { + return HubbleUtil.md5(fileName) + "-"; + } } diff --git a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java index 6e337421a..eb085c390 100644 --- a/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java +++ b/hugegraph-hubble/hubble-be/src/main/java/org/apache/hugegraph/service/load/JobManagerService.java @@ -18,13 +18,16 @@ package org.apache.hugegraph.service.load; +import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.hugegraph.entity.enums.JobStatus; import org.apache.hugegraph.entity.enums.LoadStatus; +import org.apache.hugegraph.entity.load.FileMapping; import org.apache.hugegraph.entity.load.JobManager; import org.apache.hugegraph.entity.load.LoadTask; +import org.apache.hugegraph.exception.ExternalException; import org.apache.hugegraph.exception.InternalException; import org.apache.hugegraph.mapper.load.JobManagerMapper; import org.apache.hugegraph.util.HubbleUtil; @@ -32,6 +35,8 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -48,6 +53,8 @@ public class JobManagerService { private JobManagerMapper mapper; @Autowired private LoadTaskService taskService; + @Autowired + private FileMappingService fileMappingService; public int count() { return this.mapper.selectCount(null); @@ -132,4 +139,49 @@ public void remove(int id) { throw new InternalException("entity.delete.failed", id); } } + + @Transactional(isolation = Isolation.READ_COMMITTED) + public void deleteJob(int id) { + JobManager job = this.get(id); + if (job == null) { + throw new ExternalException("job.manager.not-exist.id", id); + } + + List loadTasks = this.taskService.taskListByJob(id); + for (LoadTask loadTask : loadTasks) { + if (loadTask.getStatus().inRunning() || + loadTask.getStatus() == LoadStatus.PAUSED) { + this.taskService.stop(loadTask.getId()); + } + this.taskService.remove(loadTask.getId()); + } + + List mappings = this.fileMappingService.listByJob(id); + this.remove(id); + this.deleteDiskFilesAfterCommit(mappings); + } + + private void deleteDiskFilesAfterCommit(List mappings) { + if (mappings.isEmpty()) { + return; + } + + List copiedMappings = new ArrayList<>(mappings); + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + this.deleteDiskFiles(copiedMappings); + return; + } + + TransactionSynchronizationManager.registerSynchronization( + new TransactionSynchronization() { + @Override + public void afterCommit() { + deleteDiskFiles(copiedMappings); + } + }); + } + + private void deleteDiskFiles(List mappings) { + this.fileMappingService.cleanupMappings(mappings); + } } diff --git a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties index 117b297fa..ddbb082d4 100644 --- a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties +++ b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages.properties @@ -135,8 +135,11 @@ load.upload.file.duplicate-name=Don't allow duplicate file names to obtain token load.upload.file.name-token.unmatch=The upload file name doesn't match with token load.upload.files.cannot-dup=The upload files can't contain duplicates load.upload.file.exceed-single-size=The upload file has exceeded single limit size {0} -load.upload.file.exceed-total-size=The upload file has exceeded total limit size {0} +load.upload.file.exceed-total-size=The upload files in current job exceed total size limit {0} load.upload.file.existed=The upload file {0} has existed +load.upload.file.name.invalid=The upload file name must be a plain file name +load.upload.file.path.outside-root=The upload path {0} is outside the configured upload directory {1} +load.upload.file.size.missing-before-reserve=The upload size must be reserved before accepting more chunks load.file-mapping.not-exist.id=The file doesn't exist with id {0} load.file-mapping.not-exist.name=The file doesn't exist with name {0} load.file-mapping.file-setting.delimiter-cannot-be-empty=The delimiter can't be null or empty diff --git a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties index 4ef261b57..7c333d384 100644 --- a/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties +++ b/hugegraph-hubble/hubble-be/src/main/resources/i18n/messages_zh_CN.properties @@ -135,8 +135,11 @@ load.upload.file.duplicate-name=不允许重复的文件名同时获取 token load.upload.file.name-token.unmatch=上传文件的名称与 token 不匹配 load.upload.files.cannot-dup=上传的文件不能包含重复的 load.upload.file.exceed-single-size=上传文件大小超过了限制 {0} -load.upload.file.exceed-total-size=上传文件总大小超过了限制 {0} +load.upload.file.exceed-total-size=当前导入任务中的上传文件总大小超过了限制 {0} load.upload.file.existed=上传的文件 {0} 已存在 +load.upload.file.name.invalid=上传文件名必须是纯文件名,不能包含路径信息 +load.upload.file.path.outside-root=上传路径 {0} 不在配置的上传目录 {1} 下 +load.upload.file.size.missing-before-reserve=服务端尚未为该文件预留大小,不能继续接收后续分片 load.file-mapping.not-exist.id=不存在 id 为 {0} 的文件 load.file-mapping.not-exist.name=不存在名为 {0} 的文件 load.file-mapping.file-setting.delimiter-cannot-be-empty=分隔符不能为空 diff --git a/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx b/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx index 84f1132d6..0449f24c9 100644 --- a/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx +++ b/hugegraph-hubble/hubble-fe/src/components/graph-management/data-import/import-tasks/UploadEntry.tsx @@ -35,6 +35,7 @@ import { CancellablePromise } from 'mobx/lib/api/flow'; import { DataImportRootStoreContext } from '../../../../stores'; import { useInitDataImport } from '../../../../hooks'; +import { isCurrentJobUploadSizeExceeded } from '../../../../utils/dataImportUpload'; import type { FileUploadResult } from '../../../../stores/types/GraphManagementStore/dataImportStore'; @@ -207,11 +208,13 @@ export const FileDropZone: React.FC = observer(() => { }); } - const totalSize = filteredFiles - .map(({ size }) => size) - .reduce((prev, curr) => prev + curr, 0); - - if (totalSize / GB > 10) { + if ( + isCurrentJobUploadSizeExceeded( + dataMapStore.fileMapInfos, + dataImportRootStore.fileUploadTasks, + filteredFiles + ) + ) { Message.error({ content: `${t('upload-files.over-all-size-limit')}`, size: 'medium', diff --git a/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts b/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts index 451613e09..fe25499c8 100644 --- a/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts +++ b/hugegraph-hubble/hubble-fe/src/stores/GraphManagementStore/dataImportStore/dataImportRootStore.ts @@ -38,7 +38,7 @@ import { EdgeType, EdgeTypeListResponse } from '../../types/GraphManagementStore/metadataConfigsStore'; -import { checkIfLocalNetworkOffline } from '../../utils'; +import { checkIfLocalNetworkOffline, getErrorMessage } from '../../utils'; const MAX_CONCURRENT_UPLOAD = 5; @@ -79,6 +79,20 @@ export class DataImportRootStore { return this.fileUploadTasks.filter(({ status }) => status !== 'success'); } + findFileUploadTask(fileName: string) { + return this.fileUploadTasks.find(({ name }) => name === fileName); + } + + getRequiredFileUploadTask(fileName: string) { + const fileUploadTask = this.findFileUploadTask(fileName); + + if (isUndefined(fileUploadTask)) { + throw new Error(`Upload task '${fileName}' not found`); + } + + return fileUploadTask; + } + @action setCurrentId(id: number) { this.currentId = id; @@ -134,9 +148,7 @@ export class DataImportRootStore { value: FileUploadTask[T], fileName: string ) { - const fileUploadTask = this.fileUploadTasks.find( - ({ name }) => name === fileName - )!; + const fileUploadTask = this.findFileUploadTask(fileName); // users may click back button in browser if (!isUndefined(fileUploadTask)) { @@ -206,9 +218,10 @@ export class DataImportRootStore { this.fileHashes = { ...this.fileHashes, ...result.data.data }; this.requestStatus.fetchFilehashes = 'success'; } catch (error) { + const errorMessage = getErrorMessage(error); this.requestStatus.fetchFilehashes = 'failed'; - this.errorInfo.fetchFilehashes.message = error.message; - console.error(error.message); + this.errorInfo.fetchFilehashes.message = errorMessage; + console.error(errorMessage); } }); @@ -236,9 +249,10 @@ export class DataImportRootStore { } try { + const fileSize = this.getRequiredFileUploadTask(fileName).size; const result: AxiosResponse> = yield axios .post>( - `${baseUrl}/${this.currentId}/job-manager/${this.currentJobId}/upload-file?total=${fileChunkTotal}&index=${fileChunkList.chunkIndex}&name=${fileName}&token=${this.fileHashes[fileName]}`, + `${baseUrl}/${this.currentId}/job-manager/${this.currentJobId}/upload-file?total=${fileChunkTotal}&index=${fileChunkList.chunkIndex}&name=${fileName}&size=${fileSize}&token=${this.fileHashes[fileName]}`, formData, { headers: { @@ -257,9 +271,10 @@ export class DataImportRootStore { this.requestStatus.uploadFiles = 'success'; return result.data.data; } catch (error) { + const errorMessage = getErrorMessage(error); this.requestStatus.uploadFiles = 'failed'; - this.errorInfo.uploadFiles.message = error.message; - console.error(error.message); + this.errorInfo.uploadFiles.message = errorMessage; + console.error(errorMessage); } }); @@ -283,9 +298,10 @@ export class DataImportRootStore { this.requestStatus.deleteFiles = 'success'; } catch (error) { + const errorMessage = getErrorMessage(error); this.requestStatus.deleteFiles = 'failed'; - this.errorInfo.deleteFiles.message = error.message; - console.error(error.message); + this.errorInfo.deleteFiles.message = errorMessage; + console.error(errorMessage); } }); @@ -308,9 +324,10 @@ export class DataImportRootStore { this.requestStatus.sendUploadCompleteSignal = 'success'; } catch (error) { + const errorMessage = getErrorMessage(error); this.requestStatus.sendUploadCompleteSignal = 'failed'; - this.errorInfo.sendUploadCompleteSignal.message = error.message; - console.error(error.message); + this.errorInfo.sendUploadCompleteSignal.message = errorMessage; + console.error(errorMessage); } }); @@ -333,9 +350,10 @@ export class DataImportRootStore { this.requestStatus.sendMappingCompleteSignal = 'success'; } catch (error) { + const errorMessage = getErrorMessage(error); this.requestStatus.sendMappingCompleteSignal = 'failed'; - this.errorInfo.sendMappingCompleteSignal.message = error.message; - console.error(error.message); + this.errorInfo.sendMappingCompleteSignal.message = errorMessage; + console.error(errorMessage); } }); @@ -366,8 +384,9 @@ export class DataImportRootStore { this.vertexTypes = result.data.data.records; this.requestStatus.fetchVertexTypeList = 'success'; } catch (error) { + const errorMessage = getErrorMessage(error); this.requestStatus.fetchVertexTypeList = 'failed'; - this.errorInfo.fetchVertexTypeList.message = error.message; + this.errorInfo.fetchVertexTypeList.message = errorMessage; } }); @@ -398,8 +417,9 @@ export class DataImportRootStore { this.edgeTypes = result.data.data.records; this.requestStatus.fetchEdgeTypeList = 'success'; } catch (error) { + const errorMessage = getErrorMessage(error); this.requestStatus.fetchEdgeTypeList = 'failed'; - this.errorInfo.fetchEdgeTypeList.message = error.message; + this.errorInfo.fetchEdgeTypeList.message = errorMessage; } }); } diff --git a/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts b/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts index fbeb8c034..28a1d81b1 100644 --- a/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts +++ b/hugegraph-hubble/hubble-fe/src/stores/types/GraphManagementStore/dataImportStore.ts @@ -148,6 +148,7 @@ export interface FileMapInfo { name: string; total_lines: number; total_size: string; + total_size_bytes: number; file_setting: FileConfig; file_status: string; vertex_mappings: VertexMap[]; diff --git a/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts b/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts index a2d2c490c..36e820177 100644 --- a/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts +++ b/hugegraph-hubble/hubble-fe/src/stores/utils/index.ts @@ -52,10 +52,44 @@ export const edgeWidthMapping: Record = { /* functions */ -export function checkIfLocalNetworkOffline(error: any) { - if (error.request) { - throw new Error(i18next.t('addition.store.network-error')); +export function getErrorMessage(error: unknown) { + if (error instanceof Error) { + return error.message; } + + if (typeof error === 'string') { + return error; + } + + if ( + typeof error === 'object' && + error !== null && + 'message' in error + ) { + const errorWithMessage = error as { message?: unknown }; + + if (typeof errorWithMessage.message === 'string') { + return errorWithMessage.message; + } + } + + return String(error); +} + +export function checkIfLocalNetworkOffline(error: unknown) { + if ( + typeof error === 'object' && + error !== null && + 'request' in error + ) { + const errorWithRequest = error as { request?: unknown }; + + if (!isUndefined(errorWithRequest.request)) { + throw new Error(i18next.t('addition.store.network-error')); + } + } + + throw error; } export function mapMetadataProperties( diff --git a/hugegraph-hubble/hubble-fe/src/utils/dataImportUpload.ts b/hugegraph-hubble/hubble-fe/src/utils/dataImportUpload.ts new file mode 100644 index 000000000..a36467b3c --- /dev/null +++ b/hugegraph-hubble/hubble-fe/src/utils/dataImportUpload.ts @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +const GB = 1024 * 1024 * 1024; + +type UploadedFile = { + name: string; + total_size_bytes: number; +}; + +type LocalUploadTask = { + name: string; + size: number; +}; + +export const getCurrentJobUploadSize = ( + uploadedFiles: UploadedFile[], + localUploadTasks: LocalUploadTask[] +) => { + const uploadedFileNames = uploadedFiles.map(({ name }) => name); + const uploadedFileSize = uploadedFiles.reduce( + (sum, { total_size_bytes }) => sum + total_size_bytes, + 0 + ); + const pendingFileSize = localUploadTasks + .filter(({ name }) => !uploadedFileNames.includes(name)) + .reduce((sum, { size }) => sum + size, 0); + + return uploadedFileSize + pendingFileSize; +}; + +export const isCurrentJobUploadSizeExceeded = ( + uploadedFiles: UploadedFile[], + localUploadTasks: LocalUploadTask[], + selectedFiles: File[], + totalLimit = 10 * GB +) => { + const selectedFileSize = selectedFiles.reduce( + (sum, { size }) => sum + size, + 0 + ); + + return ( + getCurrentJobUploadSize(uploadedFiles, localUploadTasks) + selectedFileSize > + totalLimit + ); +};