
Commit 33119b7

Improve handling of large commits

1 parent fed5217 commit 33119b7

File tree: 2 files changed (+64 -34 lines)

  holonote/annotate/connector.py
  holonote/annotate/table.py


Diff for: holonote/annotate/connector.py (+41 -25)
@@ -374,31 +374,33 @@ def delete_table(self):
         self.cursor.execute(f"DROP TABLE IF EXISTS {self.table_name}")
         self.con.commit()

-    def add_rows(self, field_list):  # Used execute_many
-        for field in field_list:
-            self.add_row(**field)
+    def add_rows(self, fields_df):
+        fields_df.to_sql(self.table_name, self.con, if_exists="append")

     def add_row(self, **fields):
-        # Note, missing fields will be set as NULL
-        columns = self.columns
-        field_values = [fields.get(col, None) for col in self.columns]
-        field_values = [
-            pd.to_datetime(el) if isinstance(el, np.datetime64) else el for el in field_values
-        ]
-        field_values = [
-            el.to_pydatetime() if isinstance(el, pd.Timestamp) else el for el in field_values
-        ]
-
-        if self.primary_key.policy != "insert":
-            field_values = field_values[1:]
-            columns = columns[1:]
-
-        placeholders = ", ".join(["?"] * len(field_values))
-        self.cursor.execute(
-            f"INSERT INTO {self.table_name} {columns!s} VALUES({placeholders});", field_values
-        )
-        self.primary_key.validate(self.cursor.lastrowid, fields[self.primary_key.field_name])
-        self.con.commit()
+        df = pd.DataFrame([fields]).set_index(self.primary_key.field_name)
+        self.add_rows(df)
+
+        # # Note, missing fields will be set as NULL
+        # columns = self.columns
+        # field_values = [fields.get(col, None) for col in self.columns]
+        # field_values = [
+        #     pd.to_datetime(el) if isinstance(el, np.datetime64) else el for el in field_values
+        # ]
+        # field_values = [
+        #     el.to_pydatetime() if isinstance(el, pd.Timestamp) else el for el in field_values
+        # ]
+
+        # if self.primary_key.policy != "insert":
+        #     field_values = field_values[1:]
+        #     columns = columns[1:]
+
+        # placeholders = ", ".join(["?"] * len(field_values))
+        # self.cursor.execute(
+        #     f"INSERT INTO {self.table_name} {columns!s} VALUES({placeholders});", field_values
+        # )
+        # self.primary_key.validate(self.cursor.lastrowid, fields[self.primary_key.field_name])
+        # self.con.commit()

     def delete_all_rows(self):
         "Obviously a destructive operation!"

@@ -412,10 +414,24 @@ def delete_row(self, id_val):
         )
         self.con.commit()

+    def delete_rows(self, id_vals):
+        query = f"DELETE FROM {self.table_name} WHERE {self.primary_key.field_name} = ?"
+        self.cursor.executemany(query, [tuple(map(self.primary_key.cast, id_vals.index))])
+        self.con.commit()
+
     def update_row(self, **updates):  # updates as a dictionary OR remove posarg?
         assert self.primary_key.field_name in updates
         id_val = updates.pop(self.primary_key.field_name)
-        set_updates = ", ".join('"' + k + '"' + " = ?" for k in updates)
+        set_updates = ", ".join([f'"{k}" = ?' for k in updates])
         query = f'UPDATE {self.table_name} SET {set_updates} WHERE "{self.primary_key.field_name}" = ?;'
         self.cursor.execute(query, [*updates.values(), id_val])
-        self.con.commit()
+
+    def update_rows(self, updates_df):
+        def replace_into(table_name, conn, keys, data_iter):
+            # First key is the primary key
+            placeholders = ", ".join([f'"{k}" = ?' for k in keys[1:]])
+            query = f'UPDATE {table_name.name} SET {placeholders} WHERE "{keys[0]}" = ?;'
+            data = [(*row[1:], row[0]) for row in data_iter]
+            conn.executemany(query, data)
+
+        updates_df.to_sql(self.table_name, self.con, if_exists="append", method=replace_into)
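
The change above swaps per-row INSERT/UPDATE/DELETE round trips for whole-DataFrame operations. Below is a minimal, self-contained sketch of the same pattern against an in-memory SQLite database; the table and column names ("events", "uid", "note") and the update_into helper are hypothetical stand-ins, not holonote's real schema or API.

import sqlite3

import pandas as pd

con = sqlite3.connect(":memory:")
con.execute('CREATE TABLE "events" ("uid" INTEGER PRIMARY KEY, "note" TEXT)')

# Bulk insert: one to_sql() call appends every row of the DataFrame,
# instead of issuing one INSERT (and one commit) per row.
new_rows = pd.DataFrame({"note": ["a", "b", "c"]}, index=pd.Index([1, 2, 3], name="uid"))
new_rows.to_sql("events", con, if_exists="append")

# Bulk update: a custom `method` callable makes to_sql() emit UPDATEs instead
# of INSERTs. pandas calls it as method(table, conn, keys, data_iter), where
# keys lists the column names (index first) and data_iter yields row tuples.
def update_into(table, conn, keys, data_iter):
    placeholders = ", ".join(f'"{k}" = ?' for k in keys[1:])
    query = f'UPDATE "{table.name}" SET {placeholders} WHERE "{keys[0]}" = ?;'
    conn.executemany(query, [(*row[1:], row[0]) for row in data_iter])

updates = pd.DataFrame({"note": ["A", "B"]}, index=pd.Index([1, 2], name="uid"))
updates.to_sql("events", con, if_exists="append", method=update_into)

# Bulk delete: executemany() removes many rows with a single prepared statement.
con.executemany('DELETE FROM "events" WHERE "uid" = ?', [(3,)])
con.commit()

print(pd.read_sql('SELECT * FROM "events"', con))  # uid 1 -> "A", uid 2 -> "B"

With a plain sqlite3 connection, pandas hands the `method` callable a cursor-like object that supports executemany(); with an SQLAlchemy engine it would receive an SQLAlchemy connection instead, which is worth keeping in mind if the connector ever moves off raw sqlite3.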

Diff for: holonote/annotate/table.py (+23 -9)
@@ -141,28 +141,42 @@ def _create_commits(self):
                 kwargs = self._expand_commit_by_id(edit["id"])

             elif operation == "delete":
-                kwargs = {"id_val": edit["id"]}
+                kwargs = {self.index_name: edit["id"]}
             elif operation == "update":
                 if edit["id"] not in self._field_df.index:
                     continue
-                kwargs = self._expand_commit_by_id(
-                    edit["id"], fields=edit["fields"], region_fields=edit["region_fields"]
-                )
+                kwargs = self._expand_commit_by_id(edit["id"])
+                # , fields=edit["fields"], region_fields=edit["region_fields"]
             elif operation == "save":
                 kwargs = self._expand_save_commits(edit["ids"])
             commits.append({"operation": operation, "kwargs": kwargs})

         return commits

-    def commits(self, connector):
-        commits = self._create_commits()
+    def commits(self, connector) -> list[dict[str, Any]]:
+        raw_commits = self._create_commits()
+
+        commits = self._groupby_operation(raw_commits) if raw_commits else []
+
         for commit in commits:
             operation = commit["operation"]
-            kwargs = connector.transforms[operation](commit["kwargs"])
-            getattr(connector, connector.operation_mapping[operation])(**kwargs)
+            fn = connector.operation_mapping[operation]
+            kwargs = commit["kwargs"].apply(connector.transforms[operation])
+            getattr(connector, fn + "s")(kwargs)

         self.clear_edits()
-        return commits
+        return raw_commits
+
+    def _groupby_operation(self, commits) -> list[dict[str, Any]]:
+        df = pd.DataFrame(commits)
+        change_indices = (df["operation"] != df["operation"].shift(1)).cumsum()
+        return [
+            {
+                "operation": dfg["operation"].iloc[0],
+                "kwargs": pd.json_normalize(dfg["kwargs"]).set_index(self.index_name),
+            }
+            for _, dfg in df.groupby(change_indices)
+        ]

     def clear_edits(self, edit_type=None):
         "Clear edit state and index mapping"
