Source code for mdsisclienttools.datastore.ReadWriteHelper

import requests
from typing import Dict, Any, Optional, List
import cloudpathlib.s3 as s3  # type: ignore
from mdsisclienttools.auth.TokenManager import BearerAuth


DEFAULT_DATA_STORE_ENDPOINT = "https://data-api.mds.gbrrestoration.org"


[docs]def _fetch_all_datasets(auth: BearerAuth, endpoint: str = DEFAULT_DATA_STORE_ENDPOINT) -> List[Dict[str, Any]]: """ fetch_datasets Given basic bearer auth will call the fetch datasets API endpoint to find an example dataset. Arguments ---------- auth : BearerAuth The bearer auth object Returns ------- : List[Dict[str,Any]] The list of registry items See Also (optional) -------- Examples (optional) -------- """ _endpoint = endpoint + "/registry/items/list-all-datasets" response = requests.get(_endpoint, auth=auth) # Check successful assert response.status_code == 200 response = response.json() assert response['status']['success'] # Return the items return response['registry_items']
[docs]def _fetch_dataset(handle_id: str, auth: BearerAuth, endpoint: str = DEFAULT_DATA_STORE_ENDPOINT) -> Optional[Dict[str, Any]]: """ _fetch_dataset Given the handle ID to lookup and the basic auth will call the fetch dataset data store API to get more information about the dataset. Arguments ---------- handle_id : str The handle ID to lookup auth : BearerAuth The bearer token auth Returns ------- : Optional[Dict[str, Any]] A dictionary for the item if found or None if not Raises ------ Exception If a non status code 200 response. See Also (optional) -------- Examples (optional) -------- """ fetch_endpoint = endpoint + "/registry/items/fetch-dataset" params = { 'handle_id': handle_id } response = requests.get(fetch_endpoint, params=params, auth=auth) # Check successful try: assert response.status_code == 200 assert response.json()['status']['success'] # Return the items return response.json()['item'] except Exception as e: if (response.status_code == 200): return None else: raise e
[docs]def _read_dataset(dataset_id: str, auth: BearerAuth, endpoint: str = DEFAULT_DATA_STORE_ENDPOINT) -> Dict[str, Any]: """ read_dataset Gets AWS read credentials using the data store API. Arguments ---------- dataset_id : Dict[str, Any] The datasets handle ID auth : BearerAuth The bearer token auth Returns ------- : Dict[str,Any] AWS credentials See Also (optional) -------- Examples (optional) -------- """ read_cred_endpoint = endpoint + \ "/registry/credentials/generate-read-access-credentials" response = requests.post(read_cred_endpoint, json={ "dataset_id": dataset_id, "console_session_required": False }, auth=auth) # Check successful assert response.status_code == 200 response = response.json() assert response['status']['success'] # Give back AWS creds return response['credentials']
[docs]def _write_dataset(dataset_id: str, auth: BearerAuth, endpoint: str = DEFAULT_DATA_STORE_ENDPOINT) -> Dict[str, Any]: """ _write_dataset Gets AWS write credentials using the data store API. Arguments ---------- dataset_id : Dict[str, Any] The datasets handle ID auth : BearerAuth The bearer token auth See Also (optional) -------- Examples (optional) -------- """ write_credential_endpoint = endpoint + \ "/registry/credentials/generate-write-access-credentials" response = requests.post(write_credential_endpoint, json={ "dataset_id": dataset_id, "console_session_required": False }, auth=auth) # Check successful assert response.status_code == 200, f"Expected response 200OK but got {response.status_code}" response = response.json() assert response['status']['success'] # Give back AWS creds return response['credentials']
[docs]def _download_files(s3_loc: Dict[str, str], s3_creds: Dict[str, Any], destination_dir: str) -> None: """ download_files Uses the cloudpathlib library to download all the files and sub dirs/files from the specified s3 location (s3_uri in s3_loc) into the specified destination directory. Arguments ---------- s3_loc : Dict[str, str] S3 location object s3_creds : Dict[str, Any] S3 creds which match input format (i.e. expiry attribute dropped) destination_dir : str The relative destination directory - will be created if required See Also (optional) -------- Examples (optional) -------- """ # create client client = s3.S3Client( **s3_creds ) # create path path = s3.S3Path(cloud_path=s3_loc['s3_uri'], client=client) # download path.download_to(destination_dir) #Release file handles path.__del__
[docs]def _upload_files(s3_loc: Dict[str, str], s3_creds: Dict[str, Any], source_dir: str) -> None: # create client client = s3.S3Client(**s3_creds) # create path path = s3.S3Path(cloud_path=s3_loc['s3_uri'], client=client) # download path.upload_from(source_dir) #Release file handles path.__del__
[docs]def upload(handle: str, auth: BearerAuth, source_dir: str, data_store_api_endpoint: str = DEFAULT_DATA_STORE_ENDPOINT) -> None: """Given a source path, handle and authorisation information, will retrieve read only credentials from the data store API, fetch the dataset information, then upload all the dataset files to the handle id specified location. Parameters ---------- handle : str The handle ID of the dataset to download. auth : BearerAuth The bearer auth object. See TokenManager library. source_dir : str The path of to folder of files to upload. Raises ------ ValueError Raises a value error if the handle id is invalid. """ # Get info about handle response = _fetch_dataset( handle_id=handle, auth=auth, endpoint=data_store_api_endpoint) # Handle was found if response: s3_loc = response['s3'] print( f"Found dataset: {response['collection_format']['dataset_info']['name']}.") else: # Handle was not found raise ValueError( f'Invalid input... the dataset with that handle: {handle} could not be found.') # get write credentials for this dataset creds = _write_dataset(handle, auth=auth, endpoint=data_store_api_endpoint) print() print(f'Attempting to upload files to {source_dir}') # Don't need expiry to use del creds['expiry'] _upload_files(s3_loc=s3_loc, s3_creds=creds, source_dir=source_dir) print(f"Upload complete.")
[docs]def download(download_path: str, handle: str, auth: BearerAuth, data_store_api_endpoint: str = DEFAULT_DATA_STORE_ENDPOINT) -> None: """Given a download path, handle and authorisation information, will retrieve read only credentials from the data store API, fetch the dataset information, then download all the dataset files to the specified location. Parameters ---------- download_path : str The path of to download the files to. If the folder/path does not exist, this will create the folder. If there are nested folders, all folders/files will be downloaded and paths created. handle : str The handle ID of the dataset to download. auth : BearerAuth The bearer auth object. See TokenManager library. Raises ------ ValueError Raises a value error if the dataset cannot be found. """ # Get info about handle response = _fetch_dataset( handle_id=handle, auth=auth, endpoint=data_store_api_endpoint) # Handle was found if response: s3_loc = response['s3'] print( f"Found dataset: {response['collection_format']['dataset_info']['name']}.") else: # Handle was not found raise ValueError( f'Invalid input... the dataset with that handle: {handle} could not be found.') # get read credentials for this dataset creds = _read_dataset(handle, auth=auth, endpoint=data_store_api_endpoint) print() print(f'Attempting to download files to {download_path}') # Don't need expiry to use del creds['expiry'] _download_files(s3_loc=s3_loc, s3_creds=creds, destination_dir=download_path) print(f"Download complete.")