Skip to content

cloudpathlib.AzureBlobPath

Bases: CloudPath

Class for representing and operating on Azure Blob Storage URIs, in the style of the Python standard library's pathlib module. Instances represent a path in Blob Storage with filesystem path semantics, and convenient methods allow for basic operations like joining, reading, writing, iterating over contents, etc. This class almost entirely mimics the pathlib.Path interface, so most familiar properties and methods should be available and behave in the expected way.

The AzureBlobClient class handles authentication with Azure. If a client instance is not explicitly specified on AzureBlobPath instantiation, a default client is used. See AzureBlobClient's documentation for more details.

Source code in cloudpathlib/azure/azblobpath.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@register_path_class("azure")
class AzureBlobPath(CloudPath):
    """Class for representing and operating on Azure Blob Storage URIs, in the style of the Python
    standard library's [`pathlib` module](https://docs.python.org/3/library/pathlib.html).
    Instances represent a path in Blob Storage with filesystem path semantics, and convenient
    methods allow for basic operations like joining, reading, writing, iterating over contents,
    etc. This class almost entirely mimics the [`pathlib.Path`](https://docs.python.org/3/library/pathlib.html#pathlib.Path)
    interface, so most familiar properties and methods should be available and behave in the
    expected way.

    The [`AzureBlobClient`](../azblobclient/) class handles authentication with Azure. If a
    client instance is not explicitly specified on `AzureBlobPath` instantiation, a default client
    is used. See `AzureBlobClient`'s documentation for more details.
    """

    cloud_prefix: str = "az://"
    client: "AzureBlobClient"

    @property
    def drive(self) -> str:
        """Return the container name, which serves as the drive of this path."""
        return self.container

    def mkdir(self, parents=False, exist_ok=False, mode: Optional[Any] = None):
        """Create this directory by delegating to the client's ``_mkdir``.

        ``mode`` is accepted for signature compatibility but is not forwarded.
        """
        self.client._mkdir(self, parents=parents, exist_ok=exist_ok)

    def touch(self, exist_ok: bool = True, mode: Optional[Any] = None):
        """Create an empty blob at this path, or refresh it if it already exists.

        Args:
            exist_ok: When False, an already existing path raises ``FileExistsError``.
            mode: Accepted for signature compatibility; not used.

        Raises:
            FileExistsError: If the path exists and ``exist_ok`` is False.
        """
        if self.exists():
            if not exist_ok:
                raise FileExistsError(f"File exists: {self}")
            # moving the blob onto itself is how an existing blob is "touched"
            self.client._move_file(self, self)
        else:
            # the context manager removes the temporary directory even when the upload raises
            with TemporaryDirectory() as tmp_dir:
                empty_file = Path(tmp_dir) / "empty"
                empty_file.touch()

                self.client._upload_file(empty_file, self)

    def stat(self, follow_symlinks=True):
        """Return an ``os.stat_result`` built from the client's metadata for this path.

        Only the size and modification time are populated from metadata; the ``dev`` slot carries
        ``cloud_prefix`` and all remaining slots are ``None``. A missing ``size`` or
        ``last_modified`` entry is reported as ``0``.

        Raises:
            NoStatError: If the client raises ``ResourceNotFoundError`` for this path.
        """
        try:
            meta = self.client._get_metadata(self)
        except ResourceNotFoundError:
            raise NoStatError(
                f"No stats available for {self}; it may be a directory or not exist."
            )

        # only call .timestamp() when a value is present; the fallback 0 is a plain int
        last_modified = meta.get("last_modified")
        mtime = last_modified.timestamp() if last_modified is not None else 0

        return os.stat_result(
            (
                None,  # mode
                None,  # ino
                self.cloud_prefix,  # dev,
                None,  # nlink,
                None,  # uid,
                None,  # gid,
                meta.get("size", 0),  # size,
                None,  # atime,
                mtime,  # mtime,
                None,  # ctime,
            )
        )

    def replace(self, target: "AzureBlobPath") -> "AzureBlobPath":
        """Replace ``target`` with this path, returning the result of the move.

        The parent implementation is tried first. If it raises ``CloudPathIsADirectoryError``
        and the client's ``_check_hns`` is truthy for this path, the directory is moved with
        the client's ``_move_file``; otherwise the error is re-raised.
        """
        try:
            return super().replace(target)

        # we can rename directories on ADLS Gen2
        except CloudPathIsADirectoryError:
            if self.client._check_hns(self):
                return self.client._move_file(self, target)
            else:
                raise

    @property
    def container(self) -> str:
        """Return the first ``/``-separated segment of the path after the cloud prefix."""
        return self._no_prefix.split("/", 1)[0]

    @property
    def blob(self) -> str:
        """Return the path without prefix and drive, with one leading slash removed."""
        key = self._no_prefix_no_drive

        # key should never have a leading slash
        if key.startswith("/"):
            key = key[1:]

        return key

    @property
    def etag(self):
        """Return the ``etag`` entry of this path's metadata, or ``None`` if absent."""
        return self.client._get_metadata(self).get("etag", None)

    @property
    def md5(self) -> str:
        """Return ``content_settings.content_md5`` from this path's metadata.

        ``None`` is returned when either key is missing from the metadata.
        """
        return self.client._get_metadata(self).get("content_settings", {}).get("content_md5", None)

Attributes

anchor: str property

blob: str property

client: AzureBlobClient instance-attribute

cloud_prefix: str = 'az://' class-attribute instance-attribute

container: str property

drive: str property

etag property

fspath: str property

md5: str property

name: str property

parent: Self property

parents: Tuple[Self, ...] property

parser: Self property

parts: Tuple[str, ...] property

stem: str property

suffix: str property

suffixes: List[str] property

Functions

__init__(cloud_path: Union[str, Self, CloudPath], *parts: str, client: Optional[Client] = None) -> None

