Source code for educelab.globus.cp.main

import argparse
import json
import logging
import shutil
import sys
import time
from datetime import datetime as dt, timezone as tz
from pathlib import PurePosixPath, PureWindowsPath

import globus_sdk

import educelab.globus as globus
from educelab.globus._cli import setup_logging


def to_posix_path(path):
    """Convert a path string to a :class:`pathlib.PurePosixPath`.

    A string without any backslash is handed to ``PurePosixPath`` unchanged.
    A string containing a backslash is parsed as a Windows path: backslashes
    are dropped from every component and a drive component such as ``C:``
    becomes a leading ``/C`` component, so ``C:\\Users\\foo`` maps to
    ``/C/Users/foo``.

    Args:
        path: Path string in either POSIX or Windows notation.

    Returns:
        The equivalent ``PurePosixPath``.
    """
    if '\\' not in path:
        return PurePosixPath(path)

    parts = [p.replace('\\', '') for p in PureWindowsPath(path).parts]
    # ``parts`` is empty for inputs that normalise to the current directory
    # (e.g. ``'.\\'``); only inspect the first component when there is one.
    if parts and ':' in parts[0]:
        parts[0] = '/' + parts[0].replace(':', '')
    return PurePosixPath(*parts)
def resolve_path(endpoint, path_str):
    """Resolve a user-supplied path string against the endpoint's basedir.

    Args:
        endpoint: Mapping describing the endpoint; its optional ``basedir``
            entry is used as the base for relative paths.
        path_str: Path typed by the user. May be empty, in which case the
            endpoint's ``basedir`` itself is returned.

    Returns:
        The resolved path as a POSIX-style string.

    Raises:
        ValueError: If ``path_str`` is empty or relative and the endpoint
            has no ``basedir``.
    """
    base = endpoint.get('basedir', '')

    if path_str:
        candidate = to_posix_path(path_str)
        # Absolute paths are used as given and never need a basedir.
        if candidate.is_absolute():
            return str(candidate)
        if base:
            return str(to_posix_path(base) / candidate)
        raise ValueError(
            f'relative path {path_str!r} requires endpoint basedir to be configured')

    # No path supplied: fall back to the basedir itself.
    if base:
        return str(to_posix_path(base))
    raise ValueError(
        'no path provided and endpoint has no basedir configured')
def _fmt_bytes(n):
    """Format a byte count as a string with one decimal and a unit.

    The value is divided by 1024 per step through B, KB, MB, GB and TB;
    anything still at or above 1024 after TB is reported in PB.
    """
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if abs(n) < 1024:
            return f'{n:.1f} {unit}'
        n /= 1024
    return f'{n:.1f} PB'


def _print_progress(task_info):
    """Redraw a one-line progress summary for a transfer task on stdout.

    Args:
        task_info: Mapping with the task fields read here: ``status``,
            ``files_transferred``, ``files``, ``bytes_transferred`` and
            ``effective_bytes_per_second``. Missing or ``None`` values are
            treated as zero (``status`` defaults to ``'ACTIVE'``).
    """
    status = task_info.get('status', 'ACTIVE')
    files_done = task_info.get('files_transferred') or 0
    files_total = task_info.get('files') or 0
    transferred = task_info.get('bytes_transferred') or 0
    speed = task_info.get('effective_bytes_per_second') or 0

    if status == 'INACTIVE':
        line = 'Transfer paused (INACTIVE) — check Globus Web App for details'
    elif files_total == 0:
        line = 'Queued, waiting for transfer to start...'
    else:
        line = (f'{files_done}/{files_total} files '
                f'{_fmt_bytes(transferred)} transferred '
                f'{_fmt_bytes(speed)}/s')

    # Stay one column short of the terminal width and truncate over-long
    # text: filling the last column (or exceeding it) makes some consoles
    # wrap to the next row, after which '\r' no longer overwrites the
    # previous progress line.
    width = max(shutil.get_terminal_size().columns - 1, 1)
    print(f'\r{line[:width]:<{width}}', end='', flush=True)
# Seconds to wait between two polls of the transfer task status.
_POLL_INTERVAL_SECONDS = 5


