diff --git a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py index 141801e8fa..b69f63ae24 100644 --- a/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py +++ b/jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py @@ -597,142 +597,142 @@ def parse_and_upload( def parse_and_validate( hour: RTHourlyAggregation, jar_path: Path, - tmp_dir: str, verbose: bool = False, pbar=None, ) -> List[RTFileProcessingOutcome]: - with sentry_sdk.push_scope() as scope: - scope.set_tag("config_feed_type", hour.first_extract.config.feed_type) - scope.set_tag("config_name", hour.first_extract.config.name) - scope.set_tag("config_url", hour.first_extract.config.url) - scope.set_context("RT Hourly Aggregation", json.loads(hour.json())) - - fs = get_fs() - dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/" - get_with_retry( - fs, - rpath=[ - extract.path - for extract in hour.local_paths_to_extract(dst_path_rt).values() - ], - lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()), - ) + with tempfile.TemporaryDirectory() as tmp_dir: + with sentry_sdk.push_scope() as scope: + scope.set_tag("config_feed_type", hour.first_extract.config.feed_type) + scope.set_tag("config_name", hour.first_extract.config.name) + scope.set_tag("config_url", hour.first_extract.config.url) + scope.set_context("RT Hourly Aggregation", json.loads(hour.json())) + + fs = get_fs() + dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/" + get_with_retry( + fs, + rpath=[ + extract.path + for extract in hour.local_paths_to_extract(dst_path_rt).values() + ], + lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()), + ) - if hour.step == RTProcessingStep.validate: - if not hour.extracts[0].config.schedule_url_for_validation: - return [ - RTFileProcessingOutcome( - step=hour.step, - success=False, - extract=extract, - exception=NoScheduleDataSpecified(), + if hour.step == RTProcessingStep.validate: + if not hour.extracts[0].config.schedule_url_for_validation: + return [ + RTFileProcessingOutcome( + step=hour.step, + success=False, + extract=extract, + exception=NoScheduleDataSpecified(), + ) + for extract in hour.extracts + ] + + try: + first_extract = hour.extracts[0] + extract_day = first_extract.dt + for target_date in reversed( + list(extract_day - extract_day.subtract(days=7)) + ): # Fall back to most recent available schedule within 7 days + try: + schedule_extract = get_schedule_extracts_for_day( + target_date + )[first_extract.config.base64_validation_url] + + scope.set_context( + "Schedule Extract", json.loads(schedule_extract.json()) + ) + + gtfs_zip = download_gtfs_schedule_zip( + fs, + schedule_extract=schedule_extract, + dst_dir=tmp_dir, + pbar=pbar, + ) + + break + except (KeyError, FileNotFoundError): + print( + f"no schedule data found for {first_extract.path} and day {target_date}" + ) + else: + raise ScheduleDataNotFound( + f"no recent schedule data found for {first_extract.path}" + ) + + return validate_and_upload( + fs=fs, + jar_path=jar_path, + dst_path_rt=dst_path_rt, + tmp_dir=tmp_dir, + hour=hour, + gtfs_zip=gtfs_zip, + verbose=verbose, + pbar=pbar, ) - for extract in hour.extracts - ] - try: - first_extract = hour.extracts[0] - extract_day = first_extract.dt - for target_date in reversed( - list(extract_day - extract_day.subtract(days=7)) - ): # Fall back to most recent available schedule within 7 days - try: - schedule_extract = get_schedule_extracts_for_day(target_date)[ - first_extract.config.base64_validation_url - ] + # these are the only two types of errors we expect; let any others bubble up + except (ScheduleDataNotFound, subprocess.CalledProcessError) as e: + stderr = None + + fingerprint: List[Any] = [ + type(e), + # convert back to url manually, I don't want to mess around with the hourly class + base64.urlsafe_b64decode(hour.base64_url.encode()).decode(), + ] + if isinstance(e, subprocess.CalledProcessError): + fingerprint.append(e.returncode) + stderr = e.stderr.decode("utf-8") + # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above scope.set_context( - "Schedule Extract", json.loads(schedule_extract.json()) + "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]} ) - gtfs_zip = download_gtfs_schedule_zip( - fs, - schedule_extract=schedule_extract, - dst_dir=tmp_dir, + # we could also use a custom exception for this + if "Unexpected end of ZLIB input stream" in stderr: + fingerprint.append("Unexpected end of ZLIB input stream") + + scope.fingerprint = fingerprint + sentry_sdk.capture_exception(e, scope=scope) + + if verbose: + log( + f"{str(e)} thrown for {hour.path}", + fg=typer.colors.RED, pbar=pbar, ) - - break - except (KeyError, FileNotFoundError): - print( - f"no schedule data found for {first_extract.path} and day {target_date}" + if isinstance(e, subprocess.CalledProcessError): + log( + e.stderr.decode("utf-8"), + fg=typer.colors.YELLOW, + pbar=pbar, + ) + + return [ + RTFileProcessingOutcome( + step=hour.step, + success=False, + extract=extract, + exception=e, + process_stderr=stderr, ) - else: - raise ScheduleDataNotFound( - f"no recent schedule data found for {first_extract.path}" - ) + for extract in hour.extracts + ] - return validate_and_upload( + if hour.step == RTProcessingStep.parse: + return parse_and_upload( fs=fs, - jar_path=jar_path, dst_path_rt=dst_path_rt, tmp_dir=tmp_dir, hour=hour, - gtfs_zip=gtfs_zip, verbose=verbose, pbar=pbar, ) - # these are the only two types of errors we expect; let any others bubble up - except (ScheduleDataNotFound, subprocess.CalledProcessError) as e: - stderr = None - - fingerprint: List[Any] = [ - type(e), - # convert back to url manually, I don't want to mess around with the hourly class - base64.urlsafe_b64decode(hour.base64_url.encode()).decode(), - ] - if isinstance(e, subprocess.CalledProcessError): - fingerprint.append(e.returncode) - stderr = e.stderr.decode("utf-8") - - # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above - scope.set_context( - "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]} - ) - - # we could also use a custom exception for this - if "Unexpected end of ZLIB input stream" in stderr: - fingerprint.append("Unexpected end of ZLIB input stream") - - scope.fingerprint = fingerprint - sentry_sdk.capture_exception(e, scope=scope) - - if verbose: - log( - f"{str(e)} thrown for {hour.path}", - fg=typer.colors.RED, - pbar=pbar, - ) - if isinstance(e, subprocess.CalledProcessError): - log( - e.stderr.decode("utf-8"), - fg=typer.colors.YELLOW, - pbar=pbar, - ) - - return [ - RTFileProcessingOutcome( - step=hour.step, - success=False, - extract=extract, - exception=e, - process_stderr=stderr, - ) - for extract in hour.extracts - ] - - if hour.step == RTProcessingStep.parse: - return parse_and_upload( - fs=fs, - dst_path_rt=dst_path_rt, - tmp_dir=tmp_dir, - hour=hour, - verbose=verbose, - pbar=pbar, - ) - - raise RuntimeError("we should not be here") + raise RuntimeError("we should not be here") @app.command() @@ -836,37 +836,35 @@ def main( # gcfs does not seem to play nicely with multiprocessing right now, so use threads :( # https://github.com/fsspec/gcsfs/issues/379 - with tempfile.TemporaryDirectory() as tmp_dir: - with ThreadPoolExecutor(max_workers=threads) as pool: - futures: Dict[Future, RTHourlyAggregation] = { - pool.submit( - parse_and_validate, - hour=hour, - jar_path=jar_path, - tmp_dir=tmp_dir, - verbose=verbose, + with ThreadPoolExecutor(max_workers=threads) as pool: + futures: Dict[Future, RTHourlyAggregation] = { + pool.submit( + parse_and_validate, + hour=hour, + jar_path=jar_path, + verbose=verbose, + pbar=pbar, + ): hour + for hour in aggregations_to_process + } + + for future in concurrent.futures.as_completed(futures): + hour = futures[future] + if pbar: + pbar.update(1) + try: + outcomes.extend(future.result()) + except KeyboardInterrupt: + raise + except Exception as e: + log( + f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}", + err=True, + fg=typer.colors.RED, pbar=pbar, - ): hour - for hour in aggregations_to_process - } - - for future in concurrent.futures.as_completed(futures): - hour = futures[future] - if pbar: - pbar.update(1) - try: - outcomes.extend(future.result()) - except KeyboardInterrupt: - raise - except Exception as e: - log( - f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}", - err=True, - fg=typer.colors.RED, - pbar=pbar, - ) - sentry_sdk.capture_exception(e) - exceptions.append((e, hour.path, traceback.format_exc())) + ) + sentry_sdk.capture_exception(e) + exceptions.append((e, hour.path, traceback.format_exc())) if pbar: del pbar diff --git a/jobs/gtfs-rt-parser-v2/test_gtfs_rt_parser.py b/jobs/gtfs-rt-parser-v2/test_gtfs_rt_parser.py index ba5d612808..c6ab80c937 100644 --- a/jobs/gtfs-rt-parser-v2/test_gtfs_rt_parser.py +++ b/jobs/gtfs-rt-parser-v2/test_gtfs_rt_parser.py @@ -88,10 +88,9 @@ def test_no_vehicle_positions_for_date(): def test_no_vehicle_positions_for_url(): - base64url = "nope" result = runner.invoke( app, - ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", base64url], + ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", "nope"], catch_exceptions=False, ) assert result.exit_code == 0