Stats and affiliations improvements #191

Merged
6 changes: 5 additions & 1 deletion cds_migrator_kit/rdm/migration/affiliations/load.py
@@ -48,11 +48,15 @@ def _save_affiliation(self, affiliations):
                     legacy_affiliation_input=_original_input,
                     ror_exact_match=affiliation["ror_exact_match"],
                 )
-            else:
+            elif affiliation.get("ror_not_exact_match"):
                 _affiliation_model = CDSMigrationAffiliationMapping(
                     legacy_affiliation_input=_original_input,
                     ror_not_exact_match=affiliation["ror_not_exact_match"],
                 )
+            else:
+                _affiliation_model = CDSMigrationAffiliationMapping(
+                    legacy_affiliation_input=_original_input,
+                )
             db.session.add(_affiliation_model)
             db.session.commit()
         except IntegrityError as e:
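Reviewer note on this fix: with the transform (next file) now emitting an entry for every affiliation, the old `else` branch indexed `affiliation["ror_not_exact_match"]` on entries that carry neither ROR key and would crash with a `KeyError`. A minimal sketch of the guard, using a plain dict in place of the model (the dict shape is illustrative):

```python
# Entry with no ROR match at all, as the updated transform can now produce.
affiliation = {"original_input": "Unknown Institute"}
_original_input = affiliation["original_input"]

# Old behavior: the final else branch indexed the key directly.
try:
    affiliation["ror_not_exact_match"]
except KeyError:
    print("old code: KeyError, the fallback row was never saved")

# New behavior: .get() routes the entry to a legacy-input-only row.
if affiliation.get("ror_exact_match"):
    row = {"legacy_affiliation_input": _original_input,
           "ror_exact_match": affiliation["ror_exact_match"]}
elif affiliation.get("ror_not_exact_match"):
    row = {"legacy_affiliation_input": _original_input,
           "ror_not_exact_match": affiliation["ror_not_exact_match"]}
else:
    row = {"legacy_affiliation_input": _original_input}
print(row)  # {'legacy_affiliation_input': 'Unknown Institute'}
```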
31 changes: 19 additions & 12 deletions cds_migrator_kit/rdm/migration/affiliations/transform.py
@@ -82,27 +82,34 @@ def _affiliations(self, json_entry, key):
         for affiliation_name in affiliations:
             if not affiliation_name:
                 continue
+
+            _affiliation = {
+                "original_input": affiliation_name,
+            }
+
             (chosen, match_or_suggestions) = affiliations_search(affiliation_name)
+
             if chosen:
-                _affiliations.append(
+                _affiliation.update(
                     {
-                        "original_input": affiliation_name,
                         "ror_exact_match": match_or_suggestions["organization"][
                             "id"
                         ],
                     }
                 )
             else:
-                for not_exact_match in match_or_suggestions:
-                    if not_exact_match["score"] >= 0.9:
-                        _affiliations.append(
-                            {
-                                "original_input": affiliation_name,
-                                "ror_not_exact_match": not_exact_match[
-                                    "organization"
-                                ]["id"],
-                            }
-                        )
+                if match_or_suggestions:
+                    for not_exact_match in match_or_suggestions:
+                        if not_exact_match["score"] >= 0.9:
+                            _affiliation.update(
+                                {
+                                    "ror_not_exact_match": not_exact_match[
+                                        "organization"
+                                    ]["id"],
+                                }
+                            )
+                            break
+            _affiliations.append(_affiliation)

         return _affiliations
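The net effect of this change, sketched with a stubbed `affiliations_search` (stub data and scores are invented; the real function queries ROR and returns either `(True, match)` or `(False, suggestions)`): every affiliation now produces exactly one entry carrying `original_input`, and the added `break` keeps only the first suggestion scoring at least 0.9 instead of appending one entry per qualifying suggestion.

```python
# Stub for illustration only; ids and scores are made up.
def affiliations_search(name):
    fake_index = {
        "CERN": (True, {"organization": {"id": "https://ror.org/01ggx4157"}}),
        "MIT Cambridge": (False, [
            {"score": 0.95, "organization": {"id": "https://ror.org/042nb2s44"}},
            {"score": 0.91, "organization": {"id": "https://ror.org/0fake0000"}},
        ]),
    }
    return fake_index.get(name, (False, []))

def match_affiliations(names):
    results = []
    for name in names:
        entry = {"original_input": name}  # always recorded now
        chosen, match_or_suggestions = affiliations_search(name)
        if chosen:
            entry["ror_exact_match"] = match_or_suggestions["organization"]["id"]
        elif match_or_suggestions:
            for suggestion in match_or_suggestions:
                if suggestion["score"] >= 0.9:
                    entry["ror_not_exact_match"] = suggestion["organization"]["id"]
                    break  # keep only the first qualifying suggestion
        results.append(entry)
    return results

# Third name yields an entry with only original_input, handled by the load fix.
print(match_affiliations(["CERN", "MIT Cambridge", "Unknown Institute"]))
```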
12 changes: 7 additions & 5 deletions cds_migrator_kit/rdm/migration/stats/event_generator.py
@@ -99,7 +99,7 @@ def process_download_event(entry, rec_context, logger):
         "via_api": False,
         "is_robot": entry.get("bot", False),
         "country": entry.get("country", ""),
-        "visitor_id": entry["visitor_id"],
+        "visitor_id": entry.get("visitor_id", ""),
         "unique_session_id": entry["unique_session_id"],
         # Note: id_bibrec doesn't have the new format pids
         "unique_id": f"ui_{_record_version['new_recid']}",
@@ -168,7 +168,7 @@ def process_pageview_event(entry, rec_context, logger):
         "referrer": None,
         "via_api": False,
         "is_robot": entry.get("bot", False),
-        "visitor_id": entry["visitor_id"],
+        "visitor_id": entry.get("visitor_id", ""),
         # Note: id_bibrec doesn't have the new format pids
         "unique_id": f"ui_{rec_context['latest_version']}",
         "unique_session_id": entry["unique_session_id"],
@@ -189,14 +189,15 @@ def prepare_new_doc(
     try:
         new_doc = deepcopy(doc)
         # remove to avoid reindexing
-        new_doc.pop("_id", None)
+        new_doc["_id"] = f"migrated_{new_doc['_id']}"
+
         new_doc.pop("_score", None)

         event_type = new_doc["_source"].pop("event_type", None)

         if event_type != doc_type:
             raise Exception("Inconsistent doc type")
-        processed_doc = {}
+
         if event_type == "events.downloads":
             processed_doc = process_download_event(
                 new_doc["_source"], rec_context, logger
@@ -228,9 +229,10 @@
             month = f"{date_object.month:02}"

             yield {
-                "_op_type": "index",
+                "_op_type": "create",
                 "_index": f"{dest_search_index_prefix}-{index_type}-{year}-{month}",
                 "_source": processed_doc,
+                "_id": new_doc["_id"],
             }
     except Exception as ex:
         logger.error(
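Reviewer note: switching from `index` to `create` together with the deterministic `migrated_` prefixed `_id` makes re-runs idempotent; an already-migrated event produces a 409 Conflict (which the loader ignores, see `bulk_index_documents` below) instead of a silent overwrite or a duplicate. The resulting action shape, with invented field values:

```python
# Shape of a yielded action; the index name and values are invented here.
legacy_id = "AXk3f9xY"  # _id carried over from the legacy OpenSearch cluster
action = {
    "_op_type": "create",  # fails with 409 Conflict if this _id already exists
    "_index": "cds-rdm-events-stats-file-download-2021-07",
    "_source": {"visitor_id": "", "unique_session_id": "abc123"},
    "_id": f"migrated_{legacy_id}",  # stable across migration re-runs
}
```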
12 changes: 7 additions & 5 deletions cds_migrator_kit/rdm/migration/stats/load.py
@@ -19,14 +19,15 @@
     os_count,
     os_scroll,
     os_search,
+    bulk_index_documents,
 )


 logger = StatsLogger.get_logger()

 from opensearchpy import OpenSearch
 from opensearchpy.exceptions import OpenSearchException
-from opensearchpy.helpers import bulk
+from opensearchpy.helpers import bulk, parallel_bulk, BulkIndexError

 _QUERY_VIEWS = {
     "query": {
@@ -83,12 +84,13 @@ def _generate_new_events(self, data, rec_context, logger, doc_type):
                 self.config["DEST_SEARCH_INDEX_PREFIX"],
             )
             if self.dry_run:
-                for new_doc in new_docs:
+                for new_doc in new_docs_generated:
                     logger.info(json.dumps(new_doc))
             else:
-                bulk(self.dest_os_client, new_docs_generated, raise_on_error=True)
+                bulk_index_documents(self.dest_os_client, new_docs_generated, logger)

         except Exception as ex:
-            logger.error(ex)
+            logger.error(str(ex))

     def _process_legacy_events_for_recid(self, recid, rec_context, index, event_type):
         data = os_search(
@@ -169,7 +171,7 @@ def validate_stats_for_recid(self, recid, record, event_type):
             )
         except AssertionError as e:
             logger.error(
-                f"Not all events of type {event_type} were migrated for record: {recid}"
+                f"Not all events of type {event_type} were migrated for record: {recid}. Legacy count: {legacy_total['count']} - RDM count: {new_total['count']}"
             )

     def _load(self, entry):
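The dry-run branch previously iterated `new_docs`, presumably a stale name from an earlier revision; both branches must consume the same `new_docs_generated`. Worth keeping in mind when reviewing: `prepare_new_doc` returns a generator, so it can be consumed only once per call. A tiny sketch of that property:

```python
def make_docs():  # stand-in for prepare_new_doc(...)
    yield {"_id": "migrated_1"}
    yield {"_id": "migrated_2"}

new_docs_generated = make_docs()
print(list(new_docs_generated))  # [{'_id': 'migrated_1'}, {'_id': 'migrated_2'}]
print(list(new_docs_generated))  # [] -- already exhausted, nothing left to index
```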
36 changes: 36 additions & 0 deletions cds_migrator_kit/rdm/migration/stats/search.py
@@ -7,11 +7,13 @@

 """CDS-RDM migration stats search module."""

+import json
 import time

 from copy import deepcopy
 from opensearchpy import OpenSearch
 from opensearchpy.exceptions import OpenSearchException
+from opensearchpy.helpers import parallel_bulk, BulkIndexError


 def generate_query(doc_type, identifier, legacy_to_rdm_events_map):
@@ -77,3 +79,37 @@ def os_count(src_os_client, index, q):
             ex = _ex
             time.sleep(10)
     raise ex
+
+
+def bulk_index_documents(
+    client,
+    documents,
+    logger,
+    chunk_size=500,
+    max_chunk_bytes=50 * 1024 * 1024,
+):
+    """
+    Index documents into Opensearch using parallel_bulk with improved readability and error handling.
+    """
+    try:
+        # Execute parallel_bulk with configuration for improved performance
+        for ok, action in parallel_bulk(
+            client,
+            actions=documents,
+            chunk_size=chunk_size,
+            max_chunk_bytes=max_chunk_bytes,
+            raise_on_error=True,  # Handle errors manually for better control
+            raise_on_exception=True,
+            ignore_status=409,  # Ignore 409 Conflict status for existing documents
+        ):
+            pass
+
+    except BulkIndexError as e:
+        for error in e.errors:
+            _failed_doc = {
+                "_op_type": "create",
+                "_index": error["create"]["_index"],
+                "_source": error["create"]["data"],
+                "_id": error["create"]["_id"],
+            }
+            logger.error(f"Failed to index: {json.dumps(_failed_doc)}")
2 changes: 1 addition & 1 deletion cds_migrator_kit/rdm/migration/streams.yaml
@@ -16,4 +16,4 @@ records:
   transform:
     files_dump_dir: cds_migrator_kit/rdm/migration/data/summer_student_reports/files/
     missing_users: cds_migrator_kit/rdm/migration/data/users
-    community_id: 9d5c9f2a-9221-4bfa-85cd-bb3736a779b8
+    community_id: a62b4836-bb73-4f0e-993a-79a35baf5881
@@ -33,6 +33,7 @@ class CDSAffiliations(CdsOverdo):

     __ignore_keys__ = {
         # IGNORED
+        "037__9",
         "0247_2",  # DOI, summer student notes do not have it
         "0247_a",  # DOI
         "0248_a",  # oai identifier, not needed to migrate, TBD
@@ -45,6 +46,7 @@ class CDSAffiliations(CdsOverdo):
         "269__a",
         "269__b",
         "269__c",
+        "300__a",
         "270__m",  # document contact email
         "595__a",  # always value CERN EDS, not displayed, TODO: do we keep?
         "595__z",  # SOME RECORD HAVE UNCL as value, do we keep it? what does UNCL mean
25 changes: 16 additions & 9 deletions cds_migrator_kit/rdm/migration/transform/transform.py
@@ -225,17 +225,21 @@ def _match_affiliation(self, affiliation_name):
         # Step 1: check if there is a curated input
         if match.curated_affiliation:
             return match.curated_affiliation
+        # Step 2: check if there is an exact match
         elif match.ror_exact_match:
             return {"id": normalize_ror(match.ror_exact_match)}
+        # Step 3: check if there is not exact match
         elif match.ror_not_exact_match:
+            _affiliation_ror_id = normalize_ror(match.ror_not_exact_match)
             raise RecordFlaggedCuration(
                 subfield="u",
-                value={"id": normalize_ror(match.ror_not_exact_match)},
+                value={"id": _affiliation_ror_id},
                 field="author",
-                message=f"Affiliation {normalize_ror(match.ror_not_exact_match)} not found as an exact match, ROR id should be checked.",
+                message=f"Affiliation {_affiliation_ror_id} not found as an exact match, ROR id should be checked.",
                 stage="vocabulary match",
             )
         else:
+            # Step 4: set the originally inserted value from legacy
             raise RecordFlaggedCuration(
                 subfield="u",
                 value={"name": affiliation_name},
@@ -306,12 +310,12 @@ def _resource_type(entry):
         # filter empty keys
         return {k: v for k, v in metadata.items() if v}

-
-
     def _custom_fields(self, json_entry, json_output):

         def field_experiments(record_json, custom_fields_dict):
-            experiments = record_json.get("custom_fields", {}).get("cern:experiments", [])
+            experiments = record_json.get("custom_fields", {}).get(
+                "cern:experiments", []
+            )
             for experiment in experiments:
                 result = search_vocabulary(experiment, "experiments")

@@ -331,9 +335,10 @@ def field_experiments(record_json, custom_fields_dict):
                     stage="vocabulary match",
                 )

-
         def field_departments(record_json, custom_fields_dict):
-            departments = record_json.get("custom_fields", {}).get("cern:departments", [])
+            departments = record_json.get("custom_fields", {}).get(
+                "cern:departments", []
+            )
             for department in departments:
                 result = search_vocabulary(department, "departments")
                 if result["hits"]["total"]:
@@ -350,7 +355,9 @@ def field_departments(record_json, custom_fields_dict):
                 )

         def field_accelerators(record_json, custom_fields_dict):
-            accelerators = record_json.get("custom_fields", {}).get("cern:accelerators", [])
+            accelerators = record_json.get("custom_fields", {}).get(
+                "cern:accelerators", []
+            )
             for accelerator in accelerators:
                 result = search_vocabulary(accelerator, "accelerators")
                 if result["hits"]["total"]:
@@ -559,7 +566,7 @@ def compute_files(file_dump, versions_dict):
                         {
                             file["full_name"]: {
                                 "eos_tmp_path": tmp_eos_root
-                                    / full_path.relative_to(legacy_path_root),
+                                / full_path.relative_to(legacy_path_root),
                                 "id_bibdoc": file["bibdocid"],
                                 "key": file["full_name"],
                                 "metadata": {},