def _build_parser():
    """Create the argument parser for the transfer command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'src', metavar='endpoint:path',
        help='Source in endpoint-name:/abs/path or endpoint-name:rel/path format')
    parser.add_argument(
        'dst', metavar='endpoint:path',
        help='Destination in endpoint-name:/abs/path or endpoint-name:rel/path format')
    parser.add_argument('--label', help='Transfer label shown in the Globus UI')

    tx_opts = parser.add_argument_group('transfer options')
    tx_opts.add_argument('--background', '-b', action='store_true',
                         help='Background mode: Exit immediately after '
                              'submitting the transfer task')
    tx_opts.add_argument('--notify-success', default=False,
                         action=argparse.BooleanOptionalAction,
                         help='Send a notification e-mail when the transfer '
                              'completes successfully')
    tx_opts.add_argument('--notify-failed', default=True,
                         action=argparse.BooleanOptionalAction,
                         help='Send a notification e-mail when the transfer '
                              'fails')
    tx_opts.add_argument('--notify-inactive', default=True,
                         action=argparse.BooleanOptionalAction,
                         help='Send a notification e-mail when the transfer '
                              'enters an inactive state')
    tx_opts.add_argument('--verify', default=False, action='store_true',
                         help='Enable checksum verification on transfer')
    tx_opts.add_argument('--sync-level',
                         choices=('exists', 'size', 'mtime', 'checksum'),
                         help='Only transfer files where the destination does '
                              'not match the source: exists (dst missing), '
                              'size (size differs), mtime (dst older than '
                              'src), checksum (checksums differ). Default: '
                              'transfer all files unconditionally')

    log_opts = parser.add_mutually_exclusive_group()
    log_opts.add_argument('--verbose', '-v', action='store_true',
                          help='Enable debug logging')
    log_opts.add_argument('--quiet', '-q', action='store_true',
                          help='Only log warnings and errors')
    return parser


def _split_endpoint_arg(parser, arg, label, valid_endpoints):
    """Split an ``endpoint-name:path`` argument into ``(name, path_str)``.

    Only the first ``:`` separates the endpoint name from the path. Reports a
    usage error through ``parser.error`` when the separator is missing or the
    endpoint name is not in ``valid_endpoints``.
    """
    if ':' not in arg:
        parser.error(
            f"{label}: expected 'endpoint-name:path' format, got '{arg}'")
    name, _, path_str = arg.partition(':')
    if name not in valid_endpoints:
        parser.error(
            f"{label}: unknown endpoint '{name}', "
            f"choose from: {', '.join(valid_endpoints)}")
    return name, path_str


def _describe_error_event(event):
    """Return a short description of a task error event.

    The result is the event's ``description`` (``'unknown error'`` when
    absent), followed by ``: '<path>'`` when the event's ``details`` field is
    JSON whose first ``context`` entry carries a non-empty ``path``.
    """
    err_desc = event.get('description', 'unknown error')
    try:
        details = json.loads(event.get('details', '{}'))
        path = details['context'][0].get('path')
    except (json.JSONDecodeError, KeyError, IndexError, TypeError,
            AttributeError):
        # ``details`` is best-effort extra information: it may be missing,
        # not JSON, or shaped differently (AttributeError covers a first
        # ``context`` entry that is not a mapping, e.g. ``null``).
        path = None
    if path:
        return f"{err_desc}: '{path}'"
    return err_desc


def main():
    """Command-line entry point: copy a file or directory between endpoints.

    Parses ``src`` and ``dst`` in ``endpoint-name:path`` form, resolves both
    paths against the configured endpoints, logs in, stats the source to
    decide between a file and a recursive directory transfer, makes sure the
    destination directory exists, and submits the transfer task. Unless
    ``--background`` is given, the task is then polled until it leaves the
    ``ACTIVE``/``INACTIVE`` states, showing a progress line when stdout is a
    terminal.

    Usage problems are reported through ``parser.error``. Login, stat, mkdir
    and submit failures, an error event on an active task, and a ``FAILED``
    task all end the process with ``sys.exit(1)``.
    """
    parser = _build_parser()
    args = parser.parse_args()

    setup_logging(verbose=args.verbose, quiet=args.quiet)
    logger = logging.getLogger(__name__)

    valid_endpoints = globus.endpoint_names()
    src_name, src_path_str = _split_endpoint_arg(
        parser, args.src, 'src', valid_endpoints)
    dst_name, dst_path_str = _split_endpoint_arg(
        parser, args.dst, 'dst', valid_endpoints)

    src = globus.get_endpoint(src_name)
    dst = globus.get_endpoint(dst_name)
    logger.info(f'source endpoint: {src_name}, destination endpoint: {dst_name}')

    try:
        src_path = resolve_path(src, src_path_str)
        dst_path = resolve_path(dst, dst_path_str)
    except ValueError as e:
        parser.error(str(e))

    # login
    uuids = [src['uuid'], dst['uuid']]
    logger.debug(f'logging in to Globus endpoints: {", ".join(uuids)}')
    try:
        tc = globus.login(uuids)
    except RuntimeError as e:
        logger.error(str(e))
        sys.exit(1)
    except globus_sdk.TransferAPIError as e:
        logger.error(f'Globus API error during login: {e.message}')
        sys.exit(1)

    # stat source to determine file vs directory
    try:
        stat = tc.operation_stat(src['uuid'], src_path)
    except globus_sdk.TransferAPIError as e:
        if e.http_status == 404:
            logger.error(f'source path not found: {src_path}')
        elif e.http_status == 403:
            logger.error(f'permission denied reading source: {src_path}')
        else:
            logger.error(f'error accessing source path: {e.message}')
        sys.exit(1)
    pathtype = stat.get('type')
    recursive = pathtype == 'dir'

    # ensure the destination directory (or parent, for file transfers) exists
    mkdir_path = dst_path if recursive else str(PurePosixPath(dst_path).parent)
    logger.debug(f'ensuring destination directory exists: {mkdir_path}')
    try:
        tc.operation_mkdir(dst['uuid'], mkdir_path)
    except globus_sdk.TransferAPIError as e:
        # An "already exists" style error means there is nothing to create.
        already_exists = ('Exists' in (e.code or '')
                          or 'exist' in (e.message or '').lower())
        if not already_exists:
            logger.error(f'failed to create destination directory: {e.message}')
            sys.exit(1)

    # build the transfer task
    label = args.label
    if args.label is None:
        now = dt.now(tz=tz.utc)
        now_str = now.strftime('%m/%d/%Y, %H:%M:%S')
        label = f'Pipeline transfer ({now_str})'
    logger.info(f'building transfer task: {label}')
    tx = globus_sdk.TransferData(source_endpoint=src['uuid'],
                                 destination_endpoint=dst['uuid'],
                                 label=label,
                                 sync_level=args.sync_level,
                                 notify_on_succeeded=args.notify_success,
                                 notify_on_failed=args.notify_failed,
                                 notify_on_inactive=args.notify_inactive,
                                 verify_checksum=args.verify)
    logger.debug(f'{src_path} -> {dst_path} (recursive: {recursive})')
    tx.add_item(src_path, dst_path, recursive=recursive)

    # queue the transfer task
    logger.info('submitting transfer task...')
    try:
        task = tc.submit_transfer(tx)
    except globus_sdk.TransferAPIError as e:
        logger.error(f'failed to submit transfer task: {e.message}')
        sys.exit(1)
    task_id = task['task_id']
    logger.info(f'successfully submitted transfer task: {task_id}')

    if args.background:
        sys.exit()

    # wait for completion
    logger.info('waiting for transfer to complete...')
    use_progress = sys.stdout.isatty()
    while True:
        try:
            task_info = tc.get_task(task_id)
        except globus_sdk.TransferAPIError as e:
            logger.warning(f'error polling task status: {e.message}')
            time.sleep(_POLL_INTERVAL_SECONDS)
            continue

        if task_info['status'] == 'ACTIVE':
            try:
                task_events = tc.task_event_list(
                    task_id, query_params={'filter': 'is_error:1'})['DATA']
            except globus_sdk.TransferAPIError as e:
                logger.debug(f'error fetching task events: {e.message}')
                task_events = []

            # Any error event on an active task is treated as fatal: report
            # the first one, cancel the task and stop.
            if task_events:
                if use_progress:
                    # finish the progress line before logging
                    print(flush=True)
                logger.debug(task_events)
                logger.error(
                    f'transfer failed. {_describe_error_event(task_events[0])}')
                try:
                    tc.cancel_task(task_id)
                except globus_sdk.TransferAPIError:
                    # best effort: we are exiting with an error either way
                    pass
                sys.exit(1)

        if use_progress:
            _print_progress(task_info)

        if task_info['status'] not in ('ACTIVE', 'INACTIVE'):
            break
        time.sleep(_POLL_INTERVAL_SECONDS)

    if use_progress:
        print(flush=True)

    if task_info['status'] == 'FAILED':
        fatal = task_info.get('fatal_error') or {}
        err_desc = fatal.get('description', 'unknown error')
        logger.error(f'transfer task failed: {err_desc}')
        sys.exit(1)

    logger.info('done.')
# Script entry point: run the CLI only when this module is executed directly.
if __name__ == "__main__":
    main()