From 405de51e9b26666e27eccca7a2fef8836ae9abf9 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Mon, 11 Nov 2024 20:05:39 -0500
Subject: [PATCH 01/19] wip: adding file move to cloudfiles

---
 ChangeLog                        |  38 ++++++++
 automated_test.py                |  33 +++++++
 cloudfiles/cloudfiles.py         |  85 +++++++++++++++++-
 cloudfiles_cli/cloudfiles_cli.py | 146 ++++++++++++++++++++++++++++++-
 4 files changed, 294 insertions(+), 8 deletions(-)

diff --git a/ChangeLog b/ChangeLog
index 824f1da..ac09b31 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,44 @@
 CHANGES
 =======
 
+* fix: add drop table stats
+* perf: add in stats table for faster xfer performance
+* feat: import improvements to ResumableFileSet from transcoder
+* fix: release in xfer
+* fix: leasing was broken
+* feat: add middleauth+https paths indicate CAVE interface (#106)
+
+4.26.0
+------
+
+* feat: make it possible to normalize e.g. zarr2://./helloworld
+* feat: add all current neuroglancer formats to parsing list
+
+4.25.0
+------
+
+* feat: list for apache servers (#104)
+
+4.24.2
+------
+
+* feat: add size operator to CloudFiles for HTTP interface
+
+4.24.1
+------
+
+* fix: close unused connections for HTTP interface
+
+4.24.0
+------
+
+* fix: check for 404 and use proper key variable
+* test: modify dne test
+* docs: update authors, changelog
+* feat: add support for allow\_missing to interfaces
+* test: check to see if allow\_missing works
+* feat: add allow\_missing to transfer\_to/from
+
 4.23.0
 ------
 
diff --git a/automated_test.py b/automated_test.py
index 8f18b6d..29cfaed 100644
--- a/automated_test.py
+++ b/automated_test.py
@@ -1151,3 +1151,36 @@ def test_lock_clearing():
   assert len(lst) == 0
 
 
+@pytest.mark.parametrize("protocol", ('mem', 'file', 's3'))
+def test_move(s3, protocol):
+  from cloudfiles import CloudFiles
+
+  url = compute_url(protocol, "move")
+
+  cf = CloudFiles(url)
+  cf.puts([
+    ('hello', b'world'),
+    ('lamp', b'emporium'),
+  ])
+
+  print(cf.exists(["hola"]))
+
+  cf.move("hello", f"{protocol}://move/hola")
+
+  # assert any(cf.exists(["hello", "lamp"]).values()) == False
+  print(os.listdir("/tmp/cloudfiles/move/"))
+  print(cf.exists(["hola"]))
+  assert all(cf.exists(["hola"]).values()) == True
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index 1f28988..f960e6f 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -2,7 +2,7 @@
   Any, Dict, Optional, Union, List, Tuple, 
   Callable, Generator,
-  Iterable, cast, BinaryIO
+  Sequence, cast, BinaryIO
 )
 
 from queue import Queue
@@ -953,6 +953,7 @@ def transfer_to(
     reencode:Optional[str] = None,
     content_type:Optional[str] = None,
     allow_missing:bool = False,
+    progress:Optional[bool] = None,
   ) -> None:
     """
     Transfer all files from this CloudFiles storage
@@ -969,7 +970,7 @@ def transfer_to(
     - gs->gs: Uses GCS copy API to minimize data movement
     - s3->s3: Uses boto s3 copy API to minimize data movement
 
-    cf_src: another CloudFiles instance or cloudpath
+    cf_dest: another CloudFiles instance or cloudpath
     paths: if None transfer all files from src, else if
       an iterable, transfer only these files.
 
@@ -997,7 +998,8 @@ def transfer_to(
     return cf_dest.transfer_from(
       self, paths, block_size,
       reencode, content_type,
-      allow_missing,
+      allow_missing,
+      progress,
     )
 
   def transfer_from(
@@ -1008,6 +1010,7 @@ def transfer_from(
     reencode:Optional[str] = None,
     content_type:Optional[str] = None,
     allow_missing:bool = False,
+    progress:Optional[bool] = None,
   ) -> None:
     """
     Transfer all files from the source CloudFiles storage
@@ -1054,7 +1057,15 @@ def transfer_from(
 
     total = totalfn(paths, None)
 
-    with tqdm(desc="Transferring", total=total, disable=(not self.progress)) as pbar:
+    disable = progress
+    if disable is None:
+      disable = self.progress
+    if disable is None:
+      disable = False
+    else:
+      disable = not disable
+
+    with tqdm(desc="Transferring", total=total, disable=disable) as pbar:
       if (
         cf_src.protocol == "file"
         and self.protocol == "file"
@@ -1262,6 +1273,68 @@ def thunk_copy(key):
     )
     return len(results)
 
+  def move(self, src:str, dest:str):
+    """Move (rename) src to dest.
+
+    src and dest do not have to be on the same filesystem.
+    """
+    epath = paths.extract(dest)
+    full_cloudpath = paths.asprotocolpath(epath)
+    dest_cloudpath = paths.dirname(full_cloudpath)
+    base_dest = paths.basename(full_cloudpath)
+
+    return self.moves(dest_cloudpath, [
+      (src, base_dest)
+    ], block_size=1, progress=False)
+
+  def moves(
+    self,
+    cf_dest:Any,
+    path_pairs:Sequence[Tuple[str, str]],
+    block_size:int = 64,
+    total:Optional[int] = None,
+    progress:Optional[bool] = None,
+  ):
+    """
+    Move (rename) files.
+
+    pairs: [ (src, dest), (src, dest), ... ]
+    """
+    if isinstance(cf_dest, str):
+      cf_dest = CloudFiles(
+        cf_dest, progress=False,
+        green=self.green, num_threads=self.num_threads,
+      )
+    total = totalfn(path_pairs, total)
+
+    disable = not (self.progress if progress is None else progress)
+    pbar = tqdm(total=total, disable=disable, desc="Moving")
+
+    if self.protocol == "file" and edest.protocol == "file":
+      for src, dest in pbar:
+        src = self.join(self.cloudpath, src).replace("file://", "")
+        dest = cf_dest.join(cf_dest.cloudpath, dest).replace("file://", "")
+        mkdir(os.path.dirname(dest))
+        src, encoding = FileInterface.get_encoded_file_path(src)
+        _, dest_ext = os.path.splitext(dest)
+        dest_ext_compress = FileInterface.get_extension(encoding)
+        if dest_ext_compress != dest_ext:
+          dest += dest_ext_compress
+        shutil.move(src, dest)
+      return
+
+    with pbar:
+      for subpairs in sip(path_pairs, block_size):
+        self.transfer_to(cf_dest, paths=(
+          {
+            "path": src,
+            "dest_path": dest,
+          }
+          for src, dest in subpairs
+        ), progress=False)
+        self.delete(( src for src, dest in subpairs ), progress=False)
+        pbar.update(len(subpairs))
+
   def join(self, *paths:str) -> str:
     """
     Convenience method for joining path strings
@@ -1440,6 +1513,10 @@ def transfer_from(
       reencode=reencode,
     )
 
+  def move(self, dest):
+    """Move (rename) this file to dest."""
+    return self.cf.move(self.filename, dest)
+
   def __len__(self):
     return self.size()
 
diff --git a/cloudfiles_cli/cloudfiles_cli.py b/cloudfiles_cli/cloudfiles_cli.py
index d989d47..2e721d3 100644
--- a/cloudfiles_cli/cloudfiles_cli.py
+++ b/cloudfiles_cli/cloudfiles_cli.py
@@ -184,10 +184,6 @@ def cp(
 
   If source is "-" read newline delimited filenames from stdin.
   If destination is "-" output to stdout.
-
-  Note that for gs:// to gs:// transfers, the gsutil
-  tool is more efficient because the files never leave
-  Google's network.
   """
   use_stdout = (destination == '-')
   if len(source) > 1 and not ispathdir(destination) and not use_stdout:
@@ -330,6 +326,148 @@ def _cp_stdout(src, no_sign_request, paths):
     content = res["content"].decode("utf8")
     sys.stdout.write(content)
 
+@main.command()
+@click.argument("source", nargs=-1)
+@click.argument("destination", nargs=1)
+@click.option('--progress', is_flag=True, default=False, help="Show transfer progress.", show_default=True)
+@click.option('-b', '--block-size', default=128, help="Number of files to download at a time.", show_default=True)
+@click.option('--part-bytes', default=int(1e8), help="Composite upload threshold in bytes. Splits a file into pieces for some cloud services like gs and s3.", show_default=True)
+@click.option('--no-sign-request', is_flag=True, default=False, help="Use s3 in anonymous mode (don't sign requests) for the source.", show_default=True)
+@click.pass_context
+def mv(
+  ctx, source, destination,
+  progress, block_size,
+  part_bytes, no_sign_request,
+):
+  """
+  Move one or more files from a source to destination.
+
+  If source is "-" read newline delimited filenames from stdin.
+  If destination is "-" output to stdout.
+  """
+  if len(source) > 1 and not ispathdir(destination):
+    print("cloudfiles: destination must be a directory for multiple source files.")
+    return
+
+  for src in source:
+    _mv_single(
+      ctx, src, destination,
+      progress, block_size,
+      part_bytes, no_sign_request
+    )
+
+def _mv_single(
+  ctx, source, destination,
+  progress, block_size,
+  part_bytes, no_sign_request
+):
+  use_stdin = (source == '-')
+
+  nsrc = normalize_path(source)
+  ndest = normalize_path(destination)
+
+  issrcdir = (ispathdir(source) or CloudFiles(nsrc).isdir()) and use_stdin == False
+  isdestdir = (ispathdir(destination) or CloudFiles(ndest).isdir())
+
+  ensrc = cloudfiles.paths.extract(nsrc)
+  endest = cloudfiles.paths.extract(ndest)
+
+  if ensrc.protocol == "file" and endest.protocol == "file" and issrcdir:
+    shutil.move(nsrc.replace("file://", ""), ndest.replace("file://"))
+    return
+
+  recursive = issrcdir
+
+  # For more information see:
+  # https://cloud.google.com/storage/docs/gsutil/commands/cp#how-names-are-constructed
+  # Try to follow cp rules. If the directory exists,
+  # copy the base source directory into the dest directory
+  # If the directory does not exist, then we copy into
+  # the dest directory.
+  # Both x* and x** should not copy the base directory
+  if recursive and nsrc[-1] != "*":
+    if isdestdir:
+      if nsrc[-1] == '/':
+        nsrc = nsrc[:-1]
+      ndest = cloudpathjoin(ndest, os.path.basename(nsrc))
+
+  ctx.ensure_object(dict)
+  parallel = int(ctx.obj.get("parallel", 1))
+
+  # The else clause here is to handle single file transfers
+  srcpath = nsrc if issrcdir else os.path.dirname(nsrc)
+  many, flat, prefix = get_mfp(nsrc, recursive)
+
+  if issrcdir and not many:
+    print(f"cloudfiles: {source} is a directory (not copied).")
+    return
+
+  xferpaths = os.path.basename(nsrc)
+  if use_stdin:
+    xferpaths = sys.stdin.readlines()
+    xferpaths = [ x.replace("\n", "") for x in xferpaths ]
+    prefix = os.path.commonprefix(xferpaths)
+    xferpaths = [ x.replace(prefix, "") for x in xferpaths ]
+    srcpath = cloudpathjoin(srcpath, prefix)
+  elif many:
+    xferpaths = CloudFiles(
+      srcpath, no_sign_request=no_sign_request
+    ).list(prefix=prefix, flat=flat)
+
+  destpath = ndest
+  if isinstance(xferpaths, str):
+    destpath = ndest if isdestdir else os.path.dirname(ndest)
+  elif not isdestdir:
+    if os.path.exists(ndest.replace("file://", "")):
+      print(f"cloudfiles: {ndest} is not a directory (not copied).")
+      return
+
+  if not isinstance(xferpaths, str):
+    if parallel == 1:
+      _mv(srcpath, destpath, progress, block_size, part_bytes, no_sign_request, xferpaths)
+      return
+
+    total = None
+    try:
+      total = len(xferpaths)
+    except TypeError:
+      pass
+
+    fn = partial(_mv, srcpath, destpath, False, block_size, part_bytes, no_sign_request)
+
+    with tqdm(desc="Transferring", total=total, disable=(not progress)) as pbar:
+      with pathos.pools.ProcessPool(parallel) as executor:
+        for _ in executor.imap(fn, sip(xferpaths, block_size)):
+          pbar.update(block_size)
+  else:
+    cfsrc = CloudFiles(srcpath, progress=progress, no_sign_request=no_sign_request)
+    if not cfsrc.exists(xferpaths):
+      print(f"cloudfiles: source path not found: {cfsrc.abspath(xferpaths).replace('file://','')}")
+      return
+
+    cfdest = CloudFiles(
+      destpath,
+      progress=progress,
+      composite_upload_threshold=part_bytes,
+    )
+
+    if isdestdir:
+      new_path = os.path.basename(nsrc)
+    else:
+      new_path = os.path.basename(ndest)
+
+    cfsrc.transfer_to(cfdest, paths=[{
+      "path": xferpaths,
+      "dest_path": new_path,
+    }], reencode=compression)
+
+def _mv(src, dst, progress, block_size, part_bytes, no_sign_request, paths):
+  cfsrc = CloudFiles(src, progress=progress, composite_upload_threshold=part_bytes, no_sign_request=no_sign_request)
+  cfdest = CloudFiles(dst, progress=progress, composite_upload_threshold=part_bytes)
+  cfsrc.moves(
+    cfdest, paths=paths, block_size=block_size
+  )
+
 @main.group("xfer")
 def xfergroup():
   """

From f6447a8645f61bffbb395867f292c26585d5f7ca Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 13:26:57 -0500
Subject: [PATCH 02/19] fix: wrong variables

---
 cloudfiles/cloudfiles.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index f960e6f..c5629e4 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -1308,10 +1308,9 @@ def moves(
     total = totalfn(path_pairs, total)
 
     disable = not (self.progress if progress is None else progress)
-    pbar = tqdm(total=total, disable=disable, desc="Moving")
 
-    if self.protocol == "file" and edest.protocol == "file":
-      for src, dest in pbar:
+    if self.protocol == "file" and cf_dest.protocol == "file":
+      for src, dest in tqdm(path_pairs, total=total, disable=disable, desc="Moving"):
         src = self.join(self.cloudpath, src).replace("file://", "")
         dest = cf_dest.join(cf_dest.cloudpath, dest).replace("file://", "")
         mkdir(os.path.dirname(dest))
@@ -1323,6 +1322,8 @@ def moves(
         shutil.move(src, dest)
       return
 
+    pbar = tqdm(total=total, disable=disable, desc="Moving")
+
     with pbar:
       for subpairs in sip(path_pairs, block_size):
         self.transfer_to(cf_dest, paths=(

From 71a7da86c627cc9b11ebdf2982dcd0839ac775ab Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 13:39:26 -0500
Subject: [PATCH 03/19] refactor: move file logic into its own method

---
 cloudfiles/cloudfiles.py | 39 +++++++++++++++++++++++++++++----------
 1 file changed, 29 insertions(+), 10 deletions(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index c5629e4..b9fdca2 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -1305,21 +1305,16 @@ def moves(
         cf_dest, progress=False,
         green=self.green, num_threads=self.num_threads,
       )
+
     total = totalfn(path_pairs, total)
 
     disable = not (self.progress if progress is None else progress)
 
     if self.protocol == "file" and cf_dest.protocol == "file":
-      for src, dest in tqdm(path_pairs, total=total, disable=disable, desc="Moving"):
-        src = self.join(self.cloudpath, src).replace("file://", "")
-        dest = cf_dest.join(cf_dest.cloudpath, dest).replace("file://", "")
-        mkdir(os.path.dirname(dest))
-        src, encoding = FileInterface.get_encoded_file_path(src)
-        _, dest_ext = os.path.splitext(dest)
-        dest_ext_compress = FileInterface.get_extension(encoding)
-        if dest_ext_compress != dest_ext:
-          dest += dest_ext_compress
-        shutil.move(src, dest)
+      self.__moves_file_to_file(
+        cf_dest, path_pairs, total,
+        disable, block_size
+      )
       return
 
     pbar = tqdm(total=total, disable=disable, desc="Moving")
@@ -1336,6 +1331,30 @@ def moves(
         self.delete(( src for src, dest in subpairs ), progress=False)
         pbar.update(len(subpairs))
 
+  def __moves_file_to_file(
+    self,
+    cf_dest:Any,
+    path_pairs:Sequence[Tuple[str,str]],
+    total:Optional[int],
+    disable:bool,
+    block_size:int,
+  ):
+    for src, dest in tqdm(path_pairs, total=total, disable=disable, desc="Moving"):
+      src = self.join(self.cloudpath, src).replace("file://", "")
+      dest = cf_dest.join(cf_dest.cloudpath, dest).replace("file://", "")
+
+      if os.path.isdir(dest):
+        dest = cf_dest.join(dest, os.path.basename(src))
+      else:
+        mkdir(os.path.dirname(dest))
+
+      src, encoding = FileInterface.get_encoded_file_path(src)
+      _, dest_ext = os.path.splitext(dest)
+      dest_ext_compress = FileInterface.get_extension(encoding)
+      if dest_ext_compress != dest_ext:
+        dest += dest_ext_compress
+      shutil.move(src, dest)
+
   def join(self, *paths:str) -> str:
     """
     Convenience method for joining path strings

From 47823ba552a9c83b8877025e36891df117503489 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 14:55:07 -0500
Subject: [PATCH 04/19] test: better testing of move

---
 automated_test.py | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/automated_test.py b/automated_test.py
index 29cfaed..7f84861 100644
--- a/automated_test.py
+++ b/automated_test.py
@@ -1162,15 +1162,25 @@ def test_move(s3, protocol):
     ('hello', b'world'),
     ('lamp', b'emporium'),
   ])
