Skip to content

Commit 3cc344c

Browse files
Updated BigFiles combination functions to support deleting the old files once they have been combined
Updated BigFiles combination functions to support kwargs for tweaking the output file
1 parent 8bb95a8 commit 3cc344c

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

src/lib/bigFiles.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,16 +92,21 @@ def combine(self, removeParts: bool = False, **kwargs) -> None:
9292
def combinedIterator(dataFiles: list[DataFile], chunkSize: int, **kwargs: dict) -> Iterator[pd.DataFrame]:
    """Lazily yield the chunks of every data file, one file after another.

    Each file is read through its own ``readIterator(chunkSize, **kwargs)``;
    the chunks are produced in the order the files appear in ``dataFiles``.
    """
    return (
        frame
        for dataFile in dataFiles
        for frame in dataFile.readIterator(chunkSize, **kwargs)
    )
9494

95-
def combineDirectoryFiles(outputFilePath: Path, inputFolderPath: Path, matchPattern: str = "*.*", deleteOld: bool = False, **kwargs: dict) -> None:
    """Combine every usable data file found in a folder into one output file.

    Args:
        outputFilePath: Location of the combined file to write.
        inputFolderPath: Folder that is globbed for input files.
        matchPattern: Glob pattern applied to ``inputFolderPath``.
        deleteOld: Forwarded to ``combineDataFiles``; when true the input
            files are deleted after they have been combined.
        **kwargs: Forwarded unchanged to ``combineDataFiles``.

    Files whose detected format is ``UNKNOWN`` or ``STACKED`` are skipped.
    """
    # An output file left over from an earlier run may sit inside the input
    # folder and match the pattern. Treating it as an input would read the
    # file while it is being rewritten and, with deleteOld, delete the result.
    resolvedOutput = outputFilePath.resolve()
    candidates = (
        DataFile(path)
        for path in inputFolderPath.glob(matchPattern)
        if path.resolve() != resolvedOutput
    )
    inputDataFiles = [
        dataFile
        for dataFile in candidates
        if dataFile.format not in (DataFormat.UNKNOWN, DataFormat.STACKED)
    ]
    logging.info(f"Found {len(inputDataFiles)} files to combine")

    # Dict keys act as an insertion-ordered, de-duplicated set of column names.
    columns = {column: None for dataFile in inputDataFiles for column in dataFile.getColumns()}
    combineDataFiles(outputFilePath, inputDataFiles, list(columns), deleteOld, **kwargs)
100100

101-
def combineDataFiles(outputFilePath: Path, dataFiles: list[DataFile], columns: list[str], deleteOld: bool = False, **kwargs: dict) -> None:
    """Write the contents of several data files into a single output file.

    Args:
        outputFilePath: Location of the combined file to write.
        dataFiles: Input files, combined in the order given.
        columns: Column names passed to the output writer.
        deleteOld: When true, call ``delete()`` on every input file once the
            combined file has been written.
        **kwargs: Extra options forwarded to the output file's
            ``writeIterator``. ``index`` defaults to ``False`` but may be
            overridden here.
    """
    outputDataFile = DataFile(outputFilePath)
    logging.info(f"Combining into one file at {outputFilePath}")

    # Merge the default instead of passing index=False next to **kwargs, which
    # raised "got multiple values for keyword argument 'index'" whenever a
    # caller supplied their own index option.
    writeOptions = {"index": False, **kwargs}
    outputDataFile.writeIterator(combinedIterator(dataFiles, 1024), list(columns), **writeOptions)

    if not deleteOld:
        return

    logging.info("Cleaning up old sections of combined file")
    for dataFile in dataFiles:
        dataFile.delete()
105110

106111
class StackedDFWriter:
107112
def __init__(self, outputFilePath: Path, subsections: list[str], chunkFormat: DataFormat = DataFormat.PARQUET):

0 commit comments

Comments
 (0)