Automatically choose chunk size
DavidStirling committed Jul 31, 2024
1 parent ca471b8 commit 7164289
Showing 2 changed files with 63 additions and 45 deletions.
5 changes: 3 additions & 2 deletions omero2pandas/__init__.py
@@ -183,7 +183,7 @@ def read_table(file_id=None, annotation_id=None, column_names=(), rows=None,
 
 
 def upload_table(source, table_name, parent_id, parent_type='Image',
-                 extra_links=(), chunk_size=1000, omero_connector=None,
+                 extra_links=(), chunk_size=None, omero_connector=None,
                  server=None, port=4064, username=None, password=None):
     """
     Upload a pandas dataframe to a new OMERO table.
@@ -197,7 +197,8 @@ def upload_table(source, table_name, parent_id, parent_type='Image',
         One of: Image, Dataset, Plate, Well
     :param extra_links: List of (Type, ID) tuples specifying extra objects to
         link the table to.
-    :param chunk_size: Rows to transmit to the server in a single operation
+    :param chunk_size: Rows to transmit to the server in a single operation.
+        Default: Automatically choose a size
     :param omero_connector: OMERO.client object which is already connected
         to a server. Supersedes any other connection details.
     :param server: Address of the server
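
For context, this is what the new default looks like from the caller's side. A minimal sketch; the server address, credentials, and CSV path are hypothetical:

import pandas as pd
import omero2pandas

df = pd.read_csv("measurements.csv")  # hypothetical input file
# chunk_size is omitted, so it defaults to None and the library now derives
# a batch size from the column count instead of always sending 1000 rows.
omero2pandas.upload_table(df, "My Table", parent_id=101, parent_type='Image',
                          server="omero.example.org", port=4064,
                          username="demo", password="demo")
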
103 changes: 60 additions & 43 deletions omero2pandas/upload.py
@@ -59,7 +59,16 @@
 }
 
 
+def optimal_chunk_size(column_count):
+    # We can optimally send ~2m values at a time
+    rows = 2000000 // column_count
+    if rows > 50000:
+        LOGGER.warning(f"Limiting automatic chunk size to 50000 (was {rows})")
+    return max(min(rows, 50000), 1)
+
+
 def generate_omero_columns(df):
+    # Inspect a pandas dataframe to generate OMERO.tables columns
     omero_columns = []
     string_columns = []
     for column_name, column_type in df.dtypes.items():
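
Worked through by hand, the heuristic above targets roughly two million values per server call, caps the batch at 50000 rows, and never returns fewer than one row:

optimal_chunk_size(4)        # 2000000 // 4 = 500000 -> capped to 50000, warning logged
optimal_chunk_size(500)      # 2000000 // 500 = 4000 rows per chunk
optimal_chunk_size(3000000)  # 2000000 // 3000000 = 0 -> floored to 1 row
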
@@ -83,8 +92,55 @@ def generate_omero_columns(df):
     return omero_columns, string_columns
 
 
+def generate_omero_columns_csv(csv_path, chunk_size=1000):
+    # Inspect a CSV file to generate OMERO.tables columns
+    LOGGER.info(f"Inspecting {csv_path}")
+    scan = pd.read_csv(csv_path, nrows=chunk_size or 1000)
+    LOGGER.debug(f"Shape is {scan.shape[0]}x{scan.shape[1]}")
+    if chunk_size is None:
+        chunk_size = optimal_chunk_size(len(scan.columns))
+    LOGGER.debug(f"Using chunk size {chunk_size}")
+    omero_columns = []
+    to_resolve = {}
+    for idx, (column_name, column_type) in enumerate(scan.dtypes.items()):
+        cleaned_name = column_name.replace('/', '\\')
+        if column_name in SPECIAL_NAMES and column_type.kind == 'i':
+            col_class = SPECIAL_NAMES[column_name]
+        elif column_type.kind in COLUMN_TYPES:
+            col_class = COLUMN_TYPES[column_type.kind]
+        else:
+            raise NotImplementedError(f"Column type "
+                                      f"{column_type} not supported")
+        if col_class == omero.grid.StringColumn:
+            max_len = scan[column_name].str.len().max()
+            if math.isnan(max_len):
+                max_len = 1
+            col = col_class(cleaned_name, "", int(max_len), [])
+            to_resolve[column_name] = idx
+        else:
+            col = col_class(cleaned_name, "", [])
+        omero_columns.append(col)
+    LOGGER.debug(f"Generated columns, found {len(to_resolve)} string columns")
+    # Use a subset of columns to get row count and string lengths
+    use_cols = to_resolve.keys() or [0]
+    row_count = 0
+    LOGGER.info("Scanning CSV for size and column metadata")
+    for chunk in pd.read_csv(csv_path, chunksize=chunk_size, usecols=use_cols):
+        # chunk is a DataFrame. To "process" the rows in the chunk:
+        row_count += len(chunk)
+        for column_name, index in to_resolve.items():
+            max_len = chunk[column_name].str.len().max()
+            if math.isnan(max_len):
+                max_len = 1
+            max_len = int(max_len)
+            omero_columns[index].size = max(max_len, omero_columns[index].size)
+    LOGGER.info(f"Initial scan completed, found {row_count} rows")
+    return omero_columns, to_resolve.keys(), row_count, chunk_size
+
+
 def create_table(source, table_name, parent_id, parent_type, conn, chunk_size,
                  extra_links):
+    # Create an OMERO.table and upload data
     # Make type case-insensitive
     parent_type = parent_type.lower().capitalize()
     if parent_type not in OBJECT_TYPES:
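
The new generate_omero_columns_csv makes two passes: a bounded read to infer column types, then a streamed read restricted to the string columns to count rows and find maximum string lengths. A standalone sketch of that same pattern in plain pandas; the file name data.csv and column name "name" are hypothetical:

import math

import pandas as pd

# Pass 1: sample the head of the file to infer column types.
sample = pd.read_csv("data.csv", nrows=1000)  # hypothetical file

# Pass 2: stream only the string column(s) to count rows and size strings.
row_count = 0
longest = 1
for chunk in pd.read_csv("data.csv", chunksize=50000, usecols=["name"]):
    row_count += len(chunk)
    max_len = chunk["name"].str.len().max()  # NaN if the chunk is all-null
    if not math.isnan(max_len):
        longest = max(longest, int(max_len))
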
@@ -122,14 +178,17 @@ def create_table(source, table_name, parent_id, parent_type, conn, chunk_size,
 
     if isinstance(source, str):
         assert os.path.exists(source), f"Could not find file {source}"
-        columns, str_cols, total_rows = generate_omero_columns_csv(
+        columns, str_cols, total_rows, chunk_size = generate_omero_columns_csv(
             source, chunk_size)
         iter_data = (chunk for chunk in pd.read_csv(
             source, chunksize=chunk_size))
     else:
         source = source.copy()
         columns, str_cols = generate_omero_columns(source)
         total_rows = len(source)
+        if chunk_size is None:
+            chunk_size = optimal_chunk_size(len(columns))
+        LOGGER.debug(f"Using chunk size {chunk_size}")
         iter_data = (source.iloc[i:i + chunk_size]
                      for i in range(0, len(source), chunk_size))

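The in-memory branch slices the dataframe with a generator expression. A quick illustration of how that expression batches a hypothetical 10-row frame with a chunk size of 4:

import pandas as pd

df = pd.DataFrame({"a": range(10)})
chunk_size = 4
iter_data = (df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size))
print([len(chunk) for chunk in iter_data])  # [4, 4, 2]
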
@@ -194,45 +253,3 @@ def create_table(source, table_name, parent_id, parent_type, conn, chunk_size,
     finally:
         if table is not None:
             table.close()
-
-
-def generate_omero_columns_csv(csv_path, chunk_size=1000):
-    LOGGER.info(f"Inspecting {csv_path}")
-    scan = pd.read_csv(csv_path, nrows=chunk_size)
-    LOGGER.debug("Shape is ", scan.shape)
-    omero_columns = []
-    to_resolve = {}
-    for idx, (column_name, column_type) in enumerate(scan.dtypes.items()):
-        cleaned_name = column_name.replace('/', '\\')
-        if column_name in SPECIAL_NAMES and column_type.kind == 'i':
-            col_class = SPECIAL_NAMES[column_name]
-        elif column_type.kind in COLUMN_TYPES:
-            col_class = COLUMN_TYPES[column_type.kind]
-        else:
-            raise NotImplementedError(f"Column type "
-                                      f"{column_type} not supported")
-        if col_class == omero.grid.StringColumn:
-            max_len = scan[column_name].str.len().max()
-            if math.isnan(max_len):
-                max_len = 1
-            col = col_class(cleaned_name, "", int(max_len), [])
-            to_resolve[column_name] = idx
-        else:
-            col = col_class(cleaned_name, "", [])
-        omero_columns.append(col)
-    LOGGER.debug(f"Generated columns, found {len(to_resolve)} string columns")
-    # Use a subset of columns to get row count and string lengths
-    use_cols = to_resolve.keys() or [0]
-    row_count = 0
-    LOGGER.info("Scanning CSV for size and column metadata")
-    for chunk in pd.read_csv(csv_path, chunksize=chunk_size, usecols=use_cols):
-        # chunk is a DataFrame. To "process" the rows in the chunk:
-        row_count += len(chunk)
-        for column_name, index in to_resolve.items():
-            max_len = chunk[column_name].str.len().max()
-            if math.isnan(max_len):
-                max_len = 1
-            max_len = int(max_len)
-            omero_columns[index].size = max(max_len, omero_columns[index].size)
-    LOGGER.info(f"Initial scan completed, found {row_count} rows")
-    return omero_columns, to_resolve.keys(), row_count
