diff --git a/.github/workflows/testing-work.yml b/.github/workflows/testing-work.yml
index 078d5d9..1da3b81 100644
--- a/.github/workflows/testing-work.yml
+++ b/.github/workflows/testing-work.yml
@@ -34,6 +34,7 @@ jobs:
         pip install pytest-cov
         pip install -e .
         if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
+        if [ -f test_requirements.txt ]; then pip install -r test_requirements.txt; fi
 
     - name: Test with pytest
       run: |
diff --git a/mdf_connect_client/mdfcc.py b/mdf_connect_client/mdfcc.py
index 28c0aea..f59db8d 100644
--- a/mdf_connect_client/mdfcc.py
+++ b/mdf_connect_client/mdfcc.py
@@ -1,10 +1,15 @@
 from datetime import datetime
+from typing import Any, Tuple, Dict, List
+from uuid import uuid4
 import json
+import os
+import posixpath
 
 import globus_sdk
 import mdf_toolbox
 from nameparser import HumanName
 import requests
+import urllib.parse
 
 from .version import __version__
 
@@ -23,6 +28,11 @@
     "reject": ("This submission has been rejected because it does not meet the "
                "appropriate standards")
 }
+FILE_UPLOAD_SERVICES = ["transfer", "openid",
+    "https://auth.globus.org/scopes/facd7ccc-c5f4-42aa-916b-a0e270e2c2a9/all",  # funcx
+    "https://auth.globus.org/scopes/f10a69a9-338c-4e5b-baa1-0dc92359ab47/https",  # Eagle HTTPS
+    "https://auth.globus.org/scopes/82f1b5c6-6e9b-11e5-ba47-22000b92c6ec/https",  # NCSA HTTPS
+]
 
 
 class MDFConnectClient:
@@ -37,7 +47,7 @@ class MDFConnectClient:
         globus_sdk.NullAuthorizer
     ]
 
-    def __init__(self, test=False, service_instance=None, authorizer=None):
+    def __init__(self, test=False, service_instance=None, authorizer=None, confidential=False, client_secret=None):
         """Create an MDF Connect Client.
 
         Arguments:
@@ -53,6 +63,12 @@ def __init__(self, test=False, service_instance=None, authorizer=None):
             authorizer (globus_sdk.GlobusAuthorizer): The authorizer to use for authentication.
                     This value should not normally be changed from the default.
                     **Default:** ``None``, to run the standard authentication flow.
+            confidential (bool): When ``True``, log in to Globus services as a confidential client
+                    (a client with its own login information, i.e. NOT a human's account).
+                    **Default:** ``False``, to run the standard authentication flow.
+            client_secret (str): Client secret to use when performing a confidential login.
+                    Required when performing a confidential login.
+                    **Default:** ``None``, because it is unnecessary otherwise.
 
         Returns:
             *MDFConnectClient*: An initialized, authenticated MDF Connect Client.
@@ -82,9 +98,18 @@ def __init__(self, test=False, service_instance=None, authorizer=None):
         if any([isinstance(authorizer, allowed) for allowed in self.__allowed_authorizers]):
             self.__authorizer = authorizer
         else:
-            self.__authorizer = mdf_toolbox.login(services=self.__login_services,
-                                                  client_id=self.__client_id,
-                                                  app_name=self.__app_name).get(login_service)
+            perform_login = mdf_toolbox.login
+            login_kwargs = {"services": self.__login_services+FILE_UPLOAD_SERVICES,
+                            "client_id": self.__client_id,
+                            "app_name": self.__app_name}
+            if confidential:
+                if client_secret is None:
+                    raise ValueError("Unable to perform confidential login without client_secret")
+                perform_login = mdf_toolbox.confidential_login
+                login_kwargs["client_secret"] = client_secret
+                del login_kwargs["app_name"]
+            self.__auths = perform_login(**login_kwargs)
+            self.__authorizer = self.__auths.get(login_service)
         if not self.__authorizer:
             raise ValueError("Unable to authenticate")
 
@@ -94,6 +119,7 @@ def logout(self):
         """
         self.reset_submission()
         self.__authorizer = None
