From 0fd4b5d2dfa89d9ccc82a0aef0ef990a1bb5b3c1 Mon Sep 17 00:00:00 2001 From: Joshua Greben Date: Thu, 31 Oct 2024 10:02:16 -0700 Subject: [PATCH 1/5] Increase timeout for client --- libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py b/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py index b1de1812..23106bce 100644 --- a/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py +++ b/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py @@ -23,7 +23,7 @@ dryrun = False # request timeout in seconds -ASYNC_CLIENT_TIMEOUT = 30 +ASYNC_CLIENT_TIMEOUT = 60 # limit the number of parallel threads. # Try different values. Bigger values - for increasing performance, but could produce "Connection timeout exception" From 74266123251d4ed5fbb6d73386df15ffe39c3d16 Mon Sep 17 00:00:00 2001 From: Joshua Greben Date: Thu, 31 Oct 2024 10:08:27 -0700 Subject: [PATCH 2/5] Add logger to fix_encumbrances script so messages appear in airflow logs --- .../folio/encumbrances/fix_encumbrances.py | 117 +++++++++--------- 1 file changed, 60 insertions(+), 57 deletions(-) diff --git a/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py b/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py index 23106bce..0a993350 100644 --- a/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py +++ b/libsys_airflow/plugins/folio/encumbrances/fix_encumbrances.py @@ -12,8 +12,11 @@ import httpx import requests +import logging from airflow.models import Variable +logger = logging.getLogger(__name__) + ITEM_MAX = 2147483647 MAX_BY_CHUNK = 1000 @@ -45,7 +48,7 @@ def login(tenant, username, password): r = requests.post(okapi_url + '/authn/login', headers=login_headers, json=data) if r.status_code != 201: raise_exception_for_reply(r) - print('Logged in successfully.') + logger.info('Logged in successfully.') okapi_token = 
r.json()['okapiToken'] return { 'x-okapi-tenant': tenant, @@ -53,7 +56,7 @@ def login(tenant, username, password): 'Content-Type': 'application/json', } except Exception as err: - print('Error during login:', err) + logger.info('Error during login:', err) raise SystemExit(1) @@ -64,10 +67,10 @@ async def get_request_without_query(url: str) -> dict: if resp.status_code == HTTPStatus.OK: return resp.json() else: - print(f'Error getting record with url {url} : \n{resp.text} ') + logger.info(f'Error getting record with url {url} : \n{resp.text} ') raise SystemExit(1) except Exception as err: - print(f'Error getting record with url {url} : {err=}') + logger.info(f'Error getting record with url {url} : {err=}') raise SystemExit(1) @@ -82,10 +85,10 @@ async def get_request(url: str, query: str) -> dict: if resp.status_code == HTTPStatus.OK: return resp.json() else: - print(f'Error getting records by {url} ?query= "{query}": \n{resp.text} ') + logger.info(f'Error getting records by {url} ?query= "{query}": \n{resp.text} ') raise SystemExit(1) except Exception as err: - print(f'Error getting records by {url}?query={query}: {err=}') + logger.info(f'Error getting records by {url}?query={query}: {err=}') raise SystemExit(1) @@ -98,11 +101,11 @@ async def put_request(url: str, data): ) if resp.status_code == HTTPStatus.NO_CONTENT: return - print(f'Error updating record {url} "{data}": {resp.text}') + logger.info(f'Error updating record {url} "{data}": {resp.text}') raise SystemExit(1) except Exception as err: - print(f'Error updating record {url} "{data}": {err=}') + logger.info(f'Error updating record {url} "{data}": {err=}') raise SystemExit(1) @@ -113,11 +116,11 @@ async def delete_request(url: str): resp = await client.delete(url, headers=headers, timeout=ASYNC_CLIENT_TIMEOUT) if resp.status_code == HTTPStatus.NO_CONTENT: return - print(f'Error deleting record {url}: {resp.text}') + logger.info(f'Error deleting record {url}: {resp.text}') raise SystemExit(1) except 
Exception as err: - print(f'Error deleting record {url}: {err=}') + logger.info(f'Error deleting record {url}: {err=}') raise SystemExit(1) @@ -131,7 +134,7 @@ def get_fiscal_years_by_query(query) -> dict: raise_exception_for_reply(r) return r.json()['fiscalYears'] except Exception as err: - print(f'Error getting fiscal years with query "{query}": {err}') + logger.info(f'Error getting fiscal years with query "{query}": {err}') raise SystemExit(1) @@ -170,7 +173,7 @@ def get_order_ids_by_query(query) -> list: for order in orders: ids.append(order['id']) except Exception as err: - print(f'Error getting order ids with query "{query}": {err}') + logger.info(f'Error getting order ids with query "{query}": {err}') raise SystemExit(1) return ids @@ -197,7 +200,7 @@ def get_fiscal_year(fiscal_year_code) -> dict: query = f'code=="{fiscal_year_code}"' fiscal_years = get_fiscal_years_by_query(query) if len(fiscal_years) == 0: - print(f'Could not find fiscal year "{fiscal_year_code}".') + logger.info(f'Could not find fiscal year "{fiscal_year_code}".') raise SystemExit(1) return fiscal_years[0] @@ -210,18 +213,18 @@ def test_fiscal_year_current(fiscal_year) -> bool: def get_closed_orders_ids() -> list: - print('Retrieving closed order ids...') + logger.info('Retrieving closed order ids...') query = 'workflowStatus=="Closed"' closed_orders_ids = get_order_ids_by_query(query) - print(' Closed orders:', len(closed_orders_ids)) + logger.info(' Closed orders:', len(closed_orders_ids)) return closed_orders_ids def get_open_orders_ids() -> list: - print('Retrieving open order ids...') + logger.info('Retrieving open order ids...') query = 'workflowStatus=="Open"' open_orders_ids = get_order_ids_by_query(query) - print(' Open orders:', len(open_orders_ids)) + logger.info(' Open orders:', len(open_orders_ids)) return open_orders_ids @@ -240,7 +243,7 @@ async def get_budget_by_fund_id(fund_id, fiscal_year_id) -> dict: query = f'fundId=={fund_id} AND fiscalYearId=={fiscal_year_id}' 
budgets = await get_budgets_by_query(query) if len(budgets) == 0: - print( + logger.info( f'Could not find budget for fund "{fund_id}" and fiscal year "{fiscal_year_id}".' ) raise SystemExit(1) @@ -357,16 +360,16 @@ async def remove_duplicate_encumbrances_in_order(order_id, fiscal_year_id, sem) if len(encumbrance_changes) == 0: sem.release() return 0 - print(f" Removing the following encumbrances for order {order_id}:") + logger.info(f" Removing the following encumbrances for order {order_id}:") for change in encumbrance_changes: - print(f" {change['remove']['id']}") + logger.info(f" {change['remove']['id']}") await remove_encumbrances_and_update_polines(encumbrance_changes) sem.release() return len(encumbrance_changes) async def remove_duplicate_encumbrances(open_and_closed_orders_ids, fiscal_year_id): - print('Removing duplicate encumbrances...') + logger.info('Removing duplicate encumbrances...') futures = [] sem = asyncio.Semaphore(MAX_ACTIVE_THREADS) for idx, order_id in enumerate(open_and_closed_orders_ids): @@ -380,7 +383,7 @@ async def remove_duplicate_encumbrances(open_and_closed_orders_ids, fiscal_year_ nb_removed_encumbrances = await asyncio.gather(*futures) # progress(len(open_and_closed_orders_ids), len(open_and_closed_orders_ids)) - print(f' Removed {sum(nb_removed_encumbrances)} encumbrance(s).') + logger.info(f' Removed {sum(nb_removed_encumbrances)} encumbrance(s).') # --------------------------------------------------- @@ -398,7 +401,7 @@ async def update_encumbrance_fund_id(encumbrance, new_fund_id, poline): encumbrance['fromFundId'] = new_fund_id encumbrance_id = encumbrance['id'] order_id = poline['purchaseOrderId'] - print( + logger.info( f" Fixing fromFundId for po line {poline['id']} ({poline['poLineNumber']}) encumbrance {encumbrance_id}" ) await transaction_summary(order_id, 1) @@ -416,20 +419,20 @@ async def fix_fund_id_with_duplicate_encumbrances(encumbrances, fd_fund_id, poli else: encumbrances_with_bad_fund.append(encumbrance) if 
len(encumbrances_with_bad_fund) == 0: - print( + logger.info( f" Warning: there is a remaining duplicate encumbrance for poline {poline['id']} " f"({poline['poLineNumber']})." ) return if len(encumbrances_with_right_fund) != 1: - print( + logger.info( f" Problem fixing encumbrances for poline {poline['id']} ({poline['poLineNumber']}), " "please fix by hand." ) return replace_by = encumbrances_with_right_fund[0] for encumbrance_to_remove in encumbrances_with_bad_fund: - print( + logger.info( f" Removing encumbrance {encumbrance_to_remove['id']} for po line {poline['id']} " f"({poline['poLineNumber']})" ) @@ -524,12 +527,12 @@ async def process_order_encumbrances_relations(order_id, fiscal_year_id, order_s async def fix_poline_encumbrances_relations( open_orders_ids, fiscal_year_id, fy_is_current ): - print('Fixing poline-encumbrance links...') + logger.info('Fixing poline-encumbrance links...') if len(open_orders_ids) == 0: - print(' Found no open orders.') + logger.info(' Found no open orders.') return if not fy_is_current: - print( + logger.info( ' Fiscal year is not current, the step to fix po line encumbrance relations will be skipped.' ) return @@ -615,7 +618,7 @@ async def fix_order_status_and_release_encumbrances(order_id, encumbrances): await put_request(url, encumbrance) except Exception as err: - print( + logger.info( f'Error when fixing order status in encumbrances for order {order_id}:', err ) raise SystemExit(1) @@ -628,9 +631,9 @@ async def fix_order_encumbrances_order_status(order_id, encumbrances): # Eventually we could rely on the post-MODFISTO-328 implementation to change orderStatus directly # (Morning Glory bug fix). 
try: - # print(f'\n Fixing the following encumbrance(s) for order {order_id} :') + # logger.info(f'\n Fixing the following encumbrance(s) for order {order_id} :') for encumbrance in encumbrances: - print(f" {encumbrance['id']}") + logger.info(f" {encumbrance['id']}") modified_encumbrances = await unrelease_order_encumbrances( order_id, encumbrances ) @@ -639,7 +642,7 @@ async def fix_order_encumbrances_order_status(order_id, encumbrances): order_id, modified_encumbrances ) except Exception as err: - print( + logger.info( f'Error when fixing order status in encumbrances for order {order_id}:', err ) raise SystemExit(1) @@ -660,9 +663,9 @@ async def fix_encumbrance_order_status_for_closed_order( async def fix_encumbrance_order_status_for_closed_orders( closed_orders_ids, fiscal_year_id ): - print('Fixing encumbrance order status for closed orders...') + logger.info('Fixing encumbrance order status for closed orders...') if len(closed_orders_ids) == 0: - print(' Found no closed orders.') + logger.info(' Found no closed orders.') return fix_encumbrance_futures = [] max_active_order_threads = 5 @@ -677,7 +680,7 @@ async def fix_encumbrance_order_status_for_closed_orders( nb_fixed_encumbrances = await asyncio.gather(*fix_encumbrance_futures) # progress(len(closed_orders_ids), len(closed_orders_ids)) - print(f' Fixed order status for {sum(nb_fixed_encumbrances)} encumbrance(s).') + logger.info(f' Fixed order status for {sum(nb_fixed_encumbrances)} encumbrance(s).') # --------------------------------------------------- @@ -685,9 +688,9 @@ async def fix_encumbrance_order_status_for_closed_orders( async def unrelease_encumbrances(order_id, encumbrances): - # print(f'\n Unreleasing the following encumbrance(s) for order {order_id} :') + # logger.info(f'\n Unreleasing the following encumbrance(s) for order {order_id} :') for encumbrance in encumbrances: - print(f" {encumbrance['id']}") + logger.info(f" {encumbrance['id']}") await transaction_summary(order_id, 
len(encumbrances)) @@ -720,9 +723,9 @@ async def unrelease_encumbrances_with_non_zero_amounts( async def unrelease_open_orders_encumbrances_with_nonzero_amounts( fiscal_year_id, open_orders_ids ): - print('Unreleasing open orders encumbrances with non-zero amounts...') + logger.info('Unreleasing open orders encumbrances with non-zero amounts...') if len(open_orders_ids) == 0: - print(' Found no open orders.') + logger.info(' Found no open orders.') return enc_futures = [] sem = asyncio.Semaphore(MAX_ACTIVE_THREADS) @@ -739,7 +742,7 @@ async def unrelease_open_orders_encumbrances_with_nonzero_amounts( unreleased_encumbrances_amounts = await asyncio.gather(*enc_futures) # progress(len(open_orders_ids), len(open_orders_ids)) - print( + logger.info( f' Unreleased {sum(unreleased_encumbrances_amounts)} open order encumbrance(s) with non-zero amounts.' ) @@ -749,9 +752,9 @@ async def unrelease_open_orders_encumbrances_with_nonzero_amounts( async def release_encumbrances(order_id, encumbrances): - # print\(f'\n Releasing the following encumbrances for order {order_id} :') + # logger.info\(f'\n Releasing the following encumbrances for order {order_id} :') for encumbrance in encumbrances: - print(f" {encumbrance['id']}") + logger.info(f" {encumbrance['id']}") await transaction_summary(order_id, len(encumbrances)) @@ -785,9 +788,9 @@ async def release_encumbrances_with_negative_amounts( async def release_open_orders_encumbrances_with_negative_amounts( fiscal_year_id, open_orders_ids ): - print('Releasing open orders encumbrances with negative amounts...') + logger.info('Releasing open orders encumbrances with negative amounts...') if len(open_orders_ids) == 0: - print(' Found no open orders.') + logger.info(' Found no open orders.') return enc_futures = [] sem = asyncio.Semaphore(MAX_ACTIVE_THREADS) @@ -804,7 +807,7 @@ async def release_open_orders_encumbrances_with_negative_amounts( released_encumbrances_amounts = await asyncio.gather(*enc_futures) # 
progress(len(open_orders_ids), len(open_orders_ids)) - print( + logger.info( f' Released {sum(released_encumbrances_amounts)} open order encumbrance(s) with negative amounts.' ) @@ -845,9 +848,9 @@ async def release_cancelled_pol_encumbrances(order_id, fiscal_year_id, sem) -> i async def release_cancelled_order_line_encumbrances(fiscal_year_id, open_orders_ids): - print('Releasing cancelled order line encumbrances...') + logger.info('Releasing cancelled order line encumbrances...') if len(open_orders_ids) == 0: - print(' Found no open orders.') + logger.info(' Found no open orders.') return enc_futures = [] sem = asyncio.Semaphore(MAX_ACTIVE_THREADS) @@ -862,7 +865,7 @@ async def release_cancelled_order_line_encumbrances(fiscal_year_id, open_orders_ released_encumbrances_amounts = await asyncio.gather(*enc_futures) # progress(len(open_orders_ids), len(open_orders_ids)) - print( + logger.info( f' Released {sum(released_encumbrances_amounts)} cancelled order line encumbrance(s).' ) @@ -877,7 +880,7 @@ async def update_budgets(encumbered, fund_id, fiscal_year_id, sem) -> int: # Cast into decimal values, so 0 == 0.0 == 0.00 will return true if Decimal(str(budget['encumbered'])) != Decimal(encumbered): - # print(f" Budget \"{budget['name']}\": changing encumbered from {budget['encumbered']} to {encumbered}") + # logger.info(f" Budget \"{budget['name']}\": changing encumbered from {budget['encumbered']} to {encumbered}") budget['encumbered'] = encumbered url = f"{okapi_url}/finance-storage/budgets/{budget['id']}" @@ -890,7 +893,7 @@ async def update_budgets(encumbered, fund_id, fiscal_year_id, sem) -> int: async def recalculate_budget_encumbered(open_and_closed_orders_ids, fiscal_year_id): # Recalculate the encumbered property for all the budgets related to these encumbrances # Take closed orders into account because we might have to set a budget encumbered to 0 - print( + logger.info( f'Recalculating budget encumbered for {len(open_and_closed_orders_ids)} orders ...' 
) enc_future = [] @@ -917,7 +920,7 @@ async def recalculate_budget_encumbered(open_and_closed_orders_ids, fiscal_year_ if fund_id in encumbered_for_fund: encumbered_for_fund[fund_id] += Decimal(str(encumbrance['amount'])) - print(' Updating budgets...') + logger.info(' Updating budgets...') update_budget_futures = [] for fund_id, encumbered in encumbered_for_fund.items(): @@ -929,8 +932,8 @@ async def recalculate_budget_encumbered(open_and_closed_orders_ids, fiscal_year_ ) nb_modified = sum(await asyncio.gather(*update_budget_futures)) - print(f' Edited {nb_modified} budget(s).') - print(' Done recalculating budget encumbered.') + logger.info(f' Edited {nb_modified} budget(s).') + logger.info(' Done recalculating budget encumbered.') # --------------------------------------------------- @@ -956,9 +959,9 @@ async def release_order_encumbrances(order_id, fiscal_year_id, sem) -> int: async def release_unreleased_encumbrances_for_closed_orders( closed_orders_ids, fiscal_year_id ): - print('Releasing unreleased encumbrances for closed orders...') + logger.info('Releasing unreleased encumbrances for closed orders...') if len(closed_orders_ids) == 0: - print(' Found no closed orders.') + logger.info(' Found no closed orders.') return nb_released_encumbrance_futures = [] sem = asyncio.Semaphore(MAX_ACTIVE_THREADS) @@ -974,7 +977,7 @@ async def release_unreleased_encumbrances_for_closed_orders( nb_released_encumbrances = await asyncio.gather(*nb_released_encumbrance_futures) # progress(len(closed_orders_ids), len(closed_orders_ids)) - print(f' Released {sum(nb_released_encumbrances)} encumbrance(s).') + logger.info(f' Released {sum(nb_released_encumbrances)} encumbrance(s).') # --------------------------------------------------- From a3a600d54d95ed55f2ea77f56cb7faf63c702662 Mon Sep 17 00:00:00 2001 From: Joshua Greben Date: Thu, 31 Oct 2024 10:16:07 -0700 Subject: [PATCH 3/5] Black ignore fix_encumbrances scripts --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff 
--git a/pyproject.toml b/pyproject.toml index 9aadd97f..242c09aa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -73,6 +73,7 @@ exclude = ''' /( vendor_loads_migration | digital_bookplates_migration + | libsys_airflow/plugins/folio/encumbrances )/ ''' skip-string-normalization = true From fd9c3a2d525d0b425926349645355d3a2ba53af9 Mon Sep 17 00:00:00 2001 From: Joshua Greben Date: Thu, 31 Oct 2024 10:24:44 -0700 Subject: [PATCH 4/5] Do not retry failed fix_encumbrances dags --- libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py | 1 + libsys_airflow/dags/folio_finance/law_fix_encumbrances.py | 1 + libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py | 1 + 3 files changed, 3 insertions(+) diff --git a/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py b/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py index 2b235a68..cdc1dd40 100644 --- a/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py +++ b/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py @@ -34,6 +34,7 @@ ), start_date=datetime(2024, 8, 29), catchup=False, + retries=0, tags=["folio"], params={ "choice": Param( diff --git a/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py b/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py index bc5ab7ca..9cbcbc1b 100644 --- a/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py +++ b/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py @@ -33,6 +33,7 @@ ), start_date=datetime(2024, 8, 29), catchup=False, + retries=0, tags=["folio"], params={ "choice": Param( diff --git a/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py b/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py index 3ba99dec..e6ad8b3b 100644 --- a/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py +++ b/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py @@ -34,6 +34,7 @@ ), start_date=datetime(2024, 8, 29), catchup=False, + retries=0, tags=["folio"], params={ "choice": Param( From 
4c7fc2179acbd009959c533312418b7e1f8c529d Mon Sep 17 00:00:00 2001 From: Joshua Greben Date: Thu, 31 Oct 2024 10:27:28 -0700 Subject: [PATCH 5/5] Do not retry failed fix_encumbrances dags --- libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py | 3 +-- libsys_airflow/dags/folio_finance/law_fix_encumbrances.py | 3 +-- libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py | 3 +-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py b/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py index cdc1dd40..e5e1d7d9 100644 --- a/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py +++ b/libsys_airflow/dags/folio_finance/lane_fix_encumbrances.py @@ -18,7 +18,7 @@ "depends_on_past": False, "email_on_failure": True, "email_on_retry": False, - "retries": 1, + "retries": 0, "retry_delay": timedelta(minutes=1), } @@ -34,7 +34,6 @@ ), start_date=datetime(2024, 8, 29), catchup=False, - retries=0, tags=["folio"], params={ "choice": Param( diff --git a/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py b/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py index 9cbcbc1b..effabcd9 100644 --- a/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py +++ b/libsys_airflow/dags/folio_finance/law_fix_encumbrances.py @@ -18,7 +18,7 @@ "depends_on_past": False, "email_on_failure": True, "email_on_retry": False, - "retries": 1, + "retries": 0, "retry_delay": timedelta(minutes=1), } @@ -33,7 +33,6 @@ ), start_date=datetime(2024, 8, 29), catchup=False, - retries=0, tags=["folio"], params={ "choice": Param( diff --git a/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py b/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py index e6ad8b3b..02176a40 100644 --- a/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py +++ b/libsys_airflow/dags/folio_finance/sul_fix_encumbrances.py @@ -18,7 +18,7 @@ "depends_on_past": False, "email_on_failure": True, "email_on_retry": False, - 
"retries": 1, + "retries": 0, "retry_delay": timedelta(minutes=1), } @@ -34,7 +34,6 @@ ), start_date=datetime(2024, 8, 29), catchup=False, - retries=0, tags=["folio"], params={ "choice": Param(