|
7 | 7 | from typing import Generator |
8 | 8 |
|
def _collectFields(iterator: Generator[pd.DataFrame, None, None], entryLimit: int, seed: int) -> pd.DataFrame:
    """Collect up to ``entryLimit`` non-null example values for every column.

    All chunks from ``iterator`` are concatenated and duplicate rows are
    dropped. Each column is then processed independently: nulls are removed,
    the remaining values are randomly down-sampled to ``entryLimit`` when
    there are more than that, and shorter columns are padded with empty
    strings so every column has exactly ``entryLimit`` entries.

    Args:
        iterator: Yields the DataFrame chunks to scan. Must yield at least
            one chunk; ``next`` raises ``StopIteration`` otherwise.
        entryLimit: Number of rows in the returned frame.
        seed: Random state passed to ``Series.sample``.

    Returns:
        A DataFrame with the input columns and ``entryLimit`` rows, indexed
        ``0..entryLimit-1``.
    """

    def columnCleanup(series: pd.Series) -> pd.Series:
        """Return exactly ``entryLimit`` values for one column."""
        shortSeries = series.dropna()
        if len(shortSeries) > entryLimit:
            shortSeries = shortSeries.sample(n=entryLimit, random_state=seed)

        # Discard the original row labels: apply() aligns the per-column
        # results on their index, so differing labels would reintroduce NaN.
        shortSeries = shortSeries.reset_index(drop=True)

        missing = entryLimit - len(shortSeries)
        if missing > 0:
            # Pad with empty strings; Series.add() is arithmetic addition and
            # cannot be used to append values.
            padding = pd.Series([""] * missing, dtype=object)
            shortSeries = pd.concat([shortSeries.astype(object), padding], ignore_index=True)

        return shortSeries

    # Gather every chunk first and concatenate once, instead of re-copying
    # the growing frame on every iteration.
    chunks = [next(iterator)]
    for idx, chunk in enumerate(iterator, start=1):
        print(f"Scanning chunk: {idx}", end='\r')
        chunks.append(chunk)

    df = pd.concat(chunks, ignore_index=True).drop_duplicates()
    return df.apply(columnCleanup, axis=0)
18 | 26 |
|
def _collectRecords(iterator: Generator[pd.DataFrame, None, None], entryLimit: int, seed: int) -> pd.DataFrame:
    """Collect up to ``entryLimit`` distinct records, most complete first.

    Chunks are merged one at a time. After each merge duplicate rows are
    dropped and, if the running frame exceeds ``entryLimit`` rows, it is
    randomly down-sampled to ``entryLimit``. The surviving rows are finally
    ordered by how many null cells they contain (fewest first) and truncated
    to ``entryLimit``.

    Args:
        iterator: Yields the DataFrame chunks to scan. Must yield at least
            one chunk; ``next`` raises ``StopIteration`` otherwise.
        entryLimit: Maximum number of rows in the returned frame.
        seed: Random state passed to ``DataFrame.sample``.

    Returns:
        A DataFrame with the input columns, at most ``entryLimit`` rows, and
        a fresh ``0..n-1`` index.
    """
    df = next(iterator)
    for idx, chunk in enumerate(iterator, start=1):
        print(f"Scanning chunk: {idx}", end='\r')
        df = pd.concat([df, chunk], ignore_index=True)
        df = df.drop_duplicates()

        if len(df) > entryLimit:
            df = df.sample(n=entryLimit, random_state=seed)

    # Covers the single-chunk case, where the loop body never runs.
    df = df.drop_duplicates()

    # Order rows positionally by their null count rather than via a temporary
    # helper column, which would clobber an input column of the same name.
    # A stable sort keeps equally complete rows in their existing order.
    nanCounts = df.isna().sum(axis=1).to_numpy()
    order = nanCounts.argsort(kind="stable")

    return df.iloc[order].head(entryLimit).reset_index(drop=True)
31 | 46 |
|
@@ -57,7 +72,7 @@ def _collectRecords(iterator: Generator[pd.DataFrame, None, None], entryLimit: i |
57 | 72 | random.seed(seed) |
58 | 73 | outputPath = outputDir / f"{'fields' if kwargs.ignoreRecord else 'records'}_{kwargs.chunksize}_{seed}.tsv" |
59 | 74 |
|
60 | | - dfIterator = stageFile.loadDataFrameIterator(kwargs.chunksize, kwargs.firstrow, kwargs.rows) |
| 75 | + dfIterator = stageFile.readIterator(kwargs.chunksize, on_bad_lines="skip") |
61 | 76 | df = _collectFields(dfIterator, kwargs.entries, seed) if kwargs.ignoreRecord else _collectRecords(dfIterator, kwargs.entries, seed) |
62 | 77 |
|
63 | 78 | df = dff.removeSpaces(df) |
|
0 commit comments