-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
bf45593
commit ca471b8
Showing
3 changed files
with
169 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,9 +8,12 @@ | |
# [email protected]. | ||
import logging | ||
import math | ||
import os | ||
from typing import Iterable | ||
|
||
import omero | ||
import omero.grid | ||
import pandas as pd | ||
from tqdm.auto import tqdm | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
@@ -42,6 +45,7 @@ | |
"Project": omero.model.ProjectAnnotationLinkI, | ||
"Screen": omero.model.ScreenAnnotationLinkI, | ||
"Well": omero.model.WellAnnotationLinkI, | ||
"Roi": omero.model.RoiAnnotationLinkI, | ||
} | ||
|
||
OBJECT_TYPES = { | ||
|
@@ -51,11 +55,13 @@ | |
"Project": omero.model.ProjectI, | ||
"Screen": omero.model.ScreenI, | ||
"Well": omero.model.WellI, | ||
"Roi": omero.model.RoiI, | ||
} | ||
|
||
|
||
def generate_omero_columns(df): | ||
omero_columns = [] | ||
string_columns = [] | ||
for column_name, column_type in df.dtypes.items(): | ||
cleaned_name = column_name.replace('/', '\\') | ||
if column_name in SPECIAL_NAMES and column_type.kind == 'i': | ||
|
@@ -66,30 +72,67 @@ def generate_omero_columns(df): | |
raise NotImplementedError(f"Column type " | ||
f"{column_type} not supported") | ||
if col_class == omero.grid.StringColumn: | ||
string_columns.append(column_name) | ||
max_len = df[column_name].str.len().max() | ||
if math.isnan(max_len): | ||
max_len = 1 | ||
col = col_class(cleaned_name, "", int(max_len), []) | ||
# Coerce missing values into strings | ||
df[column_name].fillna('', inplace=True) | ||
else: | ||
col = col_class(cleaned_name, "", []) | ||
omero_columns.append(col) | ||
return omero_columns | ||
return omero_columns, string_columns | ||
|
||
|
||
def create_table(df, table_name, parent_id, parent_type, conn, chunk_size): | ||
def create_table(source, table_name, parent_id, parent_type, conn, chunk_size, | ||
extra_links): | ||
# Make type case-insensitive | ||
parent_type = parent_type.lower().capitalize() | ||
if parent_type not in OBJECT_TYPES: | ||
raise NotImplementedError(f"Type {parent_type} not " | ||
f"supported as a parent object") | ||
elif parent_type == "Roi": | ||
LOGGER.warning("ROI selected as the primary attachment target, " | ||
"resulting table may not be shown in OMERO.web UI.") | ||
parent_ob = conn.getObject(parent_type, parent_id) | ||
if parent_ob is None: | ||
raise ValueError(f"{parent_type} ID {parent_id} not found") | ||
parent_group = parent_ob.details.group.id.val | ||
if extra_links is not None and not isinstance(extra_links, Iterable): | ||
raise ValueError(f"Extra Links should be an iterable list of " | ||
f"type/id pairs, not {extra_links}") | ||
link_to = [] | ||
for ob_type, ob_id in extra_links: | ||
ob_type = ob_type.lower().capitalize() | ||
if ob_type not in OBJECT_TYPES: | ||
raise NotImplementedError(f"Type {ob_type} not " | ||
f"supported as a link target") | ||
if isinstance(ob_id, str): | ||
assert ob_id.isdigit(), f"Object ID {ob_id} is not numeric" | ||
ob_id = int(ob_id) | ||
link_ob = conn.getObject(ob_type, ob_id) | ||
if link_ob is None: | ||
LOGGER.warning(f"{ob_type} ID {ob_id} not found, won't link") | ||
continue | ||
link_to.append((ob_type, ob_id)) | ||
|
||
progress_monitor = tqdm( | ||
desc="Inspecting table...", initial=1, dynamic_ncols=True, | ||
bar_format='{desc}: {percentage:3.0f}%|{bar}| ' | ||
'{n_fmt}/{total_fmt} rows, {elapsed} {postfix}') | ||
|
||
if isinstance(source, str): | ||
assert os.path.exists(source), f"Could not find file {source}" | ||
columns, str_cols, total_rows = generate_omero_columns_csv( | ||
source, chunk_size) | ||
iter_data = (chunk for chunk in pd.read_csv( | ||
source, chunksize=chunk_size)) | ||
else: | ||
source = source.copy() | ||
columns, str_cols = generate_omero_columns(source) | ||
total_rows = len(source) | ||
iter_data = (source.iloc[i:i + chunk_size] | ||
for i in range(0, len(source), chunk_size)) | ||
|
||
df = df.copy() | ||
orig_columns = df.columns.tolist() | ||
columns = generate_omero_columns(df) | ||
resources = conn.c.sf.sharedResources(_ctx={ | ||
"omero.group": str(parent_group)}) | ||
repository_id = resources.repositories().descriptions[0].getId().getValue() | ||
|
@@ -99,23 +142,20 @@ def create_table(df, table_name, parent_id, parent_type, conn, chunk_size): | |
table = resources.newTable(repository_id, table_name, _ctx={ | ||
"omero.group": str(parent_group)}) | ||
table.initialize(columns) | ||
total_to_upload = len(df) | ||
slicer = range(0, total_to_upload, chunk_size) | ||
|
||
bar_fmt = '{desc}: {percentage:3.0f}%|{bar}| ' \ | ||
'{n_fmt}/{total_fmt} rows, {elapsed} {postfix}' | ||
|
||
chunk_iter = tqdm(desc="Uploading table to OMERO", | ||
total=total_to_upload, | ||
bar_format=bar_fmt) | ||
for start in slicer: | ||
to_upload = df[start:start + chunk_size] | ||
for idx, column in enumerate(columns): | ||
ref_name = orig_columns[idx] | ||
column.values = to_upload[ref_name].tolist() | ||
progress_monitor.reset(total=total_rows) | ||
progress_monitor.set_description("Uploading table to OMERO") | ||
|
||
for chunk in iter_data: | ||
if str_cols: | ||
# Coerce missing values into strings | ||
chunk.loc[:, str_cols] = chunk.loc[:, str_cols].fillna('') | ||
for omero_column, (name, col_data) in zip(columns, chunk.items()): | ||
if omero_column.name != name: | ||
LOGGER.debug("Matching", omero_column.name, name) | ||
omero_column.values = col_data.tolist() | ||
table.addData(columns) | ||
chunk_iter.update(len(to_upload)) | ||
chunk_iter.close() | ||
progress_monitor.update(len(chunk)) | ||
progress_monitor.close() | ||
|
||
LOGGER.info("Table creation complete, linking to image") | ||
orig_file = table.getOriginalFile() | ||
|
@@ -131,11 +171,68 @@ def create_table(df, table_name, parent_id, parent_type, conn, chunk_size): | |
link_obj.link(target_obj, annotation) | ||
link_obj = conn.getUpdateService().saveAndReturnObject( | ||
link_obj, _ctx={"omero.group": str(parent_group)}) | ||
LOGGER.info("Saved annotation link") | ||
|
||
annotation_id = link_obj.child.id.val | ||
LOGGER.info(f"Uploaded as FileAnnotation {annotation_id}") | ||
extra_link_objs = [] | ||
unloaded_ann = omero.model.FileAnnotationI(annotation_id, False) | ||
for ob_type, ob_id in link_to: | ||
# Construct additional links | ||
link_obj = LINK_TYPES[ob_type]() | ||
target_obj = OBJECT_TYPES[ob_type](ob_id, False) | ||
link_obj.link(target_obj, unloaded_ann) | ||
extra_link_objs.append(link_obj) | ||
if extra_link_objs: | ||
try: | ||
conn.getUpdateService().saveArray( | ||
extra_link_objs, _ctx={"omero.group": str(parent_group)}) | ||
LOGGER.info(f"Added links to {len(extra_link_objs)} objects") | ||
except Exception as e: | ||
LOGGER.error("Failed to create extra links", exc_info=e) | ||
LOGGER.info(f"Finished creating table {table_name} under " | ||
f"{parent_type} {parent_id}") | ||
return link_obj.child.id.val | ||
return annotation_id | ||
finally: | ||
if table is not None: | ||
table.close() | ||
|
||
|
||
def generate_omero_columns_csv(csv_path, chunk_size=1000):
    """Build OMERO column objects for a CSV file without loading it whole.

    The first ``chunk_size`` rows are read to choose a column class for each
    CSV column. The file is then streamed a second time, reading only the
    string columns, to count the rows and to widen every string column to
    the longest value found anywhere in the file.

    :param csv_path: Path of the CSV file to inspect.
    :param chunk_size: Rows per read; also the sample size used to detect
        column types.
    :return: Tuple of ``(omero_columns, string_column_names, row_count)``
        where ``string_column_names`` is a list of the original CSV names of
        the columns that were mapped to ``omero.grid.StringColumn``.
    :raises NotImplementedError: If a column's dtype kind is not a key of
        ``COLUMN_TYPES``.
    """
    LOGGER.info(f"Inspecting {csv_path}")
    scan = pd.read_csv(csv_path, nrows=chunk_size)
    # logging interpolates lazily with %-style placeholders; without one the
    # extra argument causes a formatting error when DEBUG is enabled.
    LOGGER.debug("Shape is %s", scan.shape)
    omero_columns = []
    # Maps original string column name -> position in omero_columns
    to_resolve = {}
    for idx, (column_name, column_type) in enumerate(scan.dtypes.items()):
        cleaned_name = column_name.replace('/', '\\')
        if column_name in SPECIAL_NAMES and column_type.kind == 'i':
            col_class = SPECIAL_NAMES[column_name]
        elif column_type.kind in COLUMN_TYPES:
            col_class = COLUMN_TYPES[column_type.kind]
        else:
            raise NotImplementedError(f"Column type "
                                      f"{column_type} not supported")
        if col_class == omero.grid.StringColumn:
            max_len = scan[column_name].str.len().max()
            # An empty or all-missing column has no maximum; use width 1
            if pd.isna(max_len):
                max_len = 1
            col = col_class(cleaned_name, "", int(max_len), [])
            to_resolve[column_name] = idx
        else:
            col = col_class(cleaned_name, "", [])
        omero_columns.append(col)
    LOGGER.debug(f"Generated columns, found {len(to_resolve)} string columns")
    # Materialise the names once: a concrete list is safe both as a pandas
    # ``usecols`` argument and as a ``.loc`` indexer in the caller.
    string_columns = list(to_resolve)
    # Use a subset of columns to get row count and string lengths; fall back
    # to the first column when there are no string columns to measure.
    use_cols = string_columns or [0]
    row_count = 0
    LOGGER.info("Scanning CSV for size and column metadata")
    # dtype=str keeps every chunk's columns string-typed, so a chunk whose
    # values are all missing (or all numeric) does not break the .str accessor.
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size,
                             usecols=use_cols, dtype=str):
        row_count += len(chunk)
        for column_name, index in to_resolve.items():
            max_len = chunk[column_name].str.len().max()
            if pd.isna(max_len):
                max_len = 1
            max_len = int(max_len)
            omero_columns[index].size = max(max_len,
                                            omero_columns[index].size)
    LOGGER.info(f"Initial scan completed, found {row_count} rows")
    return omero_columns, string_columns, row_count