+  cf.move("hello", f"{url}/hola")
+
+  assert all(cf.exists(["hola"]).values()) == True
+  assert all(cf.exists(["hello"]).values()) == False
 
-  print(cf.exists(["hola"]))
+  cf.puts([
+    ('hello', b'world'),
+    ('lamp', b'emporium'),
+  ])
 
-  cf.move("hello", f"{protocol}://move/hola")
+  cf.delete("hola")
 
-  # assert any(cf.exists(["hello", "lamp"]).values()) == False
-  print(os.listdir("/tmp/cloudfiles/move/"))
-  print(cf.exists(["hola"]))
-  assert all(cf.exists(["hola"]).values()) == True
+  cf.moves(f"{url}", [
+    ("hello", f"hola"),
+    ("lamp", f"lampara"),
+  ])
+
+  assert all(cf.exists(["hola", "lampara"]).values()) == True
+  assert all(cf.exists(["hello", "lamp"]).values()) == False
 
 
 

From c58535733d7b81db9810d9cfbd0220f00c7d3ebd Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 14:55:41 -0500
Subject: [PATCH 05/19] fix: handle moving to a directory

---
 cloudfiles/cloudfiles.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index b9fdca2..f1c1b8a 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -1222,6 +1222,9 @@ def __transfer_file_to_remote(
         else:
           raise
 
+      if dest_path == '':
+        dest_path = src_path
+
       to_upload.append({
         "path": dest_path,
         "content": handle,

From 04cb2741570540d4e762a6e12a377774bb798a91 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 15:34:56 -0500
Subject: [PATCH 06/19] fix: moving dirs didn't work bc of missing argument

---
 cloudfiles_cli/cloudfiles_cli.py | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/cloudfiles_cli/cloudfiles_cli.py b/cloudfiles_cli/cloudfiles_cli.py
index 2e721d3..83360f6 100644
--- a/cloudfiles_cli/cloudfiles_cli.py
+++ b/cloudfiles_cli/cloudfiles_cli.py
@@ -373,7 +373,7 @@ def _mv_single(
   endest = cloudfiles.paths.extract(ndest)
 
   if ensrc.protocol == "file" and endest.protocol == "file" and issrcdir:
-    shutil.move(nsrc.replace("file://", ""), ndest.replace("file://"))
+    shutil.move(nsrc.replace("file://", ""), ndest.replace("file://", ""))
     return
 
   recursive = issrcdir
@@ -451,15 +451,7 @@ def _mv_single(
       composite_upload_threshold=part_bytes,
     )
 
-    if isdestdir:
-      new_path = os.path.basename(nsrc)
-    else:
-      new_path = os.path.basename(ndest)
-
-    cfsrc.transfer_to(cfdest, paths=[{
-      "path": xferpaths,
-      "dest_path": new_path,
-    }], reencode=compression)
+    cfsrc.move(xferpaths, ndest)
 
 def _mv(src, dst, progress, block_size, part_bytes, no_sign_request, paths):
   cfsrc = CloudFiles(src, progress=progress, composite_upload_threshold=part_bytes, no_sign_request=no_sign_request)
   cfdest = CloudFiles(dst, progress=progress, composite_upload_threshold=part_bytes)
   cfsrc.moves(
     cfdest, paths=paths, block_size=block_size
   )

From fd41d498047f1cd1f32b0c2f74c47cf26547ad17 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 15:35:14 -0500
Subject: [PATCH 07/19] fix: rename transferring to move

---
 cloudfiles_cli/cloudfiles_cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cloudfiles_cli/cloudfiles_cli.py b/cloudfiles_cli/cloudfiles_cli.py
index 83360f6..c199de9 100644
--- a/cloudfiles_cli/cloudfiles_cli.py
+++ b/cloudfiles_cli/cloudfiles_cli.py
@@ -435,7 +435,7 @@ def _mv_single(
 
     fn = partial(_mv, srcpath, destpath, False, block_size, part_bytes, no_sign_request)
 
-    with tqdm(desc="Transferring", total=total, disable=(not progress)) as pbar:
+    with tqdm(desc="Moving", total=total, disable=(not progress)) as pbar:
       with pathos.pools.ProcessPool(parallel) as executor:
         for _ in executor.imap(fn, sip(xferpaths, block_size)):
           pbar.update(block_size)

From 0377928eefde338e7d24950a03eb83af72752b89 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 17:49:54 -0500
Subject: [PATCH 08/19] redesign: change path_pairs to paths

---
 cloudfiles/cloudfiles.py | 23 +++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index f1c1b8a..03e00e5 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -1293,7 +1293,7 @@ def move(self, src:str, dest:str):
   def moves(
     self,
     cf_dest:Any,
-    path_pairs:Sequence[Tuple[str, str]],
+    paths:Union[Sequence[str], Sequence[Tuple[str, str]]],
     block_size:int = 64,
     total:Optional[int] = None,
     progress:Optional[bool] = None,
@@ -1309,13 +1309,13 @@ def moves(
         green=self.green, num_threads=self.num_threads,
       )
 
-    total = totalfn(path_pairs, total)
+    total = totalfn(paths, total)
 
     disable = not (self.progress if progress is None else progress)
 
     if self.protocol == "file" and cf_dest.protocol == "file":
       self.__moves_file_to_file(
-        cf_dest, path_pairs, total,
+        cf_dest, paths, total,
         disable, block_size
       )
       return
@@ -1323,7 +1323,12 @@ def moves(
     pbar = tqdm(total=total, disable=disable, desc="Moving")
 
     with pbar:
-      for subpairs in sip(path_pairs, block_size):
+      for subpairs in sip(paths, block_size):
+        subpairs = (
+          ((pair, pair) if isinstance(pair, str) else pair)
+          for pair in subpairs
+        )
+
         self.transfer_to(cf_dest, paths=(
           {
             "path": src,
@@ -1337,12 +1342,18 @@ def moves(
   def __moves_file_to_file(
     self,
     cf_dest:Any,
-    path_pairs:Sequence[Tuple[str,str]],
+    paths:Union[Sequence[str], Sequence[Tuple[str,str]]],
     total:Optional[int],
     disable:bool,
     block_size:int,
   ):
-    for src, dest in tqdm(path_pairs, total=total, disable=disable, desc="Moving"):
+    for pair in tqdm(paths, total=total, disable=disable, desc="Moving"):
+      if isinstance(pair, str):
+        src = pair
+        dest = pair
+      else:
+        (src, dest) = pair
+
       src = self.join(self.cloudpath, src).replace("file://", "")
       dest = cf_dest.join(cf_dest.cloudpath, dest).replace("file://", "")
 

From 9405c3410608a603efb8d07d3ce9c2b1b34c943d Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 17:52:01 -0500
Subject: [PATCH 09/19] fix: pbar updates correctly

---
 cloudfiles/cloudfiles.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index 03e00e5..2d1978c 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -1324,6 +1324,7 @@ def moves(
 
     with pbar:
       for subpairs in sip(paths, block_size):
+        subtotal = len(subpairs)
        subpairs = (
          ((pair, pair) if isinstance(pair, str) else pair)
          for pair in subpairs
@@ -1337,7 +1338,7 @@ def moves(
           for src, dest in subpairs
         ), progress=False)
         self.delete(( src for src, dest in subpairs ), progress=False)
-        pbar.update(len(subpairs))
+        pbar.update(subtotal)
 

From 9d6be555c725441bfaba13d802d6d805309b1b93 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 18:00:32 -0500
Subject: [PATCH 10/19] fix: generator was consuming itself too quickly

---
 cloudfiles/cloudfiles.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index 2d1978c..af5cdd5 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -1324,11 +1324,10 @@ def moves(
 
     with pbar:
       for subpairs in sip(paths, block_size):
-        subtotal = len(subpairs)
-        subpairs = (
+        subpairs = [
          ((pair, pair) if isinstance(pair, str) else pair)
          for pair in subpairs
-        )
+        ]
 
         self.transfer_to(cf_dest, paths=(
           {
@@ -1337,7 +1337,7 @@ def moves(
           for src, dest in subpairs
         ), progress=False)
         self.delete(( src for src, dest in subpairs ), progress=False)
-        pbar.update(subtotal)
+        pbar.update(len(subpairs))
 

From 0846121681cef7b0864cc35125f082d9acdec63a Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Tue, 12 Nov 2024 18:56:33 -0500
Subject: [PATCH 11/19] test: test the CLI

---
 automated_test.py                | 35 +++++++++++++++++++++++++++++---
 cloudfiles_cli/cloudfiles_cli.py | 16 ++++++++-------
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/automated_test.py b/automated_test.py
index 7f84861..a026a10 100644
--- a/automated_test.py
+++ b/automated_test.py
@@ -1182,15 +1182,44 @@ def test_move(s3, protocol):
   assert all(cf.exists(["hola", "lampara"]).values()) == True
   assert all(cf.exists(["hello", "lamp"]).values()) == False
 
+@pytest.mark.parametrize("protocol", ["file", "s3"])
+def test_cli_move_python(s3, protocol):
+  from cloudfiles_cli.cloudfiles_cli import _mv_single
+  from cloudfiles import CloudFiles, exceptions
+  test_dir = compute_url(protocol, "cli_mv_python")
+  test_dir2 = compute_url(protocol, "cli_mv_python2")
+  cf = CloudFiles(test_dir)
+  N = 100
+  def mkfiles():
+    cf.delete(cf.list())
+    for i in range(N):
+      cf[str(i)] = b"hello world"
+  def run_mv(src, dest):
+    _mv_single(
+      src, dest,
+      progress=False, block_size=5,
+      part_bytes=int(100e6), no_sign_request=True,
+      parallel=1
+    )
+  mkfiles()
+  run_mv(test_dir, test_dir2)
+  assert sorted(list(cf)) == []
+  cf2 = CloudFiles(test_dir2)
+  print(sorted(list(cf2)))
+  assert sorted(list(cf2)) == sorted([ f'{i}' for i in range(N) ])
+  mkfiles()
+  run_mv(f"{test_dir}/*", f"{test_dir}/move/")
+  assert sorted(list(cf.list(prefix="move"))) == sorted([ f'move/{i}' for i in range(N) ])
-
-
-
+  mkfiles()
+  run_mv(f"{test_dir}/1", f"{test_dir}/move/1")
+  assert cf.exists("move/1") == True
+  assert cf.exists("1") == False
 
 
 
diff --git a/cloudfiles_cli/cloudfiles_cli.py b/cloudfiles_cli/cloudfiles_cli.py
index c199de9..386fe82 100644
--- a/cloudfiles_cli/cloudfiles_cli.py
+++ b/cloudfiles_cli/cloudfiles_cli.py
@@ -349,17 +349,22 @@ def mv(
     print("cloudfiles: destination must be a directory for multiple source files.")
     return
 
+  ctx.ensure_object(dict)
+  parallel = int(ctx.obj.get("parallel", 1))
+
   for src in source:
     _mv_single(
-      ctx, src, destination,
+      src, destination,
       progress, block_size,
-      part_bytes, no_sign_request
+      part_bytes, no_sign_request,
+      parallel
     )
 
 def _mv_single(
-  ctx, source, destination,
+  source, destination,
   progress, block_size,
-  part_bytes, no_sign_request
+  part_bytes, no_sign_request,
+  parallel
 ):
   use_stdin = (source == '-')
 
@@ -391,9 +396,6 @@ def _mv_single(
       nsrc = nsrc[:-1]
     ndest = cloudpathjoin(ndest, os.path.basename(nsrc))
 
-  ctx.ensure_object(dict)
-  parallel = int(ctx.obj.get("parallel", 1))
-
   # The else clause here is to handle single file transfers
   srcpath = nsrc if issrcdir else os.path.dirname(nsrc)
   many, flat, prefix = get_mfp(nsrc, recursive)

From 6c95d3b7ca5de1ab8508ede9c4abff4a1dd0a402 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 16:15:52 -0500
Subject: [PATCH 12/19] feat: adds "touch" function

---
 cloudfiles/cloudfiles.py | 27 ++++++++++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index af5cdd5..747a78f 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -29,7 +29,7 @@
 from .exceptions import UnsupportedProtocolError, MD5IntegrityError, CRC32CIntegrityError
 from .lib import (
   mkdir, totalfn, toiter, scatter, jsonify, nvl,
-  duplicates, first, sip,
+  duplicates, first, sip, touch,
   md5, crc32c, decode_crc32c_b64
 )
 from .paths import ALIASES
@@ -919,6 +919,28 @@ def thunk_delete(path):
     )
     return len(results)
 
+  def touch(
+    self, paths:GetPathType,
+    progress:Optional[bool] = None, total:Optional[int] = None
+  ):
+    """Create a zero byte file if it doesn't exist."""
+    paths = toiter(paths)
+    progress = nvl(progress, self.progress)
+    total = totalfn(paths, total)
+
+    if self.protocol == "file":
+      for path in tqdm(paths, disable=(not progress), total=total):
+        touch(path)
+      return
+
+    result = self.exists(paths, total=total, progress=progress)
+
+    self.puts([
+      (fname, b'')
+      for fname, exists in results.items()
+      if not exists
+    ], progress=progress)
+
   def list(
     self, prefix:str = "", flat:bool = False
   ) -> Generator[str,None,None]:
@@ -1547,6 +1569,9 @@ def transfer_from(
       reencode=reencode,
     )
 
+  def touch(self):
+    return self.cf.touch(self.filename)
+
   def move(self, dest):
     """Move (rename) this file to dest."""
     return self.cf.move(self.filename, dest)

From 8a95e338479a2fddb4a529aefd1ee0d51f5f553c Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 17:04:17 -0500
Subject: [PATCH 13/19] test: check if touch works

---
 automated_test.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/automated_test.py b/automated_test.py
index a026a10..bdf188c 100644
--- a/automated_test.py
+++ b/automated_test.py
@@ -1223,3 +1223,14 @@ def run_mv(src, dest):
 
   assert cf.exists("move/1") == True
   assert cf.exists("1") == False
+
+@pytest.mark.parametrize("protocol", ["file", "mem", "s3"])
+def test_touch(s3, protocol):
+  from cloudfiles import CloudFiles
+
+  url = compute_url(protocol, "touch")
+
+  cf = CloudFiles(url)
+
+  cf.touch([ str(i) for i in range(20) ])
+
+  assert sorted(list(cf)) == sorted([ str(i) for i in range(20) ])

From e6c8f0a2635c4dd5887ab296db066405df948b09 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 17:04:30 -0500
Subject: [PATCH 14/19] fix: various problems with touch

---
 cloudfiles/cloudfiles.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index 747a78f..772c29a 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -929,11 +929,12 @@ def touch(
     total = totalfn(paths, total)
 
     if self.protocol == "file":
+      basepath = self.cloudpath.replace("file://", "")
       for path in tqdm(paths, disable=(not progress), total=total):
-        touch(path)
+        touch(self.join(basepath, path))
       return
 
-    result = self.exists(paths, total=total, progress=progress)
+    results = self.exists(paths, total=total, progress=progress)
 
     self.puts([
       (fname, b'')

From 9aebe9a9b87ea13a09fee98e10818f8a2613aaa4 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 19:07:15 -0500
Subject: [PATCH 15/19] fix: several errors, add copy_file to MemoryInterface,
 refactor

---
 automated_test.py                |  4 ++
 cloudfiles/cloudfiles.py         | 64 ++++++++++++++++++++------------
 cloudfiles/interfaces.py         |  8 ++++
 cloudfiles/paths.py              | 32 +++++++++++++++-
 cloudfiles_cli/cloudfiles_cli.py | 23 +++++++++++-
 5 files changed, 104 insertions(+), 27 deletions(-)

diff --git a/automated_test.py b/automated_test.py
index bdf188c..b1eed5b 100644
--- a/automated_test.py
+++ b/automated_test.py
@@ -1234,3 +1234,7 @@ def test_touch(s3, protocol):
   cf.touch([ str(i) for i in range(20) ])
 
   assert sorted(list(cf)) == sorted([ str(i) for i in range(20) ])
+
+  cf.touch([ str(i) for i in range(20) ])
+
+  assert sorted(list(cf)) == sorted([ str(i) for i in range(20) ])
diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index 772c29a..f68fbc5 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -182,7 +182,7 @@ def path_to_byte_range_tags(path):
   if isinstance(path, str):
     return (path, None, None, None)
   return (path['path'], path.get('start', None), path.get('end', None), path.get('tags', None))
-  
+
 def dl(
   cloudpaths:GetPathType, raw:bool=False, **kwargs
 ) -> Union[bytes,List[dict]]:
@@ -193,23 +193,8 @@ def dl(
     dict.
   """
   cloudpaths, is_multiple = toiter(cloudpaths, is_iter=True)
-  clustered = defaultdict(list)
-  total = 0
-  for path in cloudpaths:
-    pth = path
-    byte_range = None
-    if isinstance(path, dict):
-      pth = path["path"]
-      byte_range = path["byte_range"]
-
-    epath = paths.extract(pth)
-    bucketpath = paths.asbucketpath(epath)
-    clustered[bucketpath].append({
-      "path": epath.path,
-      "start": (byte_range[0] if byte_range else None), # type: ignore
-      "end": (byte_range[1] if byte_range else None), # type: ignore
-    })
-    total += 1
+  clustered = find_common_buckets(cloudpaths)
+  total = sum([ len(bucket) for bucket in clustered.values() ])
 
   progress = kwargs.get("progress", False) and total > 1
   pbar = tqdm(total=total, desc="Downloading", disable=(not progress))
@@ -920,10 +905,15 @@ def thunk_delete(path):
     return len(results)
 
   def touch(
-    self, paths:GetPathType,
-    progress:Optional[bool] = None, total:Optional[int] = None
+    self,
+    paths:GetPathType,
+    progress:Optional[bool] = None,
+    total:Optional[int] = None,
+    nocopy:bool = False,
   ):
-    """Create a zero byte file if it doesn't exist."""
+    """
+    Create a zero byte file if it doesn't exist.
+    """
     paths = toiter(paths)
     progress = nvl(progress, self.progress)
     total = totalfn(paths, total)
@@ -931,16 +921,42 @@ def touch(
     if self.protocol == "file":
       basepath = self.cloudpath.replace("file://", "")
       for path in tqdm(paths, disable=(not progress), total=total):
-        touch(self.join(basepath, path))
+        pth = path
+        if isinstance(path, dict):
+          pth = path["path"]
+        touch(self.join(basepath, pth))
       return
 
     results = self.exists(paths, total=total, progress=progress)
 
-    self.puts([
+    dne = [
       (fname, b'')
       for fname, exists in results.items()
       if not exists
-    ], progress=progress)
+    ]
+
+    self.puts(dne, progress=progress)
+
+    # def thunk_copy(path):
+    #   with self._get_connection() as conn:
+    #     conn.copy_file(path, self._path.bucket, self.join(self._path.path, path))
+    #   return 1
+
+    # if not nocopy:
+    #   already_exists = (
+    #     fname
+    #     for fname, exists in results.items()
+    #     if exists
+    #   )
+
+    #   results = schedule_jobs(
+    #     fns=( partial(thunk_copy, path) for path in already_exists ),
+    #     progress=progress,
+    #     total=(total - len(dne)),
+    #     concurrency=self.num_threads,
+    #     green=self.green,
+    #     count_return=True,
+    #   )
 
   def list(
     self, prefix:str = "", flat:bool = False
diff --git a/cloudfiles/interfaces.py b/cloudfiles/interfaces.py
index beb008d..639e415 100644
--- a/cloudfiles/interfaces.py
+++ b/cloudfiles/interfaces.py
@@ -474,6 +474,14 @@ def size(self, file_path):
 
     return None
 
+  def copy_file(self, src_path, dest_bucket, dest_key):
+    key = self.get_path_to_file(src_path)
+    with MEM_BUCKET_POOL_LOCK:
+      pool = MEM_POOL[MemoryPoolParams(dest_bucket)]
+      dest_bucket = pool.get_connection(None, None)
+    dest_bucket[dest_key] = self._data[key]
+    return True
+
   def exists(self, file_path):
     path = self.get_path_to_file(file_path)
     return path in self._data or any(( (path + ext in self._data) for ext in COMPRESSION_EXTENSIONS ))
diff --git a/cloudfiles/paths.py b/cloudfiles/paths.py
index cc7fc26..80c7aa2 100644
--- a/cloudfiles/paths.py
+++ b/cloudfiles/paths.py
@@ -1,5 +1,5 @@
 from functools import lru_cache
-from collections import namedtuple
+from collections import namedtuple, defaultdict
 import orjson
 import os.path
 import posixpath
@@ -8,9 +8,10 @@
 import urllib.parse
 
 from typing import Tuple, Optional
+from .typing import GetPathType
 
 from .exceptions import UnsupportedProtocolError
-from .lib import yellow, toabs, jsonify, mkdir
+from .lib import yellow, toabs, jsonify, mkdir, toiter
 from .secrets import CLOUD_FILES_DIR
 
 ExtractedPath = namedtuple('ExtractedPath',
@@ -390,3 +391,30 @@ def to_https_protocol(cloudpath):
     cloudpath = cloudpath.replace(f"{alias}://", host, 1)
 
   return cloudpath.replace("s3://", "", 1)
+
+def find_common_buckets(cloudpaths:GetPathType):
+  cloudpaths, is_multiple = toiter(cloudpaths, is_iter=True)
+  clustered = defaultdict(list)
+
+  for path in cloudpaths:
+    pth = path
+    byte_range = None
+    if isinstance(path, dict):
+      pth = path["path"]
+      byte_range = path["byte_range"]
+
+    epath = extract(pth)
+    if epath.protocol == "file":
+      path = os.sep.join(asfilepath(epath).split(os.sep)[2:])
+      bucketpath = "file://" + os.sep.join(asfilepath(epath).split(os.sep)[:2])
+    else:
+      path = epath.path
+      bucketpath = asbucketpath(epath)
+
+    clustered[bucketpath].append({
+      "path": path,
+      "start": (byte_range[0] if byte_range else None), # type: ignore
+      "end": (byte_range[1] if byte_range else None), # type: ignore
+    })
+
+  return clustered
diff --git a/cloudfiles_cli/cloudfiles_cli.py b/cloudfiles_cli/cloudfiles_cli.py
index 386fe82..cd28b67 100644
--- a/cloudfiles_cli/cloudfiles_cli.py
+++ b/cloudfiles_cli/cloudfiles_cli.py
@@ -27,7 +27,7 @@
 from cloudfiles import CloudFiles
 from cloudfiles.resumable_tools import ResumableTransfer
 from cloudfiles.compression import transcode
-from cloudfiles.paths import extract, get_protocol
+from cloudfiles.paths import extract, get_protocol, find_common_buckets
 from cloudfiles.lib import (
   mkdir, toabs, sip, toiter, first,
   red, green,
@@ -462,6 +462,27 @@ def _mv(src, dst, progress, block_size, part_bytes, no_sign_request, paths):
     cfdest, paths=paths, block_size=block_size
   )
 
+@main.command()
+@click.argument("sources", nargs=-1)
+@click.option('--progress', is_flag=True, default=False, help="Show transfer progress.", show_default=True)
+@click.option('--no-sign-request', is_flag=True, default=False, help="Use s3 in anonymous mode (don't sign requests) for the source.", show_default=True)
+@click.pass_context
+def touch(
+  ctx, sources,
+  progress, no_sign_request,
+):
+  sources = list(map(normalize_path, sources))
+  sources = [ src.replace("precomputed://", "") for src in sources ]
+  pbar = tqdm(total=len(sources), desc="Touch", disable=(not progress))
+
+  clustered = find_common_buckets(sources)
+
+  with pbar:
+    for bucket, items in clustered.items():
+      cf = CloudFiles(bucket, no_sign_request=no_sign_request, progress=False)
+      cf.touch(items)
+      pbar.update(len(items))
+
 @main.group("xfer")
 def xfergroup():
   """

From fa2ec12921eaaf43a1d34bbc23a2b8e302574b11 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 19:34:36 -0500
Subject: [PATCH 16/19] fix: missing import

---
 cloudfiles/cloudfiles.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cloudfiles/cloudfiles.py b/cloudfiles/cloudfiles.py
index f68fbc5..dd512e2 100644
--- a/cloudfiles/cloudfiles.py
+++ b/cloudfiles/cloudfiles.py
@@ -32,7 +32,7 @@
   duplicates, first, sip, touch,
   md5, crc32c, decode_crc32c_b64
 )
-from .paths import ALIASES
+from .paths import ALIASES, find_common_buckets
 from .secrets import CLOUD_FILES_DIR, CLOUD_FILES_LOCK_DIR
 from .threaded_queue import ThreadedQueue, DEFAULT_THREADS
 from .typing import (

From aad85177fa39415d8dda2974f4c11e2844f81cc3 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 19:44:57 -0500
Subject: [PATCH 17/19] fixtest: clean up from move

---
 automated_test.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/automated_test.py b/automated_test.py
index b1eed5b..003640c 100644
--- a/automated_test.py
+++ b/automated_test.py
@@ -1182,6 +1182,8 @@ def test_move(s3, protocol):
   assert all(cf.exists(["hola", "lampara"]).values()) == True
   assert all(cf.exists(["hello", "lamp"]).values()) == False
 
+  cf.delete([ "hola", "hello", "lamp", "lampara" ])
+
 @pytest.mark.parametrize("protocol", ["file", "s3"])
 def test_cli_move_python(s3, protocol):

From 83d49a1e50829aa329171750aab0a887bdb77628 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 19:48:21 -0500
Subject: [PATCH 18/19] docs: show how to use move and touch

---
 README.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/README.md b/README.md
index ce0203d..857ba36 100644
--- a/README.md
+++ b/README.md
@@ -38,6 +38,12 @@ cf.delete(paths, parallel=2) # threaded + two processes
 boolean = cf.exists('filename')
 results = cf.exists([ 'filename_1', ... ]) # threaded
 
+cf.move("a", "gs://bucket/b")
+cf.moves("gs://bucket/", [ ("a", "b") ])
+
+cf.touch("example")
+cf.touch([ "example", "example2" ])
+
 # for single files
 cf = CloudFile("gs://bucket/file1")
 info = cf.head()

From 3822468c0ae4a4936706628d5b4522168fa69236 Mon Sep 17 00:00:00 2001
From: William Silversmith
Date: Fri, 15 Nov 2024 19:54:41 -0500
Subject: [PATCH 19/19] docs: show how to use CLI mv, touch

---
 README.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/README.md b/README.md
index 857ba36..702de33 100644
--- a/README.md
+++ b/README.md
@@ -420,6 +420,10 @@ cloudfiles -p 2 cp --progress -r s3://bkt/ gs://bkt2/
 cloudfiles cp -c br s3://bkt/file.txt gs://bkt2/
 # decompress
 cloudfiles cp -c none s3://bkt/file.txt gs://bkt2/
+# move or rename files
+cloudfiles mv s3://bkt/file.txt gs://bkt2/
+# create an empty file if not existing
+cloudfiles touch s3://bkt/empty.txt
 # pass from stdin (use "-" for source argument)
 find some_dir | cloudfiles cp - s3://bkt/
 # resumable transfers