Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -82,6 +83,7 @@ public Map<String, String> fileToken(@PathVariable("connId") int connId,
"load.upload.file.duplicate-name");
Map<String, String> tokens = new HashMap<>();
for (String fileName : fileNames) {
this.checkFileNameValid(fileName);
String token = this.service.generateFileToken(fileName);
Ex.check(!this.uploadingTokenLocks().containsKey(token),
"load.upload.file.token.existed");
Expand All @@ -96,13 +98,19 @@ public FileUploadResult upload(@PathVariable("connId") int connId,
@PathVariable("jobId") int jobId,
@RequestParam("file") MultipartFile file,
@RequestParam("name") String fileName,
@RequestParam(value = "size",
required = false)
Long fileSize,
@RequestParam("token") String token,
@RequestParam("total") int total,
@RequestParam("index") int index) {
Comment thread
peakxy marked this conversation as resolved.
this.checkTotalAndIndexValid(total, index);
this.checkFileNameValid(fileName);
this.checkFileNameMatchToken(fileName, token);
JobManager jobEntity = this.jobService.get(jobId);
this.checkFileValid(connId, jobId, jobEntity, file, fileName);
Long sourceFileSize = this.resolveSourceFileSize(file, fileSize,
total, index);
this.checkFileValid(jobId, jobEntity, file, fileName);
if (jobEntity.getJobStatus() == JobStatus.DEFAULT) {
jobEntity.setJobStatus(JobStatus.UPLOADING);
this.jobService.update(jobEntity);
Expand All @@ -123,49 +131,67 @@ public FileUploadResult upload(@PathVariable("connId") int connId,

lock.readLock().lock();
try {
FileMapping reservedMapping;
synchronized (this.service) {
reservedMapping = this.reserveUploadQuota(connId, jobId,
fileName, filePath,
sourceFileSize);
}
result = this.service.uploadFile(file, index, filePath);
if (result.getStatus() == FileUploadResult.Status.FAILURE) {
return result;
}
synchronized (this.service) {
// Verify the existence of fragmented files
FileMapping mapping = this.service.get(connId, jobId, fileName);
if (mapping == null) {
mapping = new FileMapping(connId, fileName, filePath);
mapping.setJobId(jobId);
mapping.setFileStatus(FileMappingStatus.UPLOADING);
this.service.save(mapping);
} else {
if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) {
result.setId(mapping.getId());
// Remove uploading file token
this.uploadingTokenLocks().remove(token);
return result;
} else {
mapping.setUpdateTime(HubbleUtil.nowDate());
}
mapping = reservedMapping;
}
Ex.check(mapping != null, "load.file-mapping.not-exist.name",
fileName);
if (mapping.getFileStatus() == FileMappingStatus.COMPLETED) {
result.setId(mapping.getId());
this.uploadingTokenLocks().remove(token);
return result;
}
mapping.setUpdateTime(HubbleUtil.nowDate());
// Determine whether all the parts have been uploaded, then merge them
boolean merged = this.service.tryMergePartFiles(filePath, total);
if (!merged) {
this.service.update(mapping);
return result;
}
JobManager currentJob = this.jobService.get(jobId);
long actualFileSize;
try {
actualFileSize = this.resolveUploadedFileSize(
mapping.getPath());
Ex.check(currentJob != null, "job-manager.not-exist.id",
jobId);
long reservedUploadingSize =
this.sumReservedUploadingSize(jobId,
mapping.getId());
this.checkFileSizeLimit(actualFileSize,
currentJob.getJobSize(),
reservedUploadingSize);
} catch (RuntimeException e) {
this.cleanupFailedUpload(mapping, token);
throw e;
}
// Read column names and values then fill it
this.service.extractColumns(mapping);
mapping.setFileStatus(FileMappingStatus.COMPLETED);
mapping.setTotalLines(FileUtil.countLines(mapping.getPath()));
mapping.setTotalSize(FileUtils.sizeOf(new File(mapping.getPath())));
mapping.setTotalSize(actualFileSize);

// Move to the directory corresponding to the file mapping Id
String newPath = this.service.moveToNextLevelDir(mapping);
// Update file mapping stored path
mapping.setPath(newPath);
this.service.update(mapping);
// Update Job Manager size
long jobSize = jobEntity.getJobSize() + mapping.getTotalSize();
jobEntity.setJobSize(jobSize);
this.jobService.update(jobEntity);
long jobSize = currentJob.getJobSize() + mapping.getTotalSize();
currentJob.setJobSize(jobSize);
this.jobService.update(currentJob);
result.setId(mapping.getId());
// Remove uploading file token
this.uploadingTokenLocks().remove(token);
Expand All @@ -181,6 +207,7 @@ public Boolean delete(@PathVariable("connId") int connId,
@PathVariable("jobId") int jobId,
@RequestParam("name") String fileName,
@RequestParam("token") String token) {
this.checkFileNameValid(fileName);
JobManager jobEntity = this.jobService.get(jobId);
Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
Ex.check(jobEntity.getJobStatus() == JobStatus.UPLOADING ||
Expand Down Expand Up @@ -243,7 +270,16 @@ private void checkFileNameMatchToken(String fileName, String token) {
"load.upload.file.name-token.unmatch");
}

private void checkFileValid(int connId, int jobId, JobManager jobEntity,
private void checkFileNameValid(String fileName) {
Ex.check(StringUtils.isNotBlank(fileName) &&
fileName.equals(FilenameUtils.getName(fileName)) &&
!fileName.contains("/") &&
!fileName.contains("\\") &&
!fileName.contains(".."),
"load.upload.file.name.invalid");
}

private void checkFileValid(int jobId, JobManager jobEntity,
MultipartFile file, String fileName) {
Ex.check(jobEntity != null, "job-manager.not-exist.id", jobId);
Ex.check(jobEntity.getJobStatus() == JobStatus.DEFAULT ||
Expand All @@ -261,31 +297,154 @@ private void checkFileValid(int connId, int jobId, JobManager jobEntity,
HubbleOptions.UPLOAD_FILE_FORMAT_LIST);
Ex.check(formatWhiteList.contains(format),
"load.upload.file.format.unsupported");
}

private FileMapping reserveUploadQuota(int connId, int jobId,
String fileName, String filePath,
Long sourceFileSize) {
JobManager currentJob = this.jobService.get(jobId);
Ex.check(currentJob != null, "job-manager.not-exist.id", jobId);

FileMapping mapping = this.service.get(connId, jobId, fileName);
Ex.check(mapping == null ||
mapping.getFileStatus() == FileMappingStatus.UPLOADING,
"load.upload.file.existed", fileName);

long reservedFileSize = this.resolveReservedFileSize(mapping,
sourceFileSize);
Integer mappingId = mapping == null ? null : mapping.getId();
long reservedUploadingSize = this.sumReservedUploadingSize(jobId,
mappingId);
this.checkFileSizeLimit(reservedFileSize, currentJob.getJobSize(),
reservedUploadingSize);

if (mapping == null) {
mapping = new FileMapping(connId, fileName, filePath);
mapping.setJobId(jobId);
this.fillUploadingReservation(mapping, reservedFileSize);
this.service.save(mapping);
return mapping;
}

mapping.setPath(filePath);
this.fillUploadingReservation(mapping, reservedFileSize);
this.service.update(mapping);
return mapping;
}

private Long resolveSourceFileSize(MultipartFile file, Long fileSize,
int total, int index) {
if (total == 1) {
return file.getSize();
}
if (fileSize != null) {
return fileSize;
}
if (index == 0) {
return this.estimateChunkedFileSizeUpperBound(file.getSize(),
total);
}
return null;
}

private void checkFileSizeLimit(long fileSize, long currentJobSize) {
this.checkFileSizeLimit(fileSize, currentJobSize, 0L);
}

private void checkFileSizeLimit(long fileSize, long currentJobSize,
long reservedUploadingSize) {
Ex.check(fileSize > 0L, "load.upload.file.cannot-be-empty");

long fileSize = file.getSize();
long singleFileSizeLimit = this.config.get(
HubbleOptions.UPLOAD_SINGLE_FILE_SIZE_LIMIT);
Ex.check(fileSize <= singleFileSizeLimit,
"load.upload.file.exceed-single-size",
FileUtils.byteCountToDisplaySize(singleFileSizeLimit));

// Check is there a file with the same name
FileMapping oldMapping = this.service.get(connId, jobId, fileName);
Ex.check(oldMapping == null ||
oldMapping.getFileStatus() == FileMappingStatus.UPLOADING,
"load.upload.file.existed", fileName);

long totalFileSizeLimit = this.config.get(
HubbleOptions.UPLOAD_TOTAL_FILE_SIZE_LIMIT);
List<FileMapping> fileMappings = this.service.listAll();
long currentTotalSize = fileMappings.stream()
.map(FileMapping::getTotalSize)
.reduce(0L, (Long::sum));
Ex.check(fileSize + currentTotalSize <= totalFileSizeLimit,
"load.upload.file.exceed-single-size",
long totalReservedSize = this.safeAdd(this.safeAdd(fileSize,
currentJobSize),
reservedUploadingSize);
Ex.check(totalReservedSize <= totalFileSizeLimit,
"load.upload.file.exceed-total-size",
FileUtils.byteCountToDisplaySize(totalFileSizeLimit));
}

private long resolveUploadedFileSize(String filePath) {
File uploadedFile = this.service.requirePathUnderUploadRoot(filePath);
if (!uploadedFile.exists() || !uploadedFile.isFile()) {
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
throw new InternalException("The uploaded file '%s' is not ready " +
"for quota validation",
filePath);
}
return FileUtils.sizeOf(uploadedFile);
}

private void cleanupFailedUpload(FileMapping mapping, String token) {
this.uploadingTokenLocks().remove(token);
this.service.cleanupMappings(Collections.singletonList(mapping));
}

private long resolveReservedFileSize(FileMapping mapping,
Long sourceFileSize) {
long reservedFileSize = mapping == null ? 0L : mapping.getTotalSize();
if (sourceFileSize != null) {
return Math.max(reservedFileSize, sourceFileSize);
}
Ex.check(reservedFileSize > 0L,
"load.upload.file.size.missing-before-reserve");
return reservedFileSize;
}

private void fillUploadingReservation(FileMapping mapping,
long reservedFileSize) {
mapping.setFileStatus(FileMappingStatus.UPLOADING);
mapping.setTotalSize(reservedFileSize);
mapping.setUpdateTime(HubbleUtil.nowDate());
}

private long sumReservedUploadingSize(int jobId, Integer excludedMappingId) {
List<FileMapping> mappings = this.service.listByJob(jobId);
if (mappings == null || mappings.isEmpty()) {
return 0L;
}

long reservedUploadingSize = 0L;
for (FileMapping mapping : mappings) {
if (mapping == null ||
mapping.getFileStatus() != FileMappingStatus.UPLOADING) {
continue;
}
if (excludedMappingId != null &&
excludedMappingId.equals(mapping.getId())) {
continue;
}
if (mapping.getTotalSize() <= 0L) {
continue;
}
reservedUploadingSize = this.safeAdd(reservedUploadingSize,
mapping.getTotalSize());
}
return reservedUploadingSize;
}

private long estimateChunkedFileSizeUpperBound(long chunkSize, int total) {
try {
return Math.multiplyExact(chunkSize, (long) total);
} catch (ArithmeticException ignored) {
return Long.MAX_VALUE;
}
}

private long safeAdd(long left, long right) {
try {
return Math.addExact(left, right);
} catch (ArithmeticException ignored) {
return Long.MAX_VALUE;
}
}

private String generateFilePath(int connId, int jobId, String fileName) {
String location = this.config.get(HubbleOptions.UPLOAD_FILE_LOCATION);
String path = Paths.get(CONN_PREIFX + connId, JOB_PREIFX + jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,7 @@ public JobManager create(@PathVariable("connId") int connId,

@DeleteMapping("{id}")
public void delete(@PathVariable("id") int id) {
JobManager task = this.service.get(id);
if (task == null) {
throw new ExternalException("job.manager.not-exist.id", id);
}
this.service.remove(id);
this.service.deleteJob(id);
}
Comment thread
peakxy marked this conversation as resolved.

@GetMapping("{id}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public class FileMapping {
@JsonProperty("update_time")
private Date updateTime;

@JsonProperty("total_size_bytes")
public long getTotalSizeBytes() {
return this.totalSize;
}

public FileMapping(int connId, String name, String path) {
this(connId, name, path, HubbleUtil.nowDate());
}
Expand Down
Loading
Loading