From 79ef49ea5fb13bd59e67c8b0c69351fc629fd8f7 Mon Sep 17 00:00:00 2001
From: raphaeljafriLB <79109817+raphaeljafriLB@users.noreply.github.com>
Date: Thu, 26 Jan 2023 13:11:57 -0500
Subject: [PATCH 1/3] Update client.py

---
 labelboxbigquery/client.py | 146 ++-----------------------------------
 1 file changed, 6 insertions(+), 140 deletions(-)

diff --git a/labelboxbigquery/client.py b/labelboxbigquery/client.py
index 085abd8..08bc487 100644
--- a/labelboxbigquery/client.py
+++ b/labelboxbigquery/client.py
@@ -38,142 +38,6 @@ def __init__(
         self.lb_client = labelboxClient(lb_api_key, endpoint=lb_endpoint, enable_experimental=lb_enable_experimental, app_url=lb_app_url)
         bq_creds = service_account.Credentials.from_service_account_file(google_key) if google_key else None
         self.bq_client = bigquery.Client(project=google_project_name, credentials=bq_creds)
-
-    def _sync_metadata_fields(self, bq_table_id, metadata_index={}):
-        """ Ensures Labelbox's Metadata Ontology has all necessary metadata fields given a metadata_index
-        Args:
-            bq_table_id     :   Required (str) - Table ID structured in the following schema: google_project_name.dataset_name.table_name
-            metadata_index  :   Optional (dict) - Dictionary where {key=column_name : value=metadata_type} - metadata_type must be one of "enum", "string", "datetime" or "number"
-        Returns:
-            True if the sync is successful, False if not
-        """
-        # Get your metadata ontology
-        lb_mdo = self.lb_client.get_data_row_metadata_ontology()
-        bq_table = self.bq_client.get_table(bq_table_id)
-        # Convert your meatdata_index values from strings into labelbox.schema.data_row_metadata.DataRowMetadataKind types
-        conversion = {"enum" : DataRowMetadataKind.enum, "string" : DataRowMetadataKind.string, "datetime" : DataRowMetadataKind.datetime, "number" : DataRowMetadataKind.number}
-        # Grab all the metadata field names
-        lb_metadata_names = [field['name'] for field in lb_mdo._get_ontology()]
-        # Iterate over your metadata_index, if a metadata_index key is not an existing metadata_field, then create it in Labelbox
-        if metadata_index:
-            for column_name in metadata_index.keys():
-                metadata_type = metadata_index[column_name]
-                if metadata_type not in conversion.keys():
-                    print(f'Error: Invalid value for metadata_index field {column_name}: {metadata_type}')
-                    return False
-                if column_name not in lb_metadata_names:
-                    # For enum fields, grab all the unique values from that column as a list
-                    if metadata_type == "enum":
-                        query_job = self.bq_client.query(f"""SELECT DISTINCT {column_name} FROM {bq_table.table_id}""")
-                        enum_options = [row.values()[0] for row in query_job]
-                    else:
-                        enum_options = []
-                    lb_mdo.create_schema(name=column_name, kind=conversion[metadata_type], options=enum_options)
-                    lb_mdo = self.lb_client.get_data_row_metadata_ontology()
-                    lb_metadata_names = [field['name'] for field in lb_mdo._get_ontology()]
-        # Iterate over your metadata_index, if a metadata_index key is not an existing column name, then create it in BigQuery
-        if metadata_index:
-            column_names = [schema_field.name for schema_field in bq_table.schema]
-            for metadata_field_name in metadata_index.keys():
-                if metadata_field_name not in column_names:
-                    original_schema = bq_table.schema
-                    new_schema = original_schema[:]
-                    new_schema.append(bigquery.SchemaField(metadata_field_name, "STRING"))
-                    bq_table.schema = new_schema
-                    bq_table = self.bq_client.update_table(bq_table, ["schema"])
-        # Track data rows loaded from BigQuery
-        if "lb_integration_source" not in lb_metadata_names:
-            lb_mdo.create_schema(name="lb_integration_source", kind=DataRowMetadataKind.string)
-        return True
-
-    def __get_metadata_schema_to_name_key(self, lb_mdo:labelbox.schema.data_row_metadata.DataRowMetadataOntology, divider="///", invert=False):
-        """ Creates a dictionary where {key=metadata_schema_id: value=metadata_name_key}
-        - name_key is name for all metadata fields, and for enum options, it is "parent_name{divider}child_name"
-        Args:
-            lb_mdo      :   Required (labelbox.schema.data_row_metadata.DataRowMetadataOntology) - Labelbox metadata ontology
-            divider     :   Optional (str) - String separating parent and enum option metadata values
-            invert      :   Optional (bool) - If True, will make the name_key the dictionary key and the schema_id the dictionary value
-        Returns:
-            Dictionary where {key=metadata_schema_id: value=metadata_name_key}
-        """
-        lb_metadata_dict = lb_mdo.reserved_by_name
-        lb_metadata_dict.update(lb_mdo.custom_by_name)
-        metadata_schema_to_name_key = {}
-        for metadata_field_name in lb_metadata_dict:
-            if type(lb_metadata_dict[metadata_field_name]) == dict:
-                metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name][next(iter(lb_metadata_dict[metadata_field_name]))].parent] = str(metadata_field_name)
-                for enum_option in lb_metadata_dict[metadata_field_name]:
-                    metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name][enum_option].uid] = f"{str(metadata_field_name)}{str(divider)}{str(enum_option)}"
-            else:
-                metadata_schema_to_name_key[lb_metadata_dict[metadata_field_name].uid] = str(metadata_field_name)
-        return_value = metadata_schema_to_name_key if not invert else {v:k for k,v in metadata_schema_to_name_key.items()}
-        return return_value
-
-    def __batch_create_data_rows(self, client, dataset, global_key_to_upload_dict, skip_duplicates=True, batch_size=20000):
-        """ Checks to make sure no duplicate global keys are uploaded before batch uploading data rows
-        Args:
-            client                      :   Required (labelbox.client.Client) : Labelbox Client object
-            dataset                     :   Required (labelbox.dataset.Dataset) : Labelbox Dataset object
-            global_key_to_upload_dict   :   Required (dict) : Dictionary where {key=global_key : value=data_row_dict to-be-uploaded to Labelbox}
-            skip_duplicates             :   Optional (bool) - If True, will skip duplicate global_keys, otherwise will generate a unique global_key with a suffix "_1", "_2" and so on
-            batch_size                  :   Optional (int) : Upload batch size, 20,000 is recommended
-        Returns:
-            A concatenated list of upload results for all batch uploads
-        """
-        def __check_global_keys(client, global_keys):
-            """ Checks if data rows exist for a set of global keys
-            Args:
-                client          :   Required (labelbox.client.Client) : Labelbox Client object
-                global_keys     :   Required (list(str)) : List of global key strings
-            Returns:
-                True if global keys are available, False if not
-            """
-            query_keys = [str(x) for x in global_keys]
-            # Create a query job to get data row IDs given global keys
-            query_str_1 = """query get_datarow_with_global_key($global_keys:[ID!]!){dataRowsForGlobalKeys(where:{ids:$global_keys}){jobId}}"""
-            query_job_id = client.execute(query_str_1, {"global_keys":global_keys})['dataRowsForGlobalKeys']['jobId']
-            # Get the results of this query job
-            query_str_2 = """query get_job_result($job_id:ID!){dataRowsForGlobalKeysResult(jobId:{id:$job_id}){data{
-                            accessDeniedGlobalKeys\ndeletedDataRowGlobalKeys\nfetchedDataRows{id}\nnotFoundGlobalKeys}jobStatus}}"""
-            res = client.execute(query_str_2, {"job_id":query_job_id})['dataRowsForGlobalKeysResult']['data']
-            return res
-        global_keys_list = list(global_key_to_upload_dict.keys())
-        payload = __check_global_keys(client, global_keys_list)
-        loop_counter = 0
-        if payload:
-            while len(payload['notFoundGlobalKeys']) != len(global_keys_list):
-                loop_counter += 1
-                if payload['deletedDataRowGlobalKeys']:
-                    client.clear_global_keys(payload['deletedDataRowGlobalKeys'])
-                    payload = __check_global_keys(client, global_keys_list)
-                    continue
-                if payload['fetchedDataRows']:
-                    for i in range(0, len(payload['fetchedDataRows'])):
-                        if payload['fetchedDataRows'][i] != "":
-                            if skip_duplicates:
-                                global_key = str(global_keys_list[i])
-                                del global_key_to_upload_dict[str(global_key)]
-                            else:
-                                global_key = str(global_keys_list[i])
-                                new_upload_dict = global_key_to_upload_dict[str(global_key)]
-                                del global_key_to_upload_dict[str(global_key)]
-                                new_global_key = f"{global_key}_{loop_counter}"
-                                new_upload_dict['global_key'] = new_global_key
-                                global_key_to_upload_dict[new_global_key] = new_upload_dict
-                    global_keys_list = list(global_key_to_upload_dict.keys())
-                    payload = __check_global_keys(client, global_keys_list)
-        upload_list = list(global_key_to_upload_dict.values())
-        upload_results = []
-        for i in range(0,len(upload_list),batch_size):
-            batch = upload_list[i:] if i + batch_size >= len(upload_list) else upload_list[i:i+batch_size]
-            task = dataset.create_data_rows(batch)
-            errors = task.errors
-            if errors:
-                print(f'Data Row Creation Error: {errors}')
-                return errors
-            else:
-                upload_results.extend(task.result)
-        return upload_results
 
     def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, global_key_col=None, external_id_col=None, metadata_index={}, attachment_index={}, skip_duplicates=False):
         """ Creates Labelbox data rows given a BigQuery table and a Labelbox Dataset
@@ -189,10 +53,12 @@ def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, glo
         Returns:
             List of errors from data row upload - if successful, is an empty list
         """
-        # Sync metadata index keys with metadata ontology
-        check = self._sync_metadata_fields(bq_table_id, metadata_index)
-        if not check:
-            return None
+        # Sync metadata index keys with metadata ontology
+        table = sync_metadata_fields(
+            client=self.lb_client, table=self.bq_client.get_table(bq_table_id), get_columns_function=connector.get_columns_function,
+            add_column_function=connector.add_column_function, get_unique_values_function=connector.get_unique_values_function,
+            metadata_index=metadata_index, verbose=verbose, extra_client=self.bq_client
+        )
         # Create a metadata_schema_dict where {key=metadata_field_name : value=metadata_schema_id}
         lb_mdo = self.lb_client.get_data_row_metadata_ontology()
         metadata_name_key_to_schema = self.__get_metadata_schema_to_name_key(lb_mdo, invert=True)

From 92de0bc95eaa45851061a3b0bbe00d2e429be5c4 Mon Sep 17 00:00:00 2001
From: raphaeljafriLB <79109817+raphaeljafriLB@users.noreply.github.com>
Date: Thu, 26 Jan 2023 16:19:08 -0500
Subject: [PATCH 2/3] Update client.py

---
 labelboxbigquery/client.py | 154 +++++++++++--------------------------
 1 file changed, 44 insertions(+), 110 deletions(-)

diff --git a/labelboxbigquery/client.py b/labelboxbigquery/client.py
index 08bc487..9e6424a 100644
--- a/labelboxbigquery/client.py
+++ b/labelboxbigquery/client.py
@@ -1,7 +1,8 @@
-import labelbox
 from labelbox import Client as labelboxClient
-from labelbox.schema.data_row_metadata import DataRowMetadataKind
+from labelbox.schema.data_row_metadata import DataRowMetadataKind, DataRowMetadata
+from labelboxbigquery import connector
 from labelbase.metadata import sync_metadata_fields, get_metadata_schema_to_name_key
+from labelbase.uploaders import batch_create_data_rows
 from google.cloud import bigquery
 from google.oauth2 import service_account
 
@@ -37,121 +38,53 @@ def __init__(
         self.lb_client = labelboxClient(lb_api_key, endpoint=lb_endpoint, enable_experimental=lb_enable_experimental, app_url=lb_app_url)
         bq_creds = service_account.Credentials.from_service_account_file(google_key) if google_key else None
-        self.bq_client = bigquery.Client(project=google_project_name, credentials=bq_creds)
+        self.bq_client = bigquery.Client(project=google_project_name, credentials=bq_creds)
 
     def create_data_rows_from_table(self, bq_table_id, lb_dataset, row_data_col, global_key_col=None, external_id_col=None, metadata_index={}, attachment_index={}, skip_duplicates=False):
         """ Creates Labelbox data rows given a BigQuery table and a Labelbox Dataset
         Args:
-            bq_table_id         :   Required (str) - BigQuery Table ID structured in the following format: "google_project_name.dataset_name.table_name"
-            lb_dataset          :   Required (labelbox.schema.dataset.Dataset) - Labelbox dataset to add data rows to
-            row_data_col        :   Required (str) - Column name where the data row row data URL is located
-            global_key_col      :   Optional (str) - Column name where the data row global key is located - defaults to the row_data column
-            external_id_col     :   Optional (str) - Column name where the data row external ID is located - defaults to the row_data column
-            metadata_index      :   Optional (dict) - Dictionary where {key=column_name : value=metadata_type} - metadata_type must be one of "enum", "string", "datetime" or "number"
-            attachment_index    :   Optional (dict) - Dictionary where {key=column_name : value=attachment_type} - attachment_type must be one of "IMAGE", "VIDEO", "TEXT", "HTML"
+            bq_table_id         :   Required (str) - BigQuery Table ID structured in the following format: "google_project_name.dataset_name.table_name"
+            lb_dataset          :   Required (labelbox.schema.dataset.Dataset) - Labelbox dataset to add data rows to
+            row_data_col        :   Required (str) - Column name where the data row row data URL is located
+            global_key_col      :   Optional (str) - Column name where the data row global key is located - defaults to the row_data column
+            external_id_col     :   Optional (str) - Column name where the data row external ID is located - defaults to the row_data column
+            metadata_index      :   Optional (dict) - Dictionary where {key=column_name : value=metadata_type} - metadata_type must be one of "enum", "string", "datetime" or "number"
+            attachment_index    :   Optional (dict) - Dictionary where {key=column_name : value=attachment_type}
+                                        attachment_type must be one of "IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"
             skip_duplicates     :   Optional (bool) - If True, will skip duplicate global_keys, otherwise will generate a unique global_key with a suffix "_1", "_2" and so on
         Returns:
             List of errors from data row upload - if successful, is an empty list
         """
-        # Sync metadata index keys with metadata ontology
         table = sync_metadata_fields(
-            client=self.lb_client, table=self.bq_client.get_table(bq_table_id), get_columns_function=connector.get_columns_function,
-            add_column_function=connector.add_column_function, get_unique_values_function=connector.get_unique_values_function,
-            metadata_index=metadata_index, verbose=verbose, extra_client=self.bq_client
+            client=self.lb_client, table=table, get_columns_function=connector.get_columns_function, add_column_function=connector.add_column_function,
+            get_unique_values_function=connector.get_unique_values_function, metadata_index=metadata_index, verbose=verbose
+        )
+        # If df returns False, the sync failed - terminate the upload
+        if type(table) == bool:
+            return {"upload_results" : [], "conversion_errors" : []}
+
+        # Create a dictionary where {key=global_key : value=labelbox_upload_dictionary} - this is unique to BigQuery
+        global_key_to_upload_dict, conversion_errors = connector.create_upload_dict(
+            table=table, lb_client=self.lb_client, bq_client=self.bq_client,
+            row_data_col=row_data_col, global_key_col=global_key_col, external_id_col=external_id_col,
+            metadata_index=metadata_index, local_files=local_files, divider=divider, verbose=verbose
         )
-        # Create a metadata_schema_dict where {key=metadata_field_name : value=metadata_schema_id}
-        lb_mdo = self.lb_client.get_data_row_metadata_ontology()
-        metadata_name_key_to_schema = self.__get_metadata_schema_to_name_key(lb_mdo, invert=True)
-        # Ensure your row_data, external_id, global_key and metadata_index keys are in your BigQery table, build your query
-        bq_table = self.bq_client.get_table(bq_table_id)
-        column_names = [schema_field.name for schema_field in bq_table.schema]
-        if row_data_col not in column_names:
-            print(f'Error: No column matching provided "row_data_col" column value {row_data_col}')
-            return None
-        else:
-            index_value = 0
-            query_lookup = {row_data_col:index_value}
-            col_query = row_data_col
-            index_value += 1
-        if global_key_col:
-            if global_key_col not in column_names:
-                print(f'Error: No column matching provided "global_key_col" column value {global_key_col}')
-                return None
+        # If there are conversion errors, let the user know; if there are no successful conversions, terminate the upload
+        if conversion_errors:
+            print(f'There were {len(conversion_errors)} errors in creating your upload list - see result["conversion_errors"] for more information')
+            if global_key_to_upload_dict:
+                print(f'Data row upload will continue')
             else:
-                col_query += f", {global_key_col}"
-                query_lookup[global_key_col] = index_value
-                index_value += 1
-        else:
-            print(f'No global_key_col provided, will default global_key_col to {row_data_col} column')
-            global_key_col = row_data_col
-            col_query += f", {global_key_col}"
-            query_lookup[global_key_col] = index_value
-            index_value += 1
-        if external_id_col:
-            if external_id_col not in column_names:
-                print(f'Error: No column matching provided "gloabl_key" column value {external_id_col}')
-                return None
-            else:
-                col_query+= f", {external_id_col}"
-                query_lookup[external_id_col] = index_value
-                index_value += 1
-        if metadata_index:
-            for metadata_field_name in metadata_index:
-                mdf = metadata_field_name.replace(" ", "_")
-                if mdf not in column_names:
-                    print(f'Error: No column matching metadata_index key {metadata_field_name}')
-                    return None
-                else:
-                    col_query+=f', {mdf}'
-                    query_lookup[mdf] = index_value
-                    index_value += 1
-        if attachment_index:
-            for attachment_field_name in attachment_index:
-                atf = attachment_field_name.replace(" ", "_")
-                attachment_whitelist = ["IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"]
-                if attachment_index[attachment_field_name] not in attachment_whitelist:
-                    print(f'Error: Invalid value for attachment_index key {attachment_field_name} : {attachment_index[attachment_field_name]}\n must be one of {attachment_whitelist}')
-                    return None
-                if atf not in column_names:
-                    print(f'Error: No column matching attachment_index key {attachment_field_name}')
-                    return None
-                else:
-                    col_query+=f', {atf}'
-                    query_lookup[atf] = index_value
-                    index_value += 1
-        # Query your row_data, external_id, global_key and metadata_index key columns from
-        query = f"""SELECT {col_query} FROM {bq_table.project}.{bq_table.dataset_id}.{bq_table.table_id}"""
-        query_job = self.bq_client.query(query)
-        # Iterate over your query payload to construct a list of data row dictionaries in Labelbox format
-        global_key_to_upload_dict = {}
-        for row in query_job:
-            data_row_upload_dict = {
-                "row_data" : row[query_lookup[row_data_col]],
-                "metadata_fields" : [{"schema_id":metadata_name_key_to_schema['lb_integration_source'],"value":"BigQuery"}],
-                "global_key" : str(row[query_lookup[global_key_col]])
-            }
-            if external_id_col:
-                data_row_upload_dict['external_id'] = row[query_lookup[external_id_col]]
-            if metadata_index:
-                for metadata_field_name in metadata_index:
-                    mdf = metadata_field_name.replace(" ", "_")
-                    metadata_schema_id = metadata_name_key_to_schema[metadata_field_name]
-                    mdx_value = f"{metadata_field_name}///{row[query_lookup[mdf]]}"
-                    if mdx_value in metadata_name_key_to_schema.keys():
-                        metadata_value = metadata_name_key_to_schema[mdx_value]
-                    else:
-                        metadata_value = row[query_lookup[mdf]]
-                    data_row_upload_dict['metadata_fields'].append({
-                        "schema_id" : metadata_schema_id,
-                        "value" : metadata_value
-                    })
-            if attachment_index:
-                data_row_upload_dict['attachments'] = [{"type" : attachment_index[attachment_field_name], "value" : row[query_lookup[attachment_field_name]]} for attachment_field_name in attachment_index]
-            global_key_to_upload_dict[row[query_lookup[global_key_col]]] = data_row_upload_dict
-        # Batch upload your list of data row dictionaries in Labelbox format
-        upload_results = self.__batch_create_data_rows(client=self.lb_client, dataset=lb_dataset, global_key_to_upload_dict=global_key_to_upload_dict)
-        print(f'Success')
-        return upload_results
+                print(f'Data row upload will not continue')
+                return {"upload_results" : [], "conversion_errors" : conversion_errors}
+
+        # Upload your data rows to Labelbox
+        upload_results = batch_create_data_rows(
+            client=self.lb_client, dataset=lb_dataset, global_key_to_upload_dict=global_key_to_upload_dict,
+            skip_duplicates=skip_duplicates, divider=divider, verbose=verbose
+        )
+        return {"upload_results" : upload_results, "conversion_errors" : conversion_errors}
+
     def create_table_from_dataset(self, bq_dataset_id, bq_table_name, lb_dataset, metadata_index={}):
         """ Creates a BigQuery Table from a Labelbox dataset given a BigQuery Dataset ID, desired Table name, and optional metadata_index
@@ -222,9 +155,10 @@ def upsert_table_metadata(self, bq_table_id, lb_dataset, global_key_col, metadat
             Updated BigQuery table
         """
         # Sync metadata index keys with metadata ontology
-        check = self._sync_metadata_fields(bq_table_id, metadata_index)
-        if not check:
-            return None
+        bq_table = sync_metadata_fields(
+            client=self.lb_client, table=table, get_columns_function=connector.get_columns_function, add_column_function=connector.add_column_function,
+            get_unique_values_function=connector.get_unique_values_function, metadata_index=metadata_index, verbose=verbose
+        )
         bq_table = self.bq_client.get_table(bq_table_id)
         data_rows = lb_dataset.export_data_rows(include_metadata=True)
         metadata_schema_to_name_key = self.__get_metadata_schema_to_name_key(self.lb_client.get_data_row_metadata_ontology(), invert=False)
@@ -304,6 +238,6 @@ def upsert_labelbox_metadata(self, bq_table_id, global_key_col, global_keys_list
                 table_value = query_dict[drid_to_global_key[drid]][field_name]
                 name_key = f"{field_name}///{table_value}"
                 field.value = metadata_name_key_to_schema[name_key] if name_key in metadata_name_key_to_schema.keys() else table_value
-            upload_metadata.append(labelbox.schema.data_row_metadata.DataRowMetadata(data_row_id=drid, fields=new_metadata))
+            upload_metadata.append(DataRowMetadata(data_row_id=drid, fields=new_metadata))
         results = lb_mdo.bulk_upsert(upload_metadata)
         return results

From 29a0989df93497ef34a765f5865d9325cd0ad66a Mon Sep 17 00:00:00 2001
From: raphaeljafriLB <79109817+raphaeljafriLB@users.noreply.github.com>
Date: Thu, 26 Jan 2023 16:40:55 -0500
Subject: [PATCH 3/3] Update connector.py

---
 labelboxbigquery/connector.py | 120 ++++++++++++++++++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/labelboxbigquery/connector.py b/labelboxbigquery/connector.py
index 27430c2..4783a49 100644
--- a/labelboxbigquery/connector.py
+++ b/labelboxbigquery/connector.py
@@ -1,7 +1,118 @@
+from labelbox import Client
+from labelbase.metadata import get_metadata_schema_to_name_key
 from google.cloud.bigquery import SchemaField
 from google.cloud.bigquery.table import Table
 from google.cloud.bigquery import Client as bqClient
 
+def create_upload_dict(table:Table, lb_client:Client, bq_client:bqClient, row_data_col:str, global_key_col:str="",
+                       external_id_col:str="", metadata_index:dict={}, attachment_index:dict={}, local_files:bool=False,
+                       divider:str="///", verbose=False):
+    """ Structures a query from your BigQuery table, then transforms the payload into an upload dictionary
+    Args:
+        table               :   Required (google.cloud.bigquery.table.Table) - BigQuery Table
+        lb_client           :   Required (labelbox.client.Client) - Labelbox Client object
+        bq_client           :   Required (bigquery.Client) - BigQuery Client object
+        row_data_col        :   Required (str) - Column containing asset URL or file path
+        global_key_col      :   Optional (str) - Column name containing the data row global key - defaults to row data
+        external_id_col     :   Optional (str) - Column name containing the data row external ID - defaults to global key
+        metadata_index      :   Optional (dict) - Dictionary where {key=column_name : value=metadata_type}
+                                    metadata_type must be either "enum", "string", "datetime" or "number"
+        attachment_index    :   Optional (dict) - Dictionary where {key=column_name : value=attachment_type}
+                                    attachment_type must be one of "IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"
+        local_files         :   Optional (bool) - Determines how to handle row_data_col values
+                                    If True, treats row_data_col values as file paths and uploads the local files to Labelbox
+                                    If False, treats row_data_col values as urls (assuming delegated access is set up)
+        divider             :   Optional (str) - String delimiter for all name keys generated for parent/child schemas
+        verbose             :   Optional (bool) - If True, prints details about code execution; if False, prints minimal information
+    Returns:
+        Two values:
+        - global_key_to_upload_dict - Dictionary where {key=global_key : value=data row dictionary in upload format}
+        - errors - List of dictionaries containing conversion error information; see connector.create_data_rows() for more information
+    """
+    global_key_to_upload_dict = {}
+    try:
+        global_key_col = global_key_col if global_key_col else row_data_col
+        external_id_col = external_id_col if external_id_col else global_key_col
+        if verbose:
+            print(f'Creating upload list - {get_table_length_function(table)} rows in BigQuery Table')
+        if get_table_length_function(table=table) != get_unique_values_function(table=table, column_name=global_key_col):
+            print(f"Warning: Your global key column is not unique - upload will resume, only uploading 1 data row for duplicate global keys")
+        metadata_name_key_to_schema = get_metadata_schema_to_name_key(client=lb_client, lb_mdo=False, divider=divider, invert=True)
+        column_names = get_columns_function(table=table)
+        if row_data_col not in column_names:
+            raise ValueError(f'Error: No column matching provided "row_data_col" column value {row_data_col}')
+        else:
+            index_value = 0
+            query_lookup = {row_data_col:index_value}
+            col_query = row_data_col
+            index_value += 1
+        if global_key_col not in column_names:
+            raise ValueError(f'Error: No column matching provided "global_key_col" column value {global_key_col}')
+        else:
+            col_query += f", {global_key_col}"
+            query_lookup[global_key_col] = index_value
+            index_value += 1
+        if external_id_col not in column_names:
+            raise ValueError(f'Error: No column matching provided "external_id_col" column value {external_id_col}')
+        else:
+            col_query+= f", {external_id_col}"
+            query_lookup[external_id_col] = index_value
+            index_value += 1
+        if metadata_index:
+            for metadata_field_name in metadata_index:
+                mdf = metadata_field_name.replace(" ", "_")
+                if mdf not in column_names:
+                    raise ValueError(f'Error: No column matching metadata_index key {metadata_field_name}')
+                else:
+                    col_query+=f', {mdf}'
+                    query_lookup[mdf] = index_value
+                    index_value += 1
+        if attachment_index:
+            for attachment_field_name in attachment_index:
+                atf = attachment_field_name.replace(" ", "_")
+                attachment_whitelist = ["IMAGE", "VIDEO", "RAW_TEXT", "HTML", "TEXT_URL"]
+                if attachment_index[attachment_field_name] not in attachment_whitelist:
+                    raise ValueError(f'Error: Invalid value for attachment_index key {attachment_field_name} : {attachment_index[attachment_field_name]}\n must be one of {attachment_whitelist}')
+                if atf not in column_names:
+                    raise ValueError(f'Error: No column matching attachment_index key {attachment_field_name}')
+                else:
+                    col_query+=f', {atf}'
+                    query_lookup[atf] = index_value
+                    index_value += 1
+        # Query your row_data, external_id, global_key and metadata_index key columns from your BigQuery table
+        query = f"""SELECT {col_query} FROM {table.project}.{table.dataset_id}.{table.table_id}"""
+        query_job = bq_client.query(query)
+        # Iterate over your query payload to construct a list of data row dictionaries in Labelbox format
+        global_key_to_upload_dict = {}
+        for row in query_job:
+            data_row_upload_dict = {
+                "row_data" : row[query_lookup[row_data_col]],
+                "metadata_fields" : [{"schema_id":metadata_name_key_to_schema['lb_integration_source'],"value":"BigQuery"}],
+                "global_key" : str(row[query_lookup[global_key_col]]),
+                "external_id" : str(row[query_lookup[external_id_col]])
+            }
+            if metadata_index:
+                for metadata_field_name in metadata_index:
+                    mdf = metadata_field_name.replace(" ", "_")
+                    metadata_schema_id = metadata_name_key_to_schema[metadata_field_name]
+                    mdx_value = f"{metadata_field_name}///{row[query_lookup[mdf]]}"
+                    if mdx_value in metadata_name_key_to_schema.keys():
+                        metadata_value = metadata_name_key_to_schema[mdx_value]
+                    else:
+                        metadata_value = row[query_lookup[mdf]]
+                    data_row_upload_dict['metadata_fields'].append({
+                        "schema_id" : metadata_schema_id,
+                        "value" : metadata_value
+                    })
+            if attachment_index:
+                data_row_upload_dict['attachments'] = [{"type" : attachment_index[attachment_field_name], "value" : row[query_lookup[attachment_field_name]]} for attachment_field_name in attachment_index]
+            global_key_to_upload_dict[row[query_lookup[global_key_col]]] = data_row_upload_dict
+        errors = None
+    except Exception as e:
+        global_key_to_upload_dict = global_key_to_upload_dict if global_key_to_upload_dict else {}
+        errors = e
+    return global_key_to_upload_dict, errors
+
 def get_columns_function(table:Table):
     """Grabs all column names from a Pandas DataFrame
     Args:
         table           :   Required (google.cloud.bigquery.table.Table) - BigQuery Table
     Returns:
@@ -39,3 +150,12 @@ def add_column_function(table:Table, column_name:str, client:bqClient=None, defa
     table.schema = new_schema
     table = bq_client.update_table(table, ["schema"])
     return table
+
+def get_table_length_function(table:Table):
+    """ Tells you the size of a BigQuery Table
+    Args:
+        table           :   Required (google.cloud.bigquery.table.Table) - BigQuery Table
+    Returns:
+        The length of your table as an integer
+    """
+    return int(table.num_rows)
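
Reviewer note: the sketch below is not part of the patches above. It is a minimal usage example of the refactored flow, assuming the class defined in labelboxbigquery/client.py is exposed as labelboxbigquery.Client and that its constructor accepts the keyword names visible in __init__ (lb_api_key, google_project_name, google_key). The API key, project, table ID, dataset ID, and the "split" metadata column are placeholders.

# Hypothetical usage sketch for the patched create_data_rows_from_table flow.
# All credentials, IDs, and column names below are placeholders.
import labelboxbigquery

lb_bq_client = labelboxbigquery.Client(
    lb_api_key="LABELBOX_API_KEY",                    # placeholder Labelbox API key
    google_project_name="my-gcp-project",             # placeholder GCP project name
    google_key="/path/to/service_account.json",       # placeholder service account key file
)

# Labelbox dataset that the data rows should be added to
lb_dataset = lb_bq_client.lb_client.get_dataset("LABELBOX_DATASET_ID")

# Per the patched client.py, this returns {"upload_results": [...], "conversion_errors": [...]}
results = lb_bq_client.create_data_rows_from_table(
    bq_table_id="my-gcp-project.my_dataset.my_table",  # "google_project_name.dataset_name.table_name" format
    lb_dataset=lb_dataset,
    row_data_col="row_data",
    global_key_col="global_key",
    external_id_col="external_id",
    metadata_index={"split": "enum"},                  # hypothetical metadata column and type
    skip_duplicates=True,
)
print(results["upload_results"])
print(results["conversion_errors"])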