Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 237 additions & 24 deletions api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
get_user_pdf_folder_path,
get_user_traceability_scanner_config,
is_safe_local_user_file_path,
is_safe_user_path,
is_testing_enabled_by_env,
load_settings,
parse_int,
Expand Down Expand Up @@ -146,6 +147,7 @@
if not os.path.exists(USER_FILES_BASE_DIR):
os.makedirs(USER_FILES_BASE_DIR, exist_ok=True)


# Read API Version once
# API Version is not supposed to change runtime
API_VERSION = ""
Expand Down Expand Up @@ -956,6 +958,8 @@ def get_query_string_args(args):
"test-case-id",
"test_run_config_id",
"test_runs_limit",
"path",
"recursive",
"token",
"url",
"user-id",
Expand Down Expand Up @@ -9707,27 +9711,68 @@ def get(self, api_response: ApiResponse = None):
api_response.set_data(ret)
return api_response.return_ok()

i = 0
for user_file in os.listdir(user_files_path):
sub_path = request_data.get("path", "")
recursive = str(request_data.get("recursive", "")).lower() == "true"

# Skip hidden files
if user_file.startswith("."):
continue
if sub_path:
listing_path = os.path.join(user_files_path, sub_path)
else:
listing_path = user_files_path

tmp = {
"index": i,
"filepath": os.path.join(user_files_path, user_file),
"updated_at": time.ctime(os.path.getmtime(os.path.join(user_files_path, user_file))),
}
if not is_safe_user_path(user_files_path, listing_path):
api_response.set_message("Invalid path")
return api_response.return_bad_request()

if not os.path.isdir(listing_path):
api_response.set_message("Directory not found")
return api_response.return_not_found()

if "filter" in request_data.keys():
if str(request_data["filter"]).lower() not in user_file.lower():
if recursive:
i = 0
for root, _dirs, files in os.walk(listing_path):
_dirs[:] = [d for d in _dirs if not d.startswith(".")]
for fname in sorted(files):
if fname.startswith("."):
continue
full = os.path.join(root, fname)
rel = os.path.relpath(full, user_files_path)
if "filter" in request_data:
if str(request_data["filter"]).lower() not in fname.lower():
continue
ret.append({
"index": i,
"filepath": full,
"relative_path": rel,
"name": fname,
"type": "file",
"updated_at": time.ctime(os.path.getmtime(full)),
})
i += 1
else:
i = 0
for entry_name in os.listdir(listing_path):
if entry_name.startswith("."):
continue

ret.append(tmp)
i += 1
full = os.path.join(listing_path, entry_name)
rel = os.path.relpath(full, user_files_path)
entry_type = "directory" if os.path.isdir(full) else "file"

if "filter" in request_data:
if str(request_data["filter"]).lower() not in entry_name.lower():
continue

ret.append({
"index": i,
"filepath": full,
"relative_path": rel,
"name": entry_name,
"type": entry_type,
"updated_at": time.ctime(os.path.getmtime(full)),
})
i += 1

ret = sorted(ret, key=lambda f: f["filepath"], reverse=False) # sort by filename
ret = sorted(ret, key=lambda f: (f["type"] != "directory", f["name"].lower()))
api_response.set_data(ret)
return api_response.return_ok()

Expand All @@ -9746,7 +9791,10 @@ def post(self, api_response: ApiResponse = None):
api_response.set_missing_fields(wrong_fields)
return api_response.return_bad_request_missing_fields()

if request_data["filename"].startswith("."):
filename = request_data["filename"]
basename = os.path.basename(filename)

if basename.startswith("."):
api_response.set_message("Filename cannot start with a dot")
return api_response.return_bad_request()

Expand All @@ -9763,10 +9811,13 @@ def post(self, api_response: ApiResponse = None):
if not os.path.exists(user_files_path):
os.makedirs(user_files_path, exist_ok=True)

filename = request_data["filename"]
filecontent = request_data["filecontent"]
filepath = os.path.join(user_files_path, filename)

if not is_safe_user_path(user_files_path, filepath):
api_response.set_message("Invalid path")
return api_response.return_bad_request()

