Skip to content
On this page

Globus storage

Storage integration for Globus, built on the Globus Transfer service.

Usage example

python
from machinable import get

get("globus", {"client_id": ..., ...}).__enter__()

# your code

Source

py
import os
import posixpath

from globus_sdk import (
    NativeAppAuthClient,
    RefreshTokenAuthorizer,
    TransferClient,
    TransferData,
)
from globus_sdk.scopes import TransferScopes
from globus_sdk.services.transfer.errors import TransferAPIError
from globus_sdk.tokenstorage import SimpleJSONFileAdapter
from machinable import Storage
from pydantic import BaseModel, Field


class Globus(Storage):
    """Storage that looks up interface directories on a remote Globus endpoint.

    Authentication uses the Globus native-app OAuth2 flow. Tokens are cached
    in a JSON file (``Config.auth_filepath``) so the interactive login only
    happens when no usable transfer tokens are stored yet. All SDK clients
    are created lazily on first access.

    Only ``contains`` talks to Globus at the moment; ``commit`` and ``update``
    are no-ops and ``retrieve`` is an unimplemented stub that returns
    ``False``.
    """

    class Config(BaseModel):
        # Client ID handed to ``NativeAppAuthClient`` for the login flow.
        client_id: str
        # Local endpoint and directory. Not read by any implemented method
        # yet; only the transfer sketch in ``retrieve`` refers to them.
        local_endpoint_id: str
        local_endpoint_directory: str
        # Remote endpoint that is queried, and the directory on it that is
        # joined with a uuid to form the path being listed.
        remote_endpoint_id: str
        remote_endpoint_directory: str
        # Token cache file; ``~`` is expanded before the file is used.
        auth_filepath: str = "~/.globus-tokens.json"

    def __init__(self, version=None):
        super().__init__(version=version)
        # Lazily-created SDK objects; see the properties of the same name.
        self._auth_client = None
        self._auth_file = None
        self._authorizer = None
        self._transfer_client = None

    @property
    def auth_client(self):
        """Return the cached ``NativeAppAuthClient`` for ``config.client_id``."""
        if self._auth_client is None:
            self._auth_client = NativeAppAuthClient(self.config.client_id)
        return self._auth_client

    @property
    def auth_file(self):
        """Return the cached JSON token store at ``config.auth_filepath``."""
        if self._auth_file is None:
            self._auth_file = SimpleJSONFileAdapter(
                os.path.expanduser(self.config.auth_filepath)
            )
        return self._auth_file

    def _login(self):
        """Run the interactive login flow and return the transfer token data.

        Prints the authorization URL, reads the resulting code from stdin,
        exchanges it for tokens and stores the full token response in
        ``auth_file`` before returning the ``transfer.api.globus.org`` entry.
        """
        self.auth_client.oauth2_start_flow(
            requested_scopes=f"{TransferScopes.all}[*https://auth.globus.org/scopes/{self.config.remote_endpoint_id}/data_access]",
            refresh_tokens=True,
        )
        authorize_url = self.auth_client.oauth2_get_authorize_url()
        print(f"Please go to this URL and login:\n\n{authorize_url}\n")
        auth_code = input("Please enter the code here: ").strip()
        token_response = self.auth_client.oauth2_exchange_code_for_tokens(
            auth_code
        )
        self.auth_file.store(token_response)
        return token_response.by_resource_server["transfer.api.globus.org"]

    @property
    def authorizer(self):
        """Return the cached ``RefreshTokenAuthorizer`` for the transfer API.

        Stored tokens are reused when the token file exists and contains an
        entry for ``transfer.api.globus.org``; otherwise the interactive
        login flow is started. Refreshed tokens are written back to the
        token file through ``auth_file.on_refresh``.
        """
        if self._authorizer is None:
            tokens = None
            if self.auth_file.file_exists():
                # we already did login; load the tokens
                tokens = self.auth_file.get_token_data(
                    "transfer.api.globus.org"
                )
            if tokens is None:
                # No token file, or a file without transfer tokens: indexing
                # a missing entry would fail with an opaque TypeError, so do
                # a login flow to obtain initial tokens instead.
                tokens = self._login()

            self._authorizer = RefreshTokenAuthorizer(
                tokens["refresh_token"],
                self.auth_client,
                access_token=tokens["access_token"],
                expires_at=tokens["expires_at_seconds"],
                on_refresh=self.auth_file.on_refresh,
            )
        return self._authorizer

    @property
    def transfer_client(self):
        """Return the cached ``TransferClient`` that uses ``authorizer``."""
        if self._transfer_client is None:
            self._transfer_client = TransferClient(authorizer=self.authorizer)
        return self._transfer_client

    def commit(self, interface: "Interface") -> None:
        """Do nothing; committing to Globus is not implemented."""
        ...

    def update(self, interface: "Interface") -> None:
        """Do nothing; updating on Globus is not implemented."""
        ...

    def contains(self, uuid: str) -> bool:
        """Check whether the remote directory for ``uuid`` holds ``.machinable``.

        Lists ``<remote_endpoint_directory>/<uuid>`` on the remote endpoint.

        Returns:
            ``True`` if the listing has an entry named ``.machinable``;
            ``False`` if it has none or the directory does not exist.

        Raises:
            RuntimeError: If Globus reports ``ConsentRequired``.
            TransferAPIError: For any other transfer API error.
        """
        # Globus paths are POSIX-style regardless of the local platform, so
        # the OS-specific separator of os.path.join must not be used here.
        remote_path = posixpath.join(
            self.config.remote_endpoint_directory, uuid
        )
        try:
            response = self.transfer_client.operation_ls(
                self.config.remote_endpoint_id,
                path=remote_path,
            )
        except TransferAPIError as e:
            if e.code == "ClientError.NotFound":
                return False
            if e.code == "ConsentRequired":
                raise RuntimeError(
                    f"You do not have the right permissions. Try removing {self.config.auth_filepath} and authenticating again with the appropriate identity provider."
                ) from e
            raise

        return any(item["name"] == ".machinable" for item in response)

    def retrieve(self, uuid: str, local_directory: str) -> bool:
        """Return ``False``; retrieving from Globus is not implemented yet.

        ``uuid`` and ``local_directory`` are currently unused.
        """
        # TODO: submit a transfer from the remote to the local endpoint.
        # Sketch of the intended implementation:
        #
        # task_data = TransferData(
        #     source_endpoint=self.config.remote_endpoint_id,
        #     destination_endpoint=self.config.local_endpoint_id,
        # )
        # task_data.add_item(
        #     posixpath.join(self.config.remote_endpoint_directory, uuid),
        #     local_directory,
        # )
        #
        # task_doc = self.transfer_client.submit_transfer(task_data)
        # task_id = task_doc["task_id"]
        # print(f"submitted transfer, task_id={task_id}")
        return False

MIT Licensed