Source code in cloudpathlib/cloudpath.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def __init__(
    self,
    cloud_path: Union[str, Self, "CloudPath"],
    *parts: str,
    client: Optional["Client"] = None,
) -> None:
    """Build a cloud path from a base path plus optional extra path segments.

    Args:
        cloud_path: Base path, as a string or another cloud path.
        *parts: Extra segments appended to ``cloud_path``, joined with ``/``.
        client: Client to bind to this path. When omitted and ``cloud_path`` is still a
            ``CloudPath`` instance, that instance's client is reused.

    Raises:
        ClientMismatchError: If ``client`` is not an instance of this path class's client class.
    """
    # These two attributes must exist before anything below can raise, so that __del__
    # never hits an AttributeError on a half-constructed instance.
    self._handle: Optional[IO] = None
    self._client: Optional["Client"] = None

    if parts:
        # the base may be just the prefix or a longer path; either way it must end in "/"
        base = str(cloud_path)
        if not base.endswith("/"):
            base += "/"

        cloud_path = base + "/".join(segment.strip("/") for segment in parts)

    self.is_valid_cloudpath(cloud_path, raise_on_error=True)
    self._cloud_meta.validate_completeness()

    # string, parsed-URL and posix-path views of the same raw value
    self._str = str(cloud_path)
    self._url = urlparse(self._str)
    self._path = PurePosixPath(f"/{self._no_prefix}")

    # bind the client: an explicit one wins, otherwise inherit from a CloudPath argument
    if client is not None:
        self._client = client
        if not isinstance(client, self._cloud_meta.client_class):
            raise ClientMismatchError(
                f"Client of type [{client.__class__}] is not valid for cloud path of type "
                f"[{self.__class__}]; must be instance of [{self._cloud_meta.client_class}], or "
                f"None to use default client for this cloud path class."
            )
    elif isinstance(cloud_path, CloudPath):
        self._client = cloud_path.client

    # set once the local copy is opened for writing, meaning it may need uploading
    self._dirty = False

absolute() -> Self

Source code in cloudpathlib/cloudpath.py
1008
1009
def absolute(self) -> Self:
    """Return this same path object unchanged."""
    return self

as_uri() -> str

Source code in cloudpathlib/cloudpath.py
478
479
def as_uri(self) -> str:
    """Return the string form of this path, i.e. ``str(self)``."""
    return str(self)

as_url(presign: bool = False, expire_seconds: int = 60 * 60) -> str

Source code in cloudpathlib/cloudpath.py
460
461
462
463
464
465
def as_url(self, presign: bool = False, expire_seconds: int = 60 * 60) -> str:
    """Return a URL for this path, obtained from the client.

    Args:
        presign: When True, ask the client for a presigned URL instead of the public one.
        expire_seconds: Forwarded to the client only when ``presign`` is True.
    """
    if not presign:
        return self.client._get_public_url(self)
    return self.client._generate_presigned_url(self, expire_seconds=expire_seconds)

clear_cache()

Removes cache if it exists

Source code in cloudpathlib/cloudpath.py
1521
1522
1523
1524
1525
1526
1527
def clear_cache(self):
    """Removes cache if it exists"""
    cached = self._local
    if not cached.exists():
        return

    # a cached file is unlinked; anything else is removed as a directory tree
    if cached.is_file():
        cached.unlink()
    else:
        shutil.rmtree(cached)

copy(target: Union[str, os.PathLike, Self], follow_symlinks: bool = True, preserve_metadata: bool = False, force_overwrite_to_cloud: Optional[bool] = None) -> Union[Path, Self]

