|
- #!/usr/bin/env python3
-
- # Copyright (c) Facebook, Inc. and its affiliates.
- #
- # This source code is licensed under the MIT license found in the
- # LICENSE file in the root directory of this source tree.
-
- import logging
- import os
- import shutil
- from typing import List, Optional
-
-
- logger = logging.getLogger(__file__)
-
-
- try:
- from iopath.common.file_io import g_pathmgr as IOPathManager
-
- try:
- # [FB only - for now] AWS PathHandler for PathManager
- from .fb_pathhandlers import S3PathHandler
-
- IOPathManager.register_handler(S3PathHandler())
- except KeyError:
- logging.warning("S3PathHandler already registered.")
- except ImportError:
- logging.debug(
- "S3PathHandler couldn't be imported. Either missing fb-only files, or boto3 module."
- )
-
- except ImportError:
- IOPathManager = None
-
-
- class PathManager:
- """
- Wrapper for insulating OSS I/O (using Python builtin operations) from
- iopath's PathManager abstraction (for transparently handling various
- internal backends).
- """
-
- @staticmethod
- def open(
- path: str,
- mode: str = "r",
- buffering: int = -1,
- encoding: Optional[str] = None,
- errors: Optional[str] = None,
- newline: Optional[str] = None,
- ):
- if IOPathManager:
- return IOPathManager.open(
- path=path,
- mode=mode,
- buffering=buffering,
- encoding=encoding,
- errors=errors,
- newline=newline,
- )
- return open(
- path,
- mode=mode,
- buffering=buffering,
- encoding=encoding,
- errors=errors,
- newline=newline,
- )
-
- @staticmethod
- def copy(src_path: str, dst_path: str, overwrite: bool = False) -> bool:
- if IOPathManager:
- return IOPathManager.copy(
- src_path=src_path, dst_path=dst_path, overwrite=overwrite
- )
- return shutil.copyfile(src_path, dst_path)
-
- @staticmethod
- def get_local_path(path: str, **kwargs) -> str:
- if IOPathManager:
- return IOPathManager.get_local_path(path, **kwargs)
- return path
-
- @staticmethod
- def exists(path: str) -> bool:
- if IOPathManager:
- return IOPathManager.exists(path)
- return os.path.exists(path)
-
- @staticmethod
- def isfile(path: str) -> bool:
- if IOPathManager:
- return IOPathManager.isfile(path)
- return os.path.isfile(path)
-
- @staticmethod
- def ls(path: str) -> List[str]:
- if IOPathManager:
- return IOPathManager.ls(path)
- return os.listdir(path)
-
- @staticmethod
- def mkdirs(path: str) -> None:
- if IOPathManager:
- return IOPathManager.mkdirs(path)
- os.makedirs(path, exist_ok=True)
-
- @staticmethod
- def rm(path: str) -> None:
- if IOPathManager:
- return IOPathManager.rm(path)
- os.remove(path)
-
- @staticmethod
- def chmod(path: str, mode: int) -> None:
- if not PathManager.path_requires_pathmanager(path):
- os.chmod(path, mode)
-
- @staticmethod
- def register_handler(handler) -> None:
- if IOPathManager:
- return IOPathManager.register_handler(handler=handler)
-
- @staticmethod
- def copy_from_local(
- local_path: str, dst_path: str, overwrite: bool = False, **kwargs
- ) -> None:
- if IOPathManager:
- return IOPathManager.copy_from_local(
- local_path=local_path, dst_path=dst_path, overwrite=overwrite, **kwargs
- )
- return shutil.copyfile(local_path, dst_path)
-
- @staticmethod
- def path_requires_pathmanager(path: str) -> bool:
- """Do we require PathManager to access given path?"""
- if IOPathManager:
- for p in IOPathManager._path_handlers.keys():
- if path.startswith(p):
- return True
- return False
-
- @staticmethod
- def supports_rename(path: str) -> bool:
- # PathManager doesn't yet support renames
- return not PathManager.path_requires_pathmanager(path)
-
- @staticmethod
- def rename(src: str, dst: str):
- os.rename(src, dst)
-
- """
- ioPath async PathManager methods:
- """
- @staticmethod
- def opena(
- path: str,
- mode: str = "r",
- buffering: int = -1,
- encoding: Optional[str] = None,
- errors: Optional[str] = None,
- newline: Optional[str] = None,
- ):
- """
- Return file descriptor with asynchronous write operations.
- """
- global IOPathManager
- if not IOPathManager:
- logging.info("ioPath is initializing PathManager.")
- try:
- from iopath.common.file_io import PathManager
- IOPathManager = PathManager()
- except Exception:
- logging.exception("Failed to initialize ioPath PathManager object.")
- return IOPathManager.opena(
- path=path,
- mode=mode,
- buffering=buffering,
- encoding=encoding,
- errors=errors,
- newline=newline,
- )
-
- @staticmethod
- def async_close() -> bool:
- """
- Wait for files to be written and clean up asynchronous PathManager.
- NOTE: `PathManager.async_close()` must be called at the end of any
- script that uses `PathManager.opena(...)`.
- """
- global IOPathManager
- if IOPathManager:
- return IOPathManager.async_close()
- return False
|