if filename == "":
api_response.set_message("Missing filename value")
return api_response.return_bad_request()
Expand All @@ -9778,6 +9829,9 @@ def post(self, api_response: ApiResponse = None):
api_response.set_message("File already exists")
return api_response.return_conflict()

parent_dir = os.path.dirname(filepath)
os.makedirs(parent_dir, exist_ok=True)

f = open(filepath, "w")
f.write(filecontent)
f.close()
Expand All @@ -9789,15 +9843,88 @@ def post(self, api_response: ApiResponse = None):
ret = {
"index": 0,
"filepath": filepath,
"relative_path": os.path.relpath(filepath, user_files_path),
"name": os.path.basename(filepath),
"type": "file",
"updated_at": time.ctime(os.path.getmtime(filepath))
}
api_response.set_data(ret)
return api_response.return_created()

@api_response_decorator
def put(self, api_response: ApiResponse = None):
"""
move or rename a user file or directory
"""
request_data = request.get_json(force=True)
api_response.set_logger(logger)
api_response.set_args(request_data)

mandatory_fields = ["source", "destination"]
wrong_fields = get_wrong_mandatory_fields(mandatory_fields, request_data)
if len(wrong_fields) > 0:
api_response.set_missing_fields(wrong_fields)
return api_response.return_bad_request_missing_fields()

dbi = get_db()

user_id = get_user_id_from_request(request_data, dbi.session)
if user_id == 0:
return api_response.return_unauthorized()

dbi.close()

user_files_path = os.path.join(USER_FILES_BASE_DIR, f"{user_id}")
if not os.path.exists(user_files_path):
os.makedirs(user_files_path, exist_ok=True)

source = request_data["source"]
destination = request_data["destination"]

if source == "" or destination == "":
api_response.set_message("Source and destination must not be empty")
return api_response.return_bad_request()

src_path = os.path.join(user_files_path, source)
dst_path = os.path.join(user_files_path, destination)

if not is_safe_user_path(user_files_path, src_path):
api_response.set_message("Invalid source path")
return api_response.return_bad_request()

if not is_safe_user_path(user_files_path, dst_path):
api_response.set_message("Invalid destination path")
return api_response.return_bad_request()

if not os.path.exists(src_path):
api_response.set_message("Source not found")
return api_response.return_not_found()

if os.path.exists(dst_path):
api_response.set_message("Destination already exists")
return api_response.return_conflict()

dst_parent = os.path.dirname(dst_path)
os.makedirs(dst_parent, exist_ok=True)

shutil.move(src_path, dst_path)

entry_type = "directory" if os.path.isdir(dst_path) else "file"
ret = {
"index": 0,
"filepath": dst_path,
"relative_path": os.path.relpath(dst_path, user_files_path),
"name": os.path.basename(dst_path),
"type": entry_type,
"updated_at": time.ctime(os.path.getmtime(dst_path)),
}
api_response.set_data(ret)
return api_response.return_ok()

@api_response_decorator
def delete(self, api_response: ApiResponse = None):
"""
delete user file
delete user file or directory
"""
request_data = request.get_json(force=True)
api_response.set_logger(logger)
Expand All @@ -9821,6 +9948,7 @@ def delete(self, api_response: ApiResponse = None):
user_files_path = os.path.join(USER_FILES_BASE_DIR, f"{user_id}")
if not os.path.exists(user_files_path):
os.makedirs(user_files_path, exist_ok=True)
api_response.set_message("File not found")
return api_response.return_not_found()

filename = request_data["filename"]
Expand All @@ -9830,20 +9958,95 @@ def delete(self, api_response: ApiResponse = None):
api_response.set_message("Missing filename value")
return api_response.return_bad_request()

if not is_safe_user_path(user_files_path, filepath):
api_response.set_message("Invalid path")
return api_response.return_bad_request()

if not os.path.exists(filepath):
api_response.set_message("File not found")
return api_response.return_not_found()

os.remove(filepath)
if os.path.isdir(filepath):
shutil.rmtree(filepath)
else:
os.remove(filepath)

if os.path.exists(filepath):
api_response.set_message("File not found")
api_response.set_message("Failed to delete")
return api_response.return_not_found()