copy(
    target: Self,
    follow_symlinks=True,
    preserve_metadata=False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Self
copy(
    target: Path,
    follow_symlinks=True,
    preserve_metadata=False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Path
copy(
    target: str,
    follow_symlinks=True,
    preserve_metadata=False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, CloudPath]

Copy self to target folder or file, if self is a file.

Source code in cloudpathlib/cloudpath.py
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
def copy(
    self,
    target: Union[str, os.PathLike, Self],
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, Self]:
    """Copy self to target folder or file, if self is a file.

    All arguments are forwarded unchanged to ``self._copy``, whose result is returned.
    """
    # remove_src=False: the source is left in place after copying
    return self._copy(
        target,
        follow_symlinks=follow_symlinks,
        preserve_metadata=preserve_metadata,
        force_overwrite_to_cloud=force_overwrite_to_cloud,
        remove_src=False,
    )

copy_into(target_dir: Union[str, os.PathLike, Self], follow_symlinks: bool = True, preserve_metadata: bool = False, force_overwrite_to_cloud: Optional[bool] = None) -> Union[Path, Self]

copy_into(
    target_dir: Self,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Self
copy_into(
    target_dir: Path,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Path
copy_into(
    target_dir: str,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, CloudPath]

Copy self into target directory, preserving the filename.

Source code in cloudpathlib/cloudpath.py
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
def copy_into(
    self,
    target_dir: Union[str, os.PathLike, Self],
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, Self]:
    """Copy self into target directory, preserving the filename."""
    # the destination keeps this path's own name underneath target_dir
    destination = anypath.to_anypath(target_dir) / self.name

    options = dict(
        follow_symlinks=follow_symlinks,
        preserve_metadata=preserve_metadata,
        force_overwrite_to_cloud=force_overwrite_to_cloud,
        remove_src=False,
    )
    copied = self._copy(destination, **options)
    return cast(Union[Path, Self], copied)

copytree(destination, force_overwrite_to_cloud=None, ignore=None)

copytree(
    destination: Self,
    force_overwrite_to_cloud: Optional[bool] = None,
    ignore: Optional[
        Callable[[str, Iterable[str]], Container[str]]
    ] = None,
) -> Self
copytree(
    destination: Path,
    force_overwrite_to_cloud: Optional[bool] = None,
    ignore: Optional[
        Callable[[str, Iterable[str]], Container[str]]
    ] = None,
) -> Path
copytree(
    destination: Union[str, os.PathLike, Self],
    force_overwrite_to_cloud: Optional[bool] = None,
    ignore: Optional[
        Callable[[str, Iterable[str]], Container[str]]
    ] = None,
) -> Union[Path, CloudPath]

Copy self to a directory, if self is a directory.

Source code in cloudpathlib/cloudpath.py
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
def copytree(self, destination, force_overwrite_to_cloud=None, ignore=None):
    """Copy self to a directory, if self is a directory.

    Args:
        destination: Where to copy to; converted with ``anypath.to_anypath``.
        force_overwrite_to_cloud: Forwarded to every nested ``copy`` / ``copytree`` call.
        ignore: Optional callable given this path's ``_no_prefix_no_drive`` and the list of
            child names; children whose name is in its return value are skipped.

    Returns:
        The converted ``destination``.

    Raises:
        CloudPathNotADirectoryError: If self is not a directory.
        CloudPathFileExistsError: If ``destination`` exists and is a file.
    """
    if not self.is_dir():
        raise CloudPathNotADirectoryError(
            f"Origin path {self} must be a directory. To copy a single file use the method copy."
        )

    destination = anypath.to_anypath(destination)

    if destination.exists() and destination.is_file():
        raise CloudPathFileExistsError(
            f"Destination path {destination} of copytree must be a directory."
        )

    children = list(self.iterdir())

    if ignore is None:
        skipped = set()
    else:
        skipped = ignore(self._no_prefix_no_drive, [child.name for child in children])

    destination.mkdir(parents=True, exist_ok=True)

    for child in children:
        if child.name in skipped:
            continue

        if child.is_file():
            child.copy(
                destination / child.name, force_overwrite_to_cloud=force_overwrite_to_cloud
            )
        elif child.is_dir():
            # sub-directory destinations always carry a trailing slash
            dir_name = child.name if child.name.endswith("/") else child.name + "/"
            child.copytree(
                destination / dir_name,
                force_overwrite_to_cloud=force_overwrite_to_cloud,
                ignore=ignore,
            )

    return destination

download_to(destination: Union[str, os.PathLike]) -> Path

Source code in cloudpathlib/cloudpath.py
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
def download_to(self, destination: Union[str, os.PathLike]) -> Path:
    """Download this file or directory to a local destination.

    A file is downloaded through the client; if ``destination`` is an existing directory the
    file keeps its own name inside it. A directory is recreated locally and each child is
    downloaded recursively.

    Raises:
        CloudPathNotExistsError: If this path does not exist.
    """
    destination = Path(destination)

    if not self.exists():
        raise CloudPathNotExistsError(f"Cannot download because path does not exist: {self}")

    if self.is_file():
        if destination.is_dir():
            destination = destination / self.name
        return self.client._download_file(self, destination)

    destination.mkdir(exist_ok=True)

    # prefix stripped from each child's string form to get its path relative to self
    prefix = str(self)
    if not prefix.endswith("/"):
        prefix += "/"

    for child in self.iterdir():
        child.download_to(destination / str(child)[len(prefix) :])

    return destination

exists(follow_symlinks=True) -> bool

Source code in cloudpathlib/cloudpath.py
481
482
def exists(self, follow_symlinks=True) -> bool:
    """Return the client's ``_exists`` result for this path.

    ``follow_symlinks`` is accepted but not used here.
    """
    return self.client._exists(self)

from_uri(uri: str) -> Self classmethod

Source code in cloudpathlib/cloudpath.py
494
495
496
@classmethod
def from_uri(cls, uri: str) -> Self:
    """Construct an instance of ``cls`` from ``uri`` by calling ``cls(uri)``."""
    return cls(uri)

full_match(pattern: str, case_sensitive: Optional[bool] = None) -> bool

Source code in cloudpathlib/cloudpath.py
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
def full_match(self, pattern: str, case_sensitive: Optional[bool] = None) -> bool:
    """Match the drive-less path against ``pattern`` using ``PurePosixPath.full_match``.

    A leading scheme (and drive, if present) is stripped from ``pattern`` first.

    Raises:
        NotImplementedError: On Python versions older than 3.13.
    """
    if sys.version_info < (3, 13):
        raise NotImplementedError("full_match requires Python 3.13 or higher")

    anchor = self.anchor
    anchor_and_drive = anchor + self.drive

    # strip scheme from start of pattern before testing
    if pattern.startswith(anchor_and_drive):
        pattern = pattern[len(anchor_and_drive) :]
    elif pattern.startswith(anchor):
        # for http paths, keep leading slash
        pattern = pattern[len(anchor) - 1 :]

    # remove drive, which is kept on normal dispatch to pathlib
    drive_less = PurePosixPath(self._no_prefix_no_drive)
    return drive_less.full_match(  # type: ignore[attr-defined]
        pattern, case_sensitive=case_sensitive
    )

glob(pattern: Union[str, os.PathLike], case_sensitive: Optional[bool] = None, recurse_symlinks: bool = True) -> Generator[Self, None, None]

Source code in cloudpathlib/cloudpath.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
def glob(
    self,
    pattern: Union[str, os.PathLike],
    case_sensitive: Optional[bool] = None,
    recurse_symlinks: bool = True,
) -> Generator[Self, None, None]:
    """Yield the paths under self that match ``pattern``.

    ``recurse_symlinks`` is accepted but not used here.
    """
    pattern = self._glob_checks(pattern)

    selector = _make_selector(
        tuple(PurePosixPath(pattern).parts), _posix_flavour, case_sensitive=case_sensitive
    )

    # recursive listing needed if explicit ** or any sub folder in pattern
    needs_recursive_listing = "/" in pattern or "**" in pattern

    yield from self._glob(selector, needs_recursive_listing)

info() -> CloudPathInfo

Return a CloudPathInfo object for this path.

Source code in cloudpathlib/cloudpath.py
1159
1160
1161
def info(self) -> "CloudPathInfo":
    """Return a CloudPathInfo object for this path.

    A new ``CloudPathInfo`` wrapping ``self`` is constructed on every call.
    """
    return CloudPathInfo(self)

is_absolute() -> bool

Source code in cloudpathlib/cloudpath.py
1011
1012
def is_absolute(self) -> bool:
    """Always return True."""
    return True

is_dir(follow_symlinks=True) -> bool

Source code in cloudpathlib/cloudpath.py
484
485
def is_dir(self, follow_symlinks=True) -> bool:
    """Return True when the client's ``_is_file_or_dir`` reports ``"dir"`` for this path.

    ``follow_symlinks`` is accepted but not used here.
    """
    return self.client._is_file_or_dir(self) == "dir"

is_file(follow_symlinks=True) -> bool

Source code in cloudpathlib/cloudpath.py
487
488
def is_file(self, follow_symlinks=True) -> bool:
    """Return True when the client's ``_is_file_or_dir`` reports ``"file"`` for this path.

    ``follow_symlinks`` is accepted but not used here.
    """
    return self.client._is_file_or_dir(self) == "file"

is_junction()

Source code in cloudpathlib/cloudpath.py
957
958
def is_junction(self):
    """Always return False."""
    return False  # only windows paths can be junctions, not cloudpaths

is_relative_to(other: Self) -> bool

Source code in cloudpathlib/cloudpath.py
1034
1035
1036
1037
1038
1039
def is_relative_to(self, other: Self) -> bool:
    try:
        self.relative_to(other)
        return True
    except ValueError:
        return False

is_valid_cloudpath(path: Union[str, CloudPath], raise_on_error: bool = False) -> Union[bool, TypeGuard[Self]] classmethod

is_valid_cloudpath(
    path: CloudPath, raise_on_error: bool = ...
) -> TypeGuard[Self]
is_valid_cloudpath(
    path: str, raise_on_error: bool = ...
) -> bool
Source code in cloudpathlib/cloudpath.py
358
359
360
361
362
363
364
365
366
367
368
369
@classmethod
def is_valid_cloudpath(
    cls, path: Union[str, "CloudPath"], raise_on_error: bool = False
) -> Union[bool, TypeGuard[Self]]:
    valid = str(path).lower().startswith(cls.cloud_prefix.lower())

    if raise_on_error and not valid:
        raise InvalidPrefixError(
            f"'{path}' is not a valid path since it does not start with '{cls.cloud_prefix}'"
        )

    return valid

iterdir() -> Generator[Self, None, None]

Source code in cloudpathlib/cloudpath.py
599
600
601
602
def iterdir(self) -> Generator[Self, None, None]:
    for f, _ in self.client._list_dir(self, recursive=False):
        if f != self:  # iterdir does not include itself in pathlib
            yield f

joinpath(*pathsegments: Union[str, os.PathLike]) -> Self

Source code in cloudpathlib/cloudpath.py
1005
1006
def joinpath(self, *pathsegments: Union[str, os.PathLike]) -> Self:
    """Forward ``pathsegments`` to ``self._dispatch_to_path("joinpath", ...)`` and return its result."""
    return self._dispatch_to_path("joinpath", *pathsegments)

match(path_pattern: str, case_sensitive: Optional[bool] = None) -> bool

Source code in cloudpathlib/cloudpath.py
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
def match(self, path_pattern: str, case_sensitive: Optional[bool] = None) -> bool:
    """Match this path against ``path_pattern`` via ``_dispatch_to_path("match", ...)``.

    A leading ``anchor + drive + "/"`` is removed from the pattern first. ``case_sensitive``
    is only forwarded on Python 3.12 and newer.
    """
    # strip scheme from start of pattern before testing
    scheme_and_drive = self.anchor + self.drive + "/"
    if path_pattern.startswith(scheme_and_drive):
        path_pattern = path_pattern[len(scheme_and_drive) :]

    if sys.version_info < (3, 12):
        kwargs = {}
    else:
        kwargs = {"case_sensitive": case_sensitive}

    return self._dispatch_to_path("match", path_pattern, **kwargs)

mkdir(parents=False, exist_ok=False, mode: Optional[Any] = None)

Source code in cloudpathlib/azure/azblobpath.py
42
43
def mkdir(self, parents=False, exist_ok=False, mode: Optional[Any] = None):
    """Create this directory by delegating to the client's ``_mkdir``.

    ``mode`` is accepted for signature compatibility but is not forwarded.
    """
    self.client._mkdir(self, parents=parents, exist_ok=exist_ok)

move(target: Union[str, os.PathLike, Self], follow_symlinks: bool = True, preserve_metadata: bool = False, force_overwrite_to_cloud: Optional[bool] = None) -> Union[Path, Self]

move(
    target: Self,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Self
move(
    target: Path,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Path
move(
    target: str,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, CloudPath]

Move self to target location, removing the source.

Source code in cloudpathlib/cloudpath.py
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
def move(
    self,
    target: Union[str, os.PathLike, Self],
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, Self]:
    """Move self to target location, removing the source.

    All arguments are forwarded unchanged to ``self._copy``, whose result is returned.
    """
    # remove_src=True: the source is removed as part of the operation
    return self._copy(
        target,
        follow_symlinks=follow_symlinks,
        preserve_metadata=preserve_metadata,
        force_overwrite_to_cloud=force_overwrite_to_cloud,
        remove_src=True,
    )

move_into(target_dir: Union[str, os.PathLike, Self], follow_symlinks: bool = True, preserve_metadata: bool = False, force_overwrite_to_cloud: Optional[bool] = None) -> Union[Path, Self]

move_into(
    target_dir: Self,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Self
move_into(
    target_dir: Path,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Path
move_into(
    target_dir: str,
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, CloudPath]

Move self into target directory, preserving the filename and removing the source.

Source code in cloudpathlib/cloudpath.py
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
def move_into(
    self,
    target_dir: Union[str, os.PathLike, Self],
    follow_symlinks: bool = True,
    preserve_metadata: bool = False,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Union[Path, Self]:
    """Move self into target directory, preserving the filename and removing the source."""
    # the destination keeps this path's own name underneath target_dir
    destination = anypath.to_anypath(target_dir) / self.name

    options = dict(
        follow_symlinks=follow_symlinks,
        preserve_metadata=preserve_metadata,
        force_overwrite_to_cloud=force_overwrite_to_cloud,
        remove_src=True,
    )
    moved = self._copy(destination, **options)
    return cast(Union[Path, Self], moved)

open(mode: str = 'r', buffering: int = -1, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None, force_overwrite_from_cloud: Optional[bool] = None, force_overwrite_to_cloud: Optional[bool] = None, buffer_size: Optional[int] = None) -> IO[Any]

open(
    mode: OpenTextMode = "r",
    buffering: int = -1,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
    force_overwrite_from_cloud: Optional[bool] = None,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> TextIOWrapper
open(
    mode: OpenBinaryMode,
    buffering: Literal[0],
    encoding: None = None,
    errors: None = None,
    newline: None = None,
    force_overwrite_from_cloud: Optional[bool] = None,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> FileIO
open(
    mode: OpenBinaryModeUpdating,
    buffering: Literal[-1, 1] = -1,
    encoding: None = None,
    errors: None = None,
    newline: None = None,
    force_overwrite_from_cloud: Optional[bool] = None,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> BufferedRandom
open(
    mode: OpenBinaryModeWriting,
    buffering: Literal[-1, 1] = -1,
    encoding: None = None,
    errors: None = None,
    newline: None = None,
    force_overwrite_from_cloud: Optional[bool] = None,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> BufferedWriter
open(
    mode: OpenBinaryModeReading,
    buffering: Literal[-1, 1] = -1,
    encoding: None = None,
    errors: None = None,
    newline: None = None,
    force_overwrite_from_cloud: Optional[bool] = None,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> BufferedReader
open(
    mode: OpenBinaryMode,
    buffering: int = -1,
    encoding: None = None,
    errors: None = None,
    newline: None = None,
    force_overwrite_from_cloud: Optional[bool] = None,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> BinaryIO
open(
    mode: str,
    buffering: int = -1,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
    force_overwrite_from_cloud: Optional[bool] = None,
    force_overwrite_to_cloud: Optional[bool] = None,
) -> IO[Any]
Source code in cloudpathlib/cloudpath.py
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
def open(
    self,
    mode: str = "r",
    buffering: int = -1,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
    force_overwrite_from_cloud: Optional[bool] = None,  # extra kwarg not in pathlib
    force_overwrite_to_cloud: Optional[bool] = None,  # extra kwarg not in pathlib
    buffer_size: Optional[int] = None,  # extra kwarg for streaming mode
) -> "IO[Any]":
    """Open this cloud file and return a file-like object.

    In streaming mode (``client.file_cache_mode == FileCacheMode.streaming``) a
    ``CloudBufferedIO`` or ``CloudTextIO`` object is returned. Otherwise the local cache is
    refreshed and the cached file is opened; for write modes the returned buffer's ``close``
    is patched so the local file is uploaded when it is closed.

    Args:
        mode: File mode, as for the built-in ``open``.
        buffering: Buffering policy; forwarded to the local ``open`` and, in streaming mode,
            used to derive ``buffer_size`` when that is not given.
        encoding: Text encoding, forwarded to the underlying text layer.
        errors: Encoding error policy, forwarded to the underlying text layer.
        newline: Newline policy, forwarded to the underlying text layer.
        force_overwrite_from_cloud: Forwarded to ``_refresh_cache`` (cached mode only).
        force_overwrite_to_cloud: Forwarded to ``_upload_local_to_cloud`` on close.
        buffer_size: Explicit buffer size for streaming mode.

    Raises:
        CloudPathIsADirectoryError: If the path exists and is not a file.
        CloudPathFileNotFoundError: If opened for read or append and the path does not exist.
        CloudPathFileExistsError: If opened for exclusive creation and the path exists.
        NotImplementedError: If streaming is requested but no raw IO class is available.
    """
    # if trying to call open on a directory that exists
    exists_on_cloud = self.exists()

    if exists_on_cloud and not self.is_file():
        raise CloudPathIsADirectoryError(
            f"Cannot open directory, only files. Tried to open ({self})"
        )

    if not exists_on_cloud and any(m in mode for m in ("r", "a")):
        raise CloudPathFileNotFoundError(
            f"File opened for read or append, but it does not exist on cloud: {self}"
        )

    # Exclusive creation applies to every "x" variant ("x", "xb", "xt", "x+", ...), not only
    # the bare "x" string; reuse the existence result already fetched above.
    if "x" in mode and exists_on_cloud:
        raise CloudPathFileExistsError(f"Cannot open existing file ({self}) for creation.")

    # Use streaming I/O if file_cache_mode is streaming
    if self.client.file_cache_mode == FileCacheMode.streaming:
        # Import here to keep it localized to streaming functionality
        from .cloud_io import CloudBufferedIO, CloudTextIO

        # Get the raw IO class from the cloud implementation
        raw_io_class = self._cloud_meta.raw_io_class
        if raw_io_class is None:
            raise NotImplementedError(
                f"Streaming I/O is not implemented for {self._cloud_meta.name}"
            )

        # Calculate buffer size from buffering or buffer_size parameter
        if buffer_size is None:
            if buffering == 0:
                # Unbuffered binary mode
                buffer_size = 1  # Minimal buffering
                if "b" not in mode:
                    mode += "b"  # Force binary mode for unbuffered
            elif buffering > 0:
                buffer_size = buffering
            else:
                buffer_size = 64 * 1024  # Default 64 KiB

        # Return appropriate streaming I/O object
        if "b" in mode:
            return CloudBufferedIO(  # type: ignore[return-value]
                raw_io_class=raw_io_class,
                client=self.client,
                cloud_path=self,
                mode=mode,
                buffer_size=buffer_size,
            )
        else:
            return CloudTextIO(  # type: ignore[return-value]
                raw_io_class=raw_io_class,
                client=self.client,
                cloud_path=self,
                mode=mode,
                encoding=encoding,
                errors=errors,
                newline=newline,
                buffer_size=buffer_size,
            )

    # Standard cached mode
    self._refresh_cache(force_overwrite_from_cloud=force_overwrite_from_cloud)

    # create any directories that may be needed if the file is new
    if not self._local.exists():
        self._local.parent.mkdir(parents=True, exist_ok=True)
        original_mtime = 0.0
    else:
        original_mtime = self._local.stat().st_mtime

    buffer = self._local.open(
        mode=mode,
        buffering=buffering,
        encoding=encoding,
        errors=errors,
        newline=newline,
    )

    # write modes need special handling on closing the buffer
    if any(m in mode for m in ("w", "+", "x", "a")):
        # dirty, handle, patch close
        wrapped_close = buffer.close

        # since we are pretending this is a cloud file, upload it to the cloud
        # when the buffer is closed
        def _patched_close_upload(*args, **kwargs) -> None:
            wrapped_close(*args, **kwargs)

            # we should be idempotent and not upload again if
            # we already ran our close method patch
            if not self._dirty:
                return

            # original mtime should match what was in the cloud; because of system clocks or rounding
            # by the cloud provider, the new version in our cache is "older" than the original version;
            # explicitly set the new modified time to be after the original modified time.
            if self._local.stat().st_mtime < original_mtime:
                new_mtime = original_mtime + 1
                os.utime(self._local, times=(new_mtime, new_mtime))

            self._upload_local_to_cloud(force_overwrite_to_cloud=force_overwrite_to_cloud)
            self._dirty = False

        buffer.close = _patched_close_upload  # type: ignore

        # keep reference in case we need to close when __del__ is called on this object
        self._handle = buffer

        # opened for write, so mark dirty
        self._dirty = True

    # if we don't want any cache around, remove the cache
    # as soon as the file is closed
    if self.client.file_cache_mode == FileCacheMode.close_file:
        # this may be _patched_close_upload, in which case we need to
        # make sure to call that first so the file gets uploaded
        wrapped_close_for_cache = buffer.close

        def _patched_close_empty_cache(*args, **kwargs):
            wrapped_close_for_cache(*args, **kwargs)

            # remove local file as last step on closing
            self.clear_cache()

        buffer.close = _patched_close_empty_cache  # type: ignore

    return buffer

read_bytes() -> bytes

Source code in cloudpathlib/cloudpath.py
944
945
946
def read_bytes(self) -> bytes:
    """Open this path in ``"rb"`` mode and return everything read from it."""
    with self.open(mode="rb") as f:
        return f.read()

read_text(encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None) -> str

Source code in cloudpathlib/cloudpath.py
948
949
950
951
952
953
954
955
def read_text(
    self,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
) -> str:
    """Open this path in ``"r"`` mode and return everything read from it.

    ``encoding``, ``errors`` and ``newline`` are forwarded unchanged to ``self.open``.
    """
    with self.open(mode="r", encoding=encoding, errors=errors, newline=newline) as f:
        return f.read()

relative_to(other: Self, walk_up: bool = False) -> PurePosixPath

Source code in cloudpathlib/cloudpath.py
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
def relative_to(self, other: Self, walk_up: bool = False) -> PurePosixPath:
    """Return this path relative to ``other`` as a ``PurePosixPath``.

    ``walk_up`` is only forwarded on Python 3.12 and newer.

    Raises:
        ValueError: If ``other`` is not a cloud path or has a different anchor.
    """
    # We don't dispatch regularly since this never returns a cloud path (since it is relative, and cloud paths are
    # absolute)
    if not isinstance(other, CloudPath):
        raise ValueError(f"{self} is a cloud path, but {other} is not")

    if self.anchor != other.anchor:
        raise ValueError(
            f"{self} is a {self.anchor} path, but {other} is a {other.anchor} path"
        )

    if sys.version_info < (3, 12):
        kwargs = {}
    else:
        kwargs = {"walk_up": walk_up}

    return self._path.relative_to(other._path, **kwargs)  # type: ignore[call-arg]

rename(target: Self) -> Self

Source code in cloudpathlib/cloudpath.py
884
885
886
887
def rename(self, target: Self) -> Self:
    """Move this path to `target` by delegating to `self.replace`.

    Returns whatever `self.replace(target)` returns.
    """
    # for cloud services replace == rename since we don't just rename,
    # we actually move files
    return self.replace(target)

replace(target: AzureBlobPath) -> AzureBlobPath

Source code in cloudpathlib/azure/azblobpath.py
82
83
84
85
86
87
88
89
90
91
def replace(self, target: "AzureBlobPath") -> "AzureBlobPath":
    """Replace `target` with this path, using the parent class's `replace` first.

    If the parent implementation raises `CloudPathIsADirectoryError`, the
    client's `_check_hns` is consulted: when it returns a truthy value the move
    is performed via `self.client._move_file`, otherwise the original error is
    re-raised.
    """
    try:
        moved = super().replace(target)
    except CloudPathIsADirectoryError:
        # we can rename directories on ADLS Gen2
        if not self.client._check_hns(self):
            raise
        moved = self.client._move_file(self, target)
    return moved

resolve(strict: bool = False) -> Self

Source code in cloudpathlib/cloudpath.py
1014
1015
def resolve(self, strict: bool = False) -> Self:
    """Return this same path object unchanged.

    `strict` is accepted but is not consulted by this body.
    """
    return self

rglob(pattern: Union[str, os.PathLike], case_sensitive: Optional[bool] = None, recurse_symlinks: bool = True) -> Generator[Self, None, None]

Source code in cloudpathlib/cloudpath.py
584
585
586
587
588
589
590
591
592
593
594
595
596
597
def rglob(
    self,
    pattern: Union[str, os.PathLike],
    case_sensitive: Optional[bool] = None,
    recurse_symlinks: bool = True,
) -> Generator[Self, None, None]:
    """Yield the paths produced by `self._glob` for `pattern` prefixed with `**`.

    The pattern is first passed through `self._glob_checks`; its parts are then
    prefixed with a `"**"` component before the selector is built.
    `recurse_symlinks` is accepted but is not used by this body.
    """
    checked_pattern = self._glob_checks(pattern)

    # Prepend "**" to the pattern's own components.
    selector_parts = ("**", *PurePosixPath(checked_pattern).parts)
    matcher = _make_selector(selector_parts, _posix_flavour, case_sensitive=case_sensitive)

    yield from self._glob(matcher, True)

rmdir() -> None

Source code in cloudpathlib/cloudpath.py
889
890
891
892
893
894
895
896
897
898
899
900
901
def rmdir(self) -> None:
    """Remove this directory, provided it has no entries.

    Raises:
        CloudPathNotADirectoryError: if `self.is_file()` is true.
        DirectoryNotEmptyError: if `self.iterdir()` yields at least one entry.
    """
    if self.is_file():
        raise CloudPathNotADirectoryError(
            f"Path {self} is a file; call unlink instead of rmdir."
        )

    # Pull a single entry: exhaustion means the directory is empty, while
    # getting anything back means it is not.
    try:
        next(self.iterdir())
    except StopIteration:
        pass
    else:
        raise DirectoryNotEmptyError(
            f"Directory not empty: '{self}'. Use rmtree to delete recursively."
        )

    self.client._remove(self)

rmtree() -> None

Delete an entire directory tree.

Source code in cloudpathlib/cloudpath.py
1186
1187
1188
1189
1190
1191
1192
def rmtree(self) -> None:
    """Delete the whole directory tree rooted at this path.

    Raises:
        CloudPathNotADirectoryError: if `self.is_file()` is true.
    """
    points_at_file = self.is_file()
    if points_at_file:
        raise CloudPathNotADirectoryError(
            f"Path {self} is a file; call unlink instead of rmtree."
        )

    self.client._remove(self)

samefile(other_path: Union[str, os.PathLike]) -> bool

Source code in cloudpathlib/cloudpath.py
903
904
905
def samefile(self, other_path: Union[str, os.PathLike]) -> bool:
    """Return whether `other_path` compares equal to this path.

    The check is the `==` comparison below; this method does no other work.
    """
    # all cloud paths are absolute and the paths are used for hash
    return self == other_path

stat(follow_symlinks=True)

Source code in cloudpathlib/azure/azblobpath.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def stat(self, follow_symlinks=True):
    """Build an `os.stat_result` for this blob from the client's metadata.

    Only the dev, size and mtime slots are populated; every other slot is
    `None`. `follow_symlinks` is accepted but is not used by this body.

    Returns:
        An `os.stat_result` whose dev slot holds `self.cloud_prefix`, whose
        size slot is the metadata's `"size"` (0 when absent) and whose mtime
        slot is the timestamp of the metadata's `"last_modified"` (0 when
        absent).

    Raises:
        NoStatError: if the metadata lookup raises `ResourceNotFoundError`.
    """
    try:
        meta = self.client._get_metadata(self)
    except ResourceNotFoundError as err:
        raise NoStatError(
            f"No stats available for {self}; it may be a directory or not exist."
        ) from err

    # Fall back to 0 when no modification time is reported: calling
    # `.timestamp()` on an integer default would raise AttributeError.
    last_modified = meta.get("last_modified")
    mtime = last_modified.timestamp() if last_modified else 0

    return os.stat_result(
        (
            None,  # mode
            None,  # ino
            self.cloud_prefix,  # dev,
            None,  # nlink,
            None,  # uid,
            None,  # gid,
            meta.get("size", 0),  # size,
            None,  # atime,
            mtime,  # mtime,
            None,  # ctime,
        )
    )

touch(exist_ok: bool = True, mode: Optional[Any] = None)

Source code in cloudpathlib/azure/azblobpath.py
45
46
47
48
49
50
51
52
53
54
55
56
57
def touch(self, exist_ok: bool = True, mode: Optional[Any] = None):
    """Create an empty blob at this path, or re-move an existing one onto itself.

    When the path already exists, the client's `_move_file(self, self)` is
    called (presumably to refresh the blob's modification time; confirm against
    the client). Otherwise an empty local file named "empty" is created in a
    temporary directory and uploaded via the client's `_upload_file`.

    `mode` is accepted but is not used by this body.

    Raises:
        FileExistsError: if the path exists and `exist_ok` is false.
    """
    if self.exists():
        if not exist_ok:
            raise FileExistsError(f"File exists: {self}")
        self.client._move_file(self, self)
        return

    # The context manager removes the temporary directory even when the upload
    # raises, so a failed upload does not leave the directory behind.
    with TemporaryDirectory() as tmp_dir:
        placeholder = Path(tmp_dir) / "empty"
        placeholder.touch()

        self.client._upload_file(placeholder, self)

unlink(missing_ok: bool = True) -> None

Source code in cloudpathlib/cloudpath.py
907
908
909
910
911
912
913
def unlink(self, missing_ok: bool = True) -> None:
    """Remove the file at this path through the client.

    `missing_ok` is forwarded unchanged to `self.client._remove`.

    Raises:
        CloudPathIsADirectoryError: if `self.is_dir()` is true.
    """
    # Note: missing_ok defaults to False in pathlib, but changing the default now would be a breaking change.
    points_at_directory = self.is_dir()
    if points_at_directory:
        raise CloudPathIsADirectoryError(
            f"Path {self} is a directory; call rmdir instead of unlink."
        )

    self.client._remove(self, missing_ok)

upload_from(source: Union[str, os.PathLike], force_overwrite_to_cloud: Optional[bool] = None) -> Self

Upload a file or directory to the cloud path.

Source code in cloudpathlib/cloudpath.py
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
def upload_from(
    self,
    source: Union[str, os.PathLike],
    force_overwrite_to_cloud: Optional[bool] = None,
) -> Self:
    """Upload a file or directory to the cloud path."""
    source = Path(source)

    if source.is_dir():
        for p in source.iterdir():
            (self / p.name).upload_from(p, force_overwrite_to_cloud=force_overwrite_to_cloud)

        return self

    else:
        if self.exists() and self.is_dir():
            dst = self / source.name
        else:
            dst = self

        dst._upload_file_to_cloud(source, force_overwrite_to_cloud=force_overwrite_to_cloud)

        return dst

validate(v: str) -> Self classmethod

Used as a Pydantic validator. See https://docs.pydantic.dev/2.0/usage/types/custom/

Source code in cloudpathlib/cloudpath.py
1679
1680
1681
1682
1683
@classmethod
def validate(cls, v: str) -> Self:
    """Build an instance of this class from `v`; used as a Pydantic validator.

    See https://docs.pydantic.dev/2.0/usage/types/custom/
    """
    return cls(v)

walk(top_down: bool = True, on_error: Optional[Callable] = None, follow_symlinks: bool = False) -> Generator[Tuple[Self, List[str], List[str]], None, None]

Source code in cloudpathlib/cloudpath.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
def walk(
    self,
    top_down: bool = True,
    on_error: Optional[Callable] = None,
    follow_symlinks: bool = False,
) -> Generator[Tuple[Self, List[str], List[str]], None, None]:
    try:
        file_tree = self._build_subtree(recursive=True)  # walking is always recursive
        yield from self._walk_results_from_tree(self, file_tree, top_down=top_down)

    except Exception as e:
        if on_error is not None:
            on_error(e)
        else:
            raise

with_name(name: str) -> Self

Source code in cloudpathlib/cloudpath.py
1115
1116
def with_name(self, name: str) -> Self:
    """Return the result of dispatching `"with_name"` with `name` via `_dispatch_to_path`.

    NOTE(review): presumably mirrors `pathlib.PurePath.with_name` and yields a
    new cloud path; confirm against `_dispatch_to_path`.
    """
    return self._dispatch_to_path("with_name", name)

with_segments(*pathsegments) -> Self

Create a new CloudPath with the same client out of the given segments. The first segment will be interpreted as the bucket/container name.

Source code in cloudpathlib/cloudpath.py
1118
1119
1120
1121
1122
def with_segments(self, *pathsegments) -> Self:
    """Create a new CloudPath with the same client out of the given segments.
    The first segment will be interpreted as the bucket/container name.
    """
    return self._new_cloudpath("/".join(pathsegments))

with_stem(stem: str) -> Self

Source code in cloudpathlib/cloudpath.py
1108
1109
1110
1111
1112
1113
def with_stem(self, stem: str) -> Self:
    try:
        return self._dispatch_to_path("with_stem", stem)
    except AttributeError:
        # with_stem was only added in python 3.9, so we fallback for compatibility
        return self.with_name(stem + self.suffix)

with_suffix(suffix: str) -> Self

Source code in cloudpathlib/cloudpath.py
1124
1125
def with_suffix(self, suffix: str) -> Self:
    """Return the result of dispatching `"with_suffix"` with `suffix` via `_dispatch_to_path`.

    NOTE(review): presumably mirrors `pathlib.PurePath.with_suffix` and yields a
    new cloud path; confirm against `_dispatch_to_path`.
    """
    return self._dispatch_to_path("with_suffix", suffix)

write_bytes(data: bytes) -> int

Open the file in bytes mode, write to it, and close the file.

NOTE: vendored from pathlib since we override open https://github.com/python/cpython/blob/3.8/Lib/pathlib.py#L1235-L1242

Source code in cloudpathlib/cloudpath.py
915
916
917
918
919
920
921
922
923
924
def write_bytes(self, data: bytes) -> int:
    """Open the file in bytes mode, write `data` to it, and close the file.

    Returns whatever the opened file's `write` call returns.

    NOTE: vendored from pathlib since we override open
    https://github.com/python/cpython/blob/3.8/Lib/pathlib.py#L1235-L1242
    """
    # Building the memoryview first type-checks for the buffer interface
    # before the file is opened (and therefore truncated).
    buffer_view = memoryview(data)

    with self.open(mode="wb") as sink:
        written = sink.write(buffer_view)
    return written

write_text(data: str, encoding: Optional[str] = None, errors: Optional[str] = None, newline: Optional[str] = None) -> int

Open the file in text mode, write to it, and close the file.

NOTE: vendored from pathlib since we override open https://github.com/python/cpython/blob/3.10/Lib/pathlib.py#L1146-L1155

Source code in cloudpathlib/cloudpath.py
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
def write_text(
    self,
    data: str,
    encoding: Optional[str] = None,
    errors: Optional[str] = None,
    newline: Optional[str] = None,
) -> int:
    """Open the file in text mode, write `data` to it, and close the file.

    `encoding`, `errors` and `newline` are forwarded unchanged to `self.open`;
    the return value is whatever the opened file's `write` call returns.

    NOTE: vendored from pathlib since we override open
    https://github.com/python/cpython/blob/3.10/Lib/pathlib.py#L1146-L1155

    Raises:
        TypeError: if `data` is not a `str`.
    """
    if not isinstance(data, str):
        raise TypeError("data must be str, not %s" % data.__class__.__name__)

    text_options = {"encoding": encoding, "errors": errors, "newline": newline}
    with self.open(mode="w", **text_options) as sink:
        written = sink.write(data)
    return written