+        self.__auths = None  # assign unconditionally: the attribute is unset when an authorizer was supplied
         mdf_toolbox.logout(client_id=self.__client_id, app_name=self.__app_name)
         return "Logged out. You must create a new MDF Connect Client to log back in."
 
@@ -1464,3 +1490,211 @@ def reject_curation_submission(self, source_id, reason=None, prompt=True, raw=Fa
                 if raw is ``True``, *dict*: The full task results.
         """
         return self._complete_curation_task(source_id, "reject", reason, prompt, raw)
+
+    def upload_to_endpoint(self, local_data_path: str, endpoint_id: str = "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec",
+                           dest_parent: str = None, dest_child: str = None) -> Tuple[str, str]:
+        """Upload local data to a Globus endpoint using HTTPS PUT requests. Data can be a folder or an individual file.
+        Note that ACL rule creation is currently disabled here, so the returned rule_id is an empty string.
+        Args:
+            local_data_path (str): Path to the local dataset to publish to Foundry via HTTPS. Creates an HTTPS PUT
+                request to upload the data specified to a Globus endpoint (default is NCSA endpoint) before it is
+                transferred to MDF.
+            endpoint_id (str): Globus endpoint ID to upload the data to. Default is NCSA endpoint.
+            dest_parent (str): Parent directory on the endpoint that the upload folder is created in. Default is
+                None, in which case "/tmp" is used.
+            dest_child (str): Name of the folder (created under dest_parent) that receives the data. Default is
+                None, in which case a random UUID is used.
+
+        Returns
+        -------
+        (str) Globus data source URL: URL pointing to the data on the Globus endpoint
+        (str) rule_id: Globus ACL rule ID for the uploaded data. Currently always "" because no ACL rule is
+            created by this method.
+        """
+        # define upload destination
+        dest_path = self._create_dest_folder(endpoint_id, parent_dir=dest_parent, child_dir=dest_child)
+        # create new ACL rule (ie permission) for user to read/write to endpoint and path
+        rule_id = ""  # self._create_access_rule(endpoint_id, dest_path)
+        # upload data to endpoint
+        globus_data_source = self._https_upload(local_data_path=local_data_path, dest_path=dest_path,
+                                                endpoint_id=endpoint_id)
+        return globus_data_source, rule_id
+
+    def _create_dest_folder(self, endpoint_id: str, parent_dir: str = None, child_dir: str = None) -> str:
+        """Create a destination folder for the data on a Globus endpoint
+        Args:
+            endpoint_id (str): A UUID designating the exact Globus endpoint. Can be obtained via the Globus Web UI or
+                the SDK.
+            parent_dir (str): Set to "/tmp" when default is None. The parent directory that all publications via HTTPS
+                will be written to.
+            child_dir (str): Set to a random UUID when default is None. The child directory that the data will be
+                written to.
+        Returns
+        -------
+        (str): Path on Globus endpoint to write to
+        """
+        transfer_client = self.__auths["transfer"]
+        # use a random UUID for each dataset publication, unless specified otherwise
+        if child_dir is None:
+            child_dir = uuid4()  # the publication ID forms the name of the child directory
+        if parent_dir is None:
+            parent_dir = "/tmp"
+        dest_path = posixpath.join(parent_dir, str(child_dir))  # endpoint paths use "/" regardless of the local OS
+
+        try:
+            transfer_client.operation_mkdir(endpoint_id=endpoint_id, path=dest_path)
+        except globus_sdk.TransferAPIError as e:
+            raise IOError(f"Error from Globus API while creating destination folder: {e.message}") from e
+        return dest_path
+
+    def _create_access_rule(self, endpoint_id: str, dest_path: str) -> str:
+        """Create an ACL rule (ie permission) for the user to read/write to the given destination on a Globus endpoint
+        Args:
+            endpoint_id (str): A UUID designating the exact Globus endpoint. Can be obtained via the Globus Web UI or
+                the SDK.
+            dest_path (str): The path to the existing folder on the given Globus endpoint.
+        Returns
+        -------
+        (str): The ID for the ACL rule (necessary to delete it in the future)
+        """
+        transfer_client = self.__auths["transfer"]
+        auth_client = globus_sdk.AuthClient(authorizer=self.__auths["openid"])
+        # get user info
+        res = auth_client.oauth2_userinfo()
+        user_id = res.data["sub"]  # get the user primary ID (based on primary email set in Globus)
+        # create data blob needed to set new rule with Globus
+        rule_data = {
+            "DATA_TYPE": "access",
+            "principal_type": "identity",
+            "principal": user_id,
+            "path": dest_path,
+            "permissions": "rw",
+        }
+        # create new ACL rule (eg permission) for user to read/write to endpoint and path
+        rule_id = None
+        try:
+            ret = transfer_client.add_endpoint_acl_rule(endpoint_id, rule_data)
+            rule_id = ret["access_id"]  # rule_id is needed to delete the rule later
+        except globus_sdk.TransferAPIError:
+            pass  # NOTE: known issue where user can still write to endpoint if this fails
+        return rule_id
+
+    def _https_upload(self, local_data_path: str, dest_path: str = "/tmp",
+                      endpoint_id: str = "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec") -> str:
+        """Upload a dataset via HTTPS to a Globus endpoint
+        Args:
+            local_data_path (str): The path to the local data to upload. Can be relative or absolute.
+            dest_path (str): The path to the destination folder on the Globus endpoint. Default is "/tmp".
+            endpoint_id (str): A UUID designating the exact Globus endpoint. Can be obtained via the Globus Web UI or
+                the SDK. Default is the NCSA UUID "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec".
+        Returns
+        -------
+        (str): Globus data source URL (ie the URL that points to the data on a Globus endpoint)
+        """
+        transfer_client = self.__auths["transfer"]
+        # get URL for Globus endpoint location
+        endpoint = transfer_client.get_endpoint(endpoint_id)  # gets info for NCSA endpoint
+        https_base_url = endpoint["https_server"]
+
+        # Submit data (folders of files or an independent file) to be written to endpoint
+        if os.path.isdir(local_data_path):
+            self._upload_folder(local_data_path, https_base_url, dest_path, endpoint_id)
+        elif os.path.isfile(local_data_path):
+            self._upload_file(local_data_path, https_base_url, dest_path, endpoint_id)
+        else:
+            raise IOError(f"Data path '{local_data_path}' is of unknown type")
+
+        # return the data source URL for publication to MDF
+        return self._make_globus_link(endpoint_id, dest_path)
+
+    def _upload_folder(self, local_data_path: str, https_base_url: str, parent_dest_path: str, endpoint_id: str) \
+            -> List[Dict[str, Any]]:
+        """Upload a folder to a Globus endpoint using HTTPS
+        Args:
+            local_data_path (str): The path to the local data to upload. Can be relative or absolute.
+            https_base_url (str): The URL for a given Globus endpoint.
+            parent_dest_path (str): The path to the parent folder to be written to on the given endpoint. The contents
+                of "local_data_path" will be written here, including subdirectories.
+            endpoint_id (str): The UUID designating the exact Globus endpoint. Can be obtained via the Globus Web UI or
+                the SDK. This must be the same endpoint pointed to by the https_base_url.
+        Returns
+        -------
+        (list): A list of all the HTTPS PUT request results (dicts) from the uploads
+        """
+        transfer_client = self.__auths["transfer"]
+        results = []
+        # initialize destination path as the parent destination path
+        dest_path = parent_dest_path
+
+        # walk through each child directory in the designated local data folder
+        for root, _, files in os.walk(local_data_path):
+            # update destination path if we have walked into a child directory
+            if root != local_data_path:
+                # get the child directory relative path, normalized to "/" separators for the endpoint
+                subpath = os.path.relpath(root, local_data_path).replace(os.sep, "/")
+                # update destination path to include child directories (ie subpaths)
+                dest_path = posixpath.join(parent_dest_path, subpath)
+                # create child directories on endpoint
+                try:
+                    transfer_client.operation_mkdir(endpoint_id=endpoint_id, path=dest_path)
+                except globus_sdk.TransferAPIError as e:
+                    raise IOError(f"Error while creating child directory {dest_path}: {e.message}") from e
+            # get local path to file to upload
+            for filename in files:
+                filepath = os.path.join(root, filename)
+                # upload file to destination path on endpoint
+                result = self._upload_file(filepath, https_base_url, dest_path, endpoint_id)
+                results.append(result)
+        return results
+
+    def _upload_file(self, filepath: str, https_base_url: str, dest_path: str, endpoint_id: str) -> requests.Response:
+        """Upload an individual file to a Globus endpoint using HTTPS PUT
+        Args:
+            filepath (str): The path to the local file to upload.
+            https_base_url (str): The URL for a given Globus endpoint.
+            dest_path (str): The path to the folder to be written to on the given endpoint.
+            endpoint_id (str): The UUID designating the exact Globus endpoint. Can be obtained via the Globus Web UI or
+                the SDK. This must be the same endpoint pointed to by the https_base_url.
+        Returns
+        -------
+        (Response): The `requests` HTTPS response object from a PUT request
+        """
+        # lets you HTTPS to specific endpoint (NCSA endpoint by default)
+        scope = f"https://auth.globus.org/scopes/{endpoint_id}/https"
+        # Get the authorization header token (string for the headers dict) HTTPS upload
+        auth_gcs = globus_sdk.AuthClient(authorizer=self.__auths[scope])
+        header = auth_gcs.authorizer.get_authorization_header()
+
+        # get Globus endpoint path to write to
+        filename = os.path.split(filepath)[1]
+        # need to strip out leading "/" in dest_path for join to work; URL paths are always "/"-separated
+        remote_path = posixpath.join(dest_path.lstrip("/"), filename)
+        # percent-encode the path so characters such as "#" or "?" are not parsed as URL delimiters
+        endpoint_dest = posixpath.join(https_base_url, urllib.parse.quote(remote_path))
+
+        # upload via HTTPS as arbitrary binary content type
+        with open(filepath, "rb") as f:
+            reply = requests.put(
+                endpoint_dest,
+                data=f,
+                headers={"Authorization": header, "Content-Type": "application/octet-stream"}
+            )
+        if reply.status_code != 200:
+            raise IOError(f"Error on HTTPS PUT, got response {reply.status_code}: {reply.text}")
+        # Return the response
+        return reply
+
+    def _make_globus_link(self, endpoint_id: str, path: str) -> str:
+        """Create the Globus data source URL for a given datapath on an endpoint
+        Args:
+            endpoint_id (str): The UUID designating the exact Globus endpoint. Can be obtained via the Globus Web UI or
+                the SDK.
+            path (str): The path to the dataset folder on the given endpoint.
+        Returns
+        -------
+        (str): The Globus data source URL (ie the URL that points to the data on a Globus endpoint)
+        """
+        # make sure the path has the "/"s encoded properly for a URL
+        safe_path = urllib.parse.quote(path, safe="*")
+        link = f"https://app.globus.org/file-manager?origin_id={endpoint_id}&origin_path={safe_path}"
+        return link
diff --git a/test_requirements.txt b/test_requirements.txt
index 537c6e1..5122f5b 100644
--- a/test_requirements.txt
+++ b/test_requirements.txt
@@ -2,4 +2,7 @@ coveralls>=1.10.0
 nbval>=0.9.4
 pytest>=5.3.1
 pytest-cov>=2.5.1
-mdf_toolbox
\ No newline at end of file
+mdf_toolbox
+numpy
+pandas
+requests
\ No newline at end of file
diff --git a/tests/test_connect_client.py b/tests/test_connect_client.py
index cd05eb4..17d6b6c 100644
--- a/tests/test_connect_client.py
+++ b/tests/test_connect_client.py
@@ -1,5 +1,11 @@
 from datetime import datetime
+from filecmp import cmp
+from math import floor
+import json
 import os
+import numpy as np
+import pandas as pd
+import requests
 
 from mdf_toolbox import insensitive_comparison
 import mdf_toolbox
@@ -13,9 +19,9 @@ client_secret = os.getenv('CLIENT_SECRET')
 
 
 auths = mdf_toolbox.confidential_login(client_id=client_id,
-                                        client_secret=client_secret,
-                                        services=["mdf_connect", "mdf_connect_dev"],
-                                        make_clients=True)
+                                       client_secret=client_secret,
+                                       services=["mdf_connect", "mdf_connect_dev"],
+                                       make_clients=True)
 
 print(auths)
 
@@ -469,6 +475,56 @@ def test_submission():
                                       "update": False})
 
 
+def test_https_upload():
+    """Unit test: Test the upload_to_endpoint() HTTPS functionality on its own, without publishing to MDF
+    """
+    endpoint_id = "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec"  # NCSA endpoint
+    dest_parent = "/tmp"
+    dest_child = f"test_{floor(datetime.now().timestamp())}"
+    local_path = "./data/https_test"
+    filename = "test_data.json"
+
+    mdf = MDFConnectClient(confidential=True, client_secret=client_secret)
+    # create test JSON to upload (rewritten with fresh random data on every run)
+    _write_test_data(local_path, filename)
+    # upload via HTTPS to NCSA endpoint
+    globus_data_source, _ = mdf.upload_to_endpoint(local_path, endpoint_id, dest_parent=dest_parent,
+                                                   dest_child=dest_child)
+
+    expected_data_source = f"https://app.globus.org/file-manager?origin_id=" \
+                           f"82f1b5c6-6e9b-11e5-ba47-22000b92c6ec&origin_path=%2Ftmp%2F{dest_child}"
+    # confirm data source link was created properly, with correct folders
+    assert globus_data_source == expected_data_source
+
+    mdf_url = f"https://data.materialsdatafacility.org/tmp/{dest_child}/{filename}"
+    response = requests.get(mdf_url)
+    # check that we get a valid response back (note that a 200 could be a UI error, returned as HTML)
+    assert response.status_code == 200
+    # check that contents of response are as expected (shallow=False forces a byte-for-byte comparison)
+    tmp_file = "./data/tmp_data.json"
+    with open(tmp_file, "wb") as fl:
+        fl.write(response.content)
+    assert cmp(tmp_file, os.path.join(local_path, filename), shallow=False)
+
+    # delete ACL rule for user
+    # if rule_id is not None:
+    #     res = f.transfer_client.delete_endpoint_acl_rule(endpoint_id, rule_id)
+
+
+def _write_test_data(dest_path="./data/https_test", filename="test_data.json"):
+    # Create random JSON data
+    data = pd.DataFrame(np.random.rand(100, 4), columns=list('ABCD'))
+    res = data.to_json(orient="records")
+
+    # Make data directory
+    os.makedirs(dest_path, exist_ok=True)
+    data_filepath = os.path.join(dest_path, filename)
+
+    # Write data to JSON file (res is already a JSON string, so parse it first to avoid double-encoding)
+    with open(data_filepath, "w+") as f:
+        json.dump(json.loads(res), f, indent=4)
+
+
 # def test_submit_dataset():
 #     # TODO
 #     pass