ret = {"index": 0,
"filepath": filepath,
"updated_at": ""}
return ret
ret = {
"index": 0,
"filepath": filepath,
"relative_path": os.path.relpath(filepath, user_files_path),
"name": os.path.basename(filepath),
"updated_at": "",
}
api_response.set_data(ret)
return api_response.return_ok()


class UserFileFolder(Resource):
route = "/user/files/folder"

@api_response_decorator
def post(self, api_response: ApiResponse = None):
"""
create a user folder
"""
request_data = request.get_json(force=True)
api_response.set_logger(logger)
api_response.set_args(request_data)

mandatory_fields = ["foldername"]
wrong_fields = get_wrong_mandatory_fields(mandatory_fields, request_data)
if len(wrong_fields) > 0:
api_response.set_missing_fields(wrong_fields)
return api_response.return_bad_request_missing_fields()

dbi = get_db()

user_id = get_user_id_from_request(request_data, dbi.session)
if user_id == 0:
return api_response.return_unauthorized()

dbi.close()

user_files_path = os.path.join(USER_FILES_BASE_DIR, f"{user_id}")
if not os.path.exists(user_files_path):
os.makedirs(user_files_path, exist_ok=True)

foldername = request_data["foldername"]
if foldername == "":
api_response.set_message("Missing foldername value")
return api_response.return_bad_request()

if os.path.basename(foldername).startswith("."):
api_response.set_message("Folder name cannot start with a dot")
return api_response.return_bad_request()

folderpath = os.path.join(user_files_path, foldername)

if not is_safe_user_path(user_files_path, folderpath):
api_response.set_message("Invalid path")
return api_response.return_bad_request()

if os.path.exists(folderpath):
api_response.set_message("Folder already exists")
return api_response.return_conflict()

os.makedirs(folderpath, exist_ok=True)

ret = {
"index": 0,
"filepath": folderpath,
"relative_path": os.path.relpath(folderpath, user_files_path),
"name": os.path.basename(folderpath),
"type": "directory",
"updated_at": time.ctime(os.path.getmtime(folderpath)),
}
api_response.set_data(ret)
return api_response.return_created()


class UserFileContent(Resource):
Expand Down Expand Up @@ -9881,6 +10084,11 @@ def get(self, api_response: ApiResponse = None):
return api_response.return_not_found()

filepath = os.path.join(user_files_path, request_data["filename"])

if not is_safe_user_path(user_files_path, filepath):
api_response.set_message("Invalid path")
return api_response.return_bad_request()

if not os.path.exists(filepath):
return api_response.return_not_found()

Expand Down Expand Up @@ -9930,6 +10138,10 @@ def put(self, api_response: ApiResponse = None):
filecontent = request_data["filecontent"]
filepath = os.path.join(user_files_path, filename)

if not is_safe_user_path(user_files_path, filepath):
api_response.set_message("Invalid path")
return api_response.return_bad_request()

if filename == "" or filecontent == "":
return api_response.return_bad_request()

Expand Down Expand Up @@ -11711,6 +11923,7 @@ def get(self, api_response: ApiResponse = None):
api.add_resource(UserSignin, UserSignin.route)
api.add_resource(UserSshKey, UserSshKey.route)
api.add_resource(UserFiles, UserFiles.route)
api.add_resource(UserFileFolder, UserFileFolder.route)
api.add_resource(UserFileContent, UserFileContent.route)
api.add_resource(Alert, Alert.route)
api.add_resource(Testing, Testing.route)
Expand Down
7 changes: 7 additions & 0 deletions api/api_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,13 @@ def get_custom_actions(user: UserModel) -> list:
return actions


def is_safe_user_path(user_root, requested_path):
"""Return True only if *requested_path* resolves under *user_root*."""
abs_root = os.path.realpath(user_root)
abs_target = os.path.realpath(requested_path)
return abs_target == abs_root or abs_target.startswith(abs_root + os.sep)


def is_safe_local_user_file_path(path: str) -> bool:
from api import USER_FILES_BASE_DIR
return path.startswith(os.path.abspath(USER_FILES_BASE_DIR) + os.sep)
Expand Down
Loading
Loading