Commit f9b2c30

Add copy_paths option to duplicate output
1 parent: 7d7a50b

1 file changed: +42 −9

df_io/__init__.py

Lines changed: 42 additions & 9 deletions

@@ -7,6 +7,7 @@
 import pandas as pd
 import s3fs
 import zstandard
+from concurrent.futures import ThreadPoolExecutor, as_completed
 
 
 def _writer_wrapper(writer, f, writer_args, writer_options):
@@ -20,6 +21,32 @@ def _writer_wrapper(writer, f, writer_args, writer_options):
     return f
 
 
+class FileWriter:
+    def __init__(self, files, max_workers=None):
+        if max_workers is None:
+            max_workers = len(files)
+        self._files = files
+        self._executor = ThreadPoolExecutor(max_workers)
+
+    def write(self, data):
+        futures = {self._executor.submit(f.write, data): f for f in self._files}
+        for future in as_completed(futures):
+            res = future.result()
+        return res
+
+    def close(self):
+        list(map(lambda x: x.close(), self._files))
+
+    def flush(self, *args, **kwargs):
+        list(map(lambda x: x.flush(), self._files))
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        self.close()
+
+
 def read_df(path, fmt="csv", reader_args=[], reader_options={}):
     pd_reader = getattr(pd, 'read_{}'.format(fmt))
     if path.endswith(".zstd"):
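
Note: the FileWriter class added above presents a single file-like interface that fans each write() out to every underlying file on a thread pool; write() returns the result of whichever future happens to complete last, so callers should not rely on its return value. A minimal usage sketch, with hypothetical local paths that are not part of the commit:

    # Hypothetical illustration of FileWriter fan-out; paths are made up.
    files = [open('/tmp/copy_a.csv', 'wb'), open('/tmp/copy_b.csv', 'wb')]
    with FileWriter(files) as fw:
        fw.write(b'col1,col2\n')   # submitted to both files in parallel
        fw.write(b'1,2\n')
    # __exit__ -> close() closes every underlying file handle.
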
@@ -35,8 +62,8 @@ def read_df(path, fmt="csv", reader_args=[], reader_options={}):
     return pd_reader(path, *reader_args, **reader_options)
 
 
-def write_df(df, s3_path, fmt="csv", gzip_level=9, chunksize=None,
-             writer_args=[], writer_options={},
+def write_df(df, s3_path, copy_paths=[], fmt="csv", gzip_level=9,
+             chunksize=None, writer_args=[], writer_options={},
              zstd_options={"level": 5, "threads": -1}):
     """
     Pandas DataFrame write helper
@@ -46,6 +73,10 @@ def write_df(df, s3_path, fmt="csv", gzip_level=9, chunksize=None,
     writer_args and writer_options.
     If the s3_path parameter starts with s3://, it will try to do an S3 write,
     otherwise opens a local file with that path.
+
+    Additional output files can be specified via the `copy_paths` parameter
+    as a list of local or `s3://...` paths. The same output is written to
+    them in parallel with `s3_path` to reduce overhead.
     """
 
     def flush_and_close(f):
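
A hedged usage sketch of the new copy_paths option (the bucket and paths are hypothetical; the S3 write assumes configured AWS credentials, since the S3 filesystem is opened with anon=False):

    import pandas as pd
    from df_io import write_df

    df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
    # Primary output goes to S3; the same gzipped bytes are written to a
    # local copy in parallel.
    write_df(df, 's3://my-bucket/out/data.csv.gz',
             copy_paths=['/tmp/data.csv.gz'])
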
@@ -66,13 +97,15 @@ def flush_and_close(f):
         writer_options = writer_defaults[fmt]
 
     filename = os.path.basename(s3_path)
+    _files = []
     # support S3 and local writes as well
-    if s3_path.startswith('s3://'):
-        s3 = s3fs.S3FileSystem(anon=False)
-        _w = s3.open(s3_path, 'wb')
-    else:
-        _w = open(s3_path, 'wb')
-    with _w as f:
+    for _path in [s3_path] + copy_paths:
+        if _path.startswith('s3://'):
+            s3 = s3fs.S3FileSystem(anon=False)
+            _files.append(s3.open(_path, 'wb'))
+        else:
+            _files.append(open(_path, 'wb'))
+    with FileWriter(_files) as f:
         if filename.endswith('.gz'):
             f = gzip.GzipFile(filename, mode='wb', compresslevel=gzip_level,
                               fileobj=f)
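
Design note: because the compression wrapper is layered on top of the FileWriter (gzip.GzipFile(..., fileobj=f) where f is the FileWriter), the DataFrame is serialized and compressed only once, and the compressed byte stream is fanned out to every destination; that is the overhead reduction the new docstring refers to.
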
@@ -116,4 +149,4 @@ def flush_and_close(f):
     flush_and_close(f)
 
 
-__version__ = '0.0.6'
+__version__ = '0.0.7'
