Skip to content
On this page

Globus storage

Storage integration for Globus Transfer: interface directories are uploaded to and downloaded from a remote Globus endpoint.

Usage example

python
from machinable import get

storage = get("globus", {"client_id": ..., ...}).__enter__()

# run some experiment
component = ...

# upload to globus
storage.upload(component) 

# search for component in globus using hash 
matches = storage.search_for(experiment)
print(matches)

# download from globus
storage.download(matches[0].uuid)

Source

py
import os
import posixpath
import time
from typing import TYPE_CHECKING, List, Union

from globus_sdk import (
    NativeAppAuthClient,
    RefreshTokenAuthorizer,
    TransferClient,
    TransferData,
)
from globus_sdk.scopes import TransferScopes
from globus_sdk.services.auth.errors import AuthAPIError
from globus_sdk.services.transfer.errors import TransferAPIError
from globus_sdk.tokenstorage import SimpleJSONFileAdapter
from machinable import Storage
from pydantic import BaseModel, Field

if TYPE_CHECKING:
    from machinable import Interface


class Globus(Storage):
    """Storage that mirrors interface directories to a remote Globus endpoint.

    Directories are copied with Globus Transfer between ``local_endpoint_id``
    and ``remote_endpoint_id``; an interface with UUID ``u`` is stored
    remotely at ``<remote_endpoint_directory>/u``.
    """

    class Config(BaseModel):
        # "???" placeholders presumably mark values the user must provide
        # (machinable convention) -- confirm against machinable docs
        client_id: str = Field("???")
        remote_endpoint_id: str = Field("???")
        local_endpoint_id: str = Field("???")
        remote_endpoint_directory: str = Field("???")
        local_endpoint_directory: str = "~/"
        # token cache; refreshed tokens are written back here (see authorizer)
        auth_filepath: str = "~/.globus-tokens.json"

    def __init__(self, version=None):
        super().__init__(version=version)
        # Globus clients are created lazily by the properties below
        self._auth_client = None
        self._auth_file = None
        self._authorizer = None
        self._transfer_client = None
        # IDs of submitted transfer tasks that have not been waited on yet
        self.active_tasks = []

    @property
    def auth_client(self):
        """Native-app auth client for ``config.client_id`` (created on first access)."""
        if self._auth_client is None:
            self._auth_client = NativeAppAuthClient(self.config.client_id)
        return self._auth_client

    @property
    def auth_file(self):
        """JSON token store located at ``config.auth_filepath`` (``~`` expanded)."""
        if self._auth_file is None:
            self._auth_file = SimpleJSONFileAdapter(
                os.path.expanduser(self.config.auth_filepath)
            )
        return self._auth_file

    @property
    def authorizer(self):
        """Refreshing authorizer for the Transfer API (created on first access).

        If no token file exists yet, an interactive login is performed and the
        resulting tokens are stored. Otherwise the stored Transfer tokens are
        reused. Refreshed tokens are written back to the token file.

        Raises:
            RuntimeError: if the token file exists but contains no Transfer
                tokens.
        """
        if self._authorizer is None:
            if self.auth_file.file_exists():
                # we already logged in before; load the stored tokens
                tokens = self.auth_file.get_token_data("transfer.api.globus.org")
                if tokens is None:
                    raise RuntimeError(
                        f"No Globus Transfer tokens found in {self.config.auth_filepath}. Try removing it and authenticating again."
                    )
            else:
                tokens = self._login()

            self._authorizer = RefreshTokenAuthorizer(
                tokens["refresh_token"],
                self.auth_client,
                access_token=tokens["access_token"],
                expires_at=tokens["expires_at_seconds"],
                on_refresh=self.auth_file.on_refresh,
            )
        return self._authorizer

    def _login(self) -> dict:
        """Run the interactive OAuth2 login, store all tokens and return the Transfer tokens."""
        # request the Transfer scope with data_access dependencies for both
        # the remote and the local endpoint
        self.auth_client.oauth2_start_flow(
            requested_scopes=[
                f"{TransferScopes.all}[*https://auth.globus.org/scopes/{self.config.remote_endpoint_id}/data_access]",
                f"{TransferScopes.all}[*https://auth.globus.org/scopes/{self.config.local_endpoint_id}/data_access]",
            ],
            refresh_tokens=True,
        )
        authorize_url = self.auth_client.oauth2_get_authorize_url()
        print(
            f"Please go to this URL and login:\n\n{authorize_url}\n",
            flush=True,
        )
        time.sleep(0.05)
        auth_code = input("Please enter the code here: ").strip()
        tokens = self.auth_client.oauth2_exchange_code_for_tokens(auth_code)
        self.auth_file.store(tokens)
        return tokens.by_resource_server["transfer.api.globus.org"]

    @property
    def transfer_client(self):
        """Transfer API client using ``authorizer`` (created on first access)."""
        if self._transfer_client is None:
            self._transfer_client = TransferClient(authorizer=self.authorizer)
        return self._transfer_client

    def _permission_error(self) -> RuntimeError:
        """Build the error raised when Globus reports ``ConsentRequired``."""
        return RuntimeError(
            f"You do not have the right permissions. Try removing {self.config.auth_filepath} and authenticating again with the appropriate identity provider."
        )

    def _authentication_error(self) -> RuntimeError:
        """Build the error raised when Globus Auth rejects the stored credentials."""
        return RuntimeError(
            f"Authentication failed. Try removing {self.config.auth_filepath} and authenticating again with the appropriate identity provider."
        )

    def _submit_transfer(
        self,
        source_endpoint: str,
        destination_endpoint: str,
        source_path: str,
        destination_path: str,
    ) -> str:
        """Submit a recursive directory transfer, track it in ``active_tasks`` and return its task ID."""
        task_data = TransferData(
            source_endpoint=source_endpoint,
            destination_endpoint=destination_endpoint,
            notify_on_succeeded=False,
            notify_on_failed=False,
        )
        # both commit and retrieve move whole interface directories
        task_data.add_item(source_path, destination_path, recursive=True)
        task_id = self.transfer_client.submit_transfer(task_data)["task_id"]
        self.active_tasks.append(task_id)
        return task_id

    def commit(self, interface: "Interface") -> Union[str, bool]:
        """Upload the interface's local directory to the remote endpoint.

        The transfer is only submitted, not awaited; use ``tasks_wait`` to
        block until it finishes.

        Returns:
            The Globus task ID, or False if an identical transfer is still
            in progress.

        Raises:
            RuntimeError: if the interface directory does not exist locally,
                or on missing consent / failed authentication.
        """
        src = os.path.abspath(interface.local_directory())
        if not os.path.exists(src):
            raise RuntimeError("Interface must be committed before storage")

        # The directory is deliberately not required to live under
        # local_endpoint_directory, since the endpoint may grant access to
        # other paths as well.

        try:
            task_id = self._submit_transfer(
                source_endpoint=self.config.local_endpoint_id,
                destination_endpoint=self.config.remote_endpoint_id,
                source_path=src,
                destination_path=self.remote_path(interface.uuid),
            )
        except TransferAPIError as e:
            if (
                e.code == "Conflict"
                and e.message
                == "A transfer with identical paths has not yet completed"
            ):
                return False
            if e.code == "ConsentRequired":
                raise self._permission_error() from e
            raise
        except AuthAPIError as e:
            raise self._authentication_error() from e

        print(f"Submitted Globus commit, task_id={task_id}")

        # this operation is non-blocking by default
        return task_id

    def update(self, interface: "Interface") -> Union[str, bool]:
        """Re-upload the interface directory; identical to ``commit``."""
        return self.commit(interface)

    def contains(self, uuid: str) -> bool:
        """Return True if the remote directory for ``uuid`` contains a ``.machinable`` entry."""
        try:
            listing = self.transfer_client.operation_ls(
                self.config.remote_endpoint_id,
                path=self.remote_path(uuid),
                show_hidden=True,
            )
        except TransferAPIError as e:
            if e.code == "ClientError.NotFound":
                return False
            if e.code == "ConsentRequired":
                raise self._permission_error() from e
            raise
        except AuthAPIError as e:
            raise self._authentication_error() from e

        return any(item["name"] == ".machinable" for item in listing)

    def retrieve(
        self, uuid: str, local_directory: str, timeout: int = 5 * 60
    ) -> bool:
        """Download the remote directory for ``uuid`` into ``local_directory``.

        Blocks (via ``tasks_wait``) until all active tasks, including this
        one, have finished or ``timeout`` seconds per task have elapsed.

        Returns:
            True if the download task succeeded; False if ``uuid`` is not
            stored remotely or the task did not succeed within the timeout.
        """
        if not self.contains(uuid):
            return False

        try:
            task_id = self._submit_transfer(
                source_endpoint=self.config.remote_endpoint_id,
                destination_endpoint=self.config.local_endpoint_id,
                source_path=self.remote_path(uuid),
                destination_path=local_directory,
            )
        except TransferAPIError as e:
            if e.code == "ConsentRequired":
                raise self._permission_error() from e
            raise
        except AuthAPIError as e:
            raise self._authentication_error() from e

        print(f"[Storage] Submitted Globus retrieve, task_id={task_id}")

        self.tasks_wait(timeout=timeout)

        # task_wait only reports that a task stopped being ACTIVE; check that
        # the download actually succeeded (it may have FAILED or timed out)
        status = self.transfer_client.get_task(task_id)["status"]
        if status != "SUCCEEDED":
            print(f"[Storage] Globus retrieve task_id={task_id} ended with status {status}")
            return False
        return True

    def tasks_wait(self, timeout: int = 5 * 60) -> bool:
        """Wait for every task in ``active_tasks``, up to ``timeout`` seconds each.

        Tasks that have not finished within the timeout stay in
        ``active_tasks`` so a later call can wait for them again.

        Returns:
            True if all tasks finished, False if at least one timed out.
        """
        still_running = []
        for task_id in self.active_tasks:
            print(f"[Storage] Waiting for Globus task {task_id} to complete")
            if self.transfer_client.task_wait(task_id, timeout=timeout):
                print(f"[Storage] task_id={task_id} transfer finished")
            else:
                print(f"[Storage] task_id={task_id} did not finish within {timeout}s")
                still_running.append(task_id)
        self.active_tasks = still_running
        return not still_running

    def local_path(self, *append):
        """Join ``append`` onto ``config.local_endpoint_directory``."""
        return os.path.join(self.config.local_endpoint_directory, *append)

    def remote_path(self, *append):
        """Join ``append`` onto ``config.remote_endpoint_directory``.

        Globus endpoint paths are always '/'-separated, so POSIX joining is
        used regardless of the local operating system.
        """
        return posixpath.join(self.config.remote_endpoint_directory, *append)

    def search_for(self, experiment) -> List[str]:
        """Return names of remote entries whose name contains ``experiment.hash``.

        Returns an empty list if the remote directory does not exist yet.
        """
        try:
            listing = self.transfer_client.operation_ls(
                self.config.remote_endpoint_id,
                path=self.remote_path(),
                show_hidden=True,
            )
        except TransferAPIError as e:
            if e.code == "ClientError.NotFound":
                return []
            if e.code == "ConsentRequired":
                raise self._permission_error() from e
            raise
        except AuthAPIError as e:
            raise self._authentication_error() from e

        needle = experiment.hash
        return [item["name"] for item in listing if needle in item["name"]]

MIT Licensed