Analyst sees fewer ZLIB errors
* This commit resolves a race condition in which all parse_and_validate calls shared the same temporary directory
* Because of that contention, concurrent processes would overwrite an existing GTFS schedule zip that had the same file name
* The gtfs-realtime-validator skips protobufs whose MD5 matches one it has already validated
* The race condition therefore produced an elevated number of MD5 collisions, which in turn inflated the number of skipped protobuf validations (see the sketch below)
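
A minimal sketch of the pattern this commit adopts, assuming a thread pool like the one in main; process_hour and its fake download are illustrative stand-ins, not the real helpers in gtfs_rt_parser.py:

import hashlib
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def process_hour(feed_name: str) -> str:
    # Each call owns a private scratch directory, so two threads writing a
    # schedule zip with the same file name can no longer clobber each other.
    with tempfile.TemporaryDirectory() as tmp_dir:
        schedule_zip = Path(tmp_dir) / "gtfs_schedule.zip"
        schedule_zip.write_bytes(feed_name.encode())  # stand-in for the real download
        return hashlib.md5(schedule_zip.read_bytes()).hexdigest()


with ThreadPoolExecutor(max_workers=4) as pool:
    # Before this commit, one shared TemporaryDirectory was created out here
    # and handed to every task, which is where the overwrites happened.
    digests = [f.result() for f in [pool.submit(process_hour, f"feed_{i}") for i in range(4)]]

assert len(set(digests)) == 4  # distinct downloads now keep distinct MD5s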

Signed-off-by: Doc Ritezel <[email protected]>
erikamov authored and ohrite committed Oct 26, 2024
1 parent 97abf70 commit eb58b29
Showing 2 changed files with 143 additions and 146 deletions.
286 changes: 142 additions & 144 deletions jobs/gtfs-rt-parser-v2/gtfs_rt_parser.py
@@ -597,142 +597,142 @@ def parse_and_upload(
 def parse_and_validate(
     hour: RTHourlyAggregation,
     jar_path: Path,
-    tmp_dir: str,
     verbose: bool = False,
     pbar=None,
 ) -> List[RTFileProcessingOutcome]:
-    with sentry_sdk.push_scope() as scope:
-        scope.set_tag("config_feed_type", hour.first_extract.config.feed_type)
-        scope.set_tag("config_name", hour.first_extract.config.name)
-        scope.set_tag("config_url", hour.first_extract.config.url)
-        scope.set_context("RT Hourly Aggregation", json.loads(hour.json()))
-
-        fs = get_fs()
-        dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/"
-        get_with_retry(
-            fs,
-            rpath=[
-                extract.path
-                for extract in hour.local_paths_to_extract(dst_path_rt).values()
-            ],
-            lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()),
-        )
-
-        if hour.step == RTProcessingStep.validate:
-            if not hour.extracts[0].config.schedule_url_for_validation:
-                return [
-                    RTFileProcessingOutcome(
-                        step=hour.step,
-                        success=False,
-                        extract=extract,
-                        exception=NoScheduleDataSpecified(),
-                    )
-                    for extract in hour.extracts
-                ]
-
-            try:
-                first_extract = hour.extracts[0]
-                extract_day = first_extract.dt
-                for target_date in reversed(
-                    list(extract_day - extract_day.subtract(days=7))
-                ):  # Fall back to most recent available schedule within 7 days
-                    try:
-                        schedule_extract = get_schedule_extracts_for_day(target_date)[
-                            first_extract.config.base64_validation_url
-                        ]
-
-                        scope.set_context(
-                            "Schedule Extract", json.loads(schedule_extract.json())
-                        )
-
-                        gtfs_zip = download_gtfs_schedule_zip(
-                            fs,
-                            schedule_extract=schedule_extract,
-                            dst_dir=tmp_dir,
-                            pbar=pbar,
-                        )
-
-                        break
-                    except (KeyError, FileNotFoundError):
-                        print(
-                            f"no schedule data found for {first_extract.path} and day {target_date}"
-                        )
-                else:
-                    raise ScheduleDataNotFound(
-                        f"no recent schedule data found for {first_extract.path}"
-                    )
-
-                return validate_and_upload(
-                    fs=fs,
-                    jar_path=jar_path,
-                    dst_path_rt=dst_path_rt,
-                    tmp_dir=tmp_dir,
-                    hour=hour,
-                    gtfs_zip=gtfs_zip,
-                    verbose=verbose,
-                    pbar=pbar,
-                )
-
-            # these are the only two types of errors we expect; let any others bubble up
-            except (ScheduleDataNotFound, subprocess.CalledProcessError) as e:
-                stderr = None
-
-                fingerprint: List[Any] = [
-                    type(e),
-                    # convert back to url manually, I don't want to mess around with the hourly class
-                    base64.urlsafe_b64decode(hour.base64_url.encode()).decode(),
-                ]
-                if isinstance(e, subprocess.CalledProcessError):
-                    fingerprint.append(e.returncode)
-                    stderr = e.stderr.decode("utf-8")
-
-                    # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above
-                    scope.set_context(
-                        "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]}
-                    )
-
-                    # we could also use a custom exception for this
-                    if "Unexpected end of ZLIB input stream" in stderr:
-                        fingerprint.append("Unexpected end of ZLIB input stream")
-
-                scope.fingerprint = fingerprint
-                sentry_sdk.capture_exception(e, scope=scope)
-
-                if verbose:
-                    log(
-                        f"{str(e)} thrown for {hour.path}",
-                        fg=typer.colors.RED,
-                        pbar=pbar,
-                    )
-                    if isinstance(e, subprocess.CalledProcessError):
-                        log(
-                            e.stderr.decode("utf-8"),
-                            fg=typer.colors.YELLOW,
-                            pbar=pbar,
-                        )
-
-                return [
-                    RTFileProcessingOutcome(
-                        step=hour.step,
-                        success=False,
-                        extract=extract,
-                        exception=e,
-                        process_stderr=stderr,
-                    )
-                    for extract in hour.extracts
-                ]
-
-        if hour.step == RTProcessingStep.parse:
-            return parse_and_upload(
-                fs=fs,
-                dst_path_rt=dst_path_rt,
-                tmp_dir=tmp_dir,
-                hour=hour,
-                verbose=verbose,
-                pbar=pbar,
-            )
-
-        raise RuntimeError("we should not be here")
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        with sentry_sdk.push_scope() as scope:
+            scope.set_tag("config_feed_type", hour.first_extract.config.feed_type)
+            scope.set_tag("config_name", hour.first_extract.config.name)
+            scope.set_tag("config_url", hour.first_extract.config.url)
+            scope.set_context("RT Hourly Aggregation", json.loads(hour.json()))
+
+            fs = get_fs()
+            dst_path_rt = f"{tmp_dir}/rt_{hour.name_hash}/"
+            get_with_retry(
+                fs,
+                rpath=[
+                    extract.path
+                    for extract in hour.local_paths_to_extract(dst_path_rt).values()
+                ],
+                lpath=list(hour.local_paths_to_extract(dst_path_rt).keys()),
+            )
+
+            if hour.step == RTProcessingStep.validate:
+                if not hour.extracts[0].config.schedule_url_for_validation:
+                    return [
+                        RTFileProcessingOutcome(
+                            step=hour.step,
+                            success=False,
+                            extract=extract,
+                            exception=NoScheduleDataSpecified(),
+                        )
+                        for extract in hour.extracts
+                    ]
+
+                try:
+                    first_extract = hour.extracts[0]
+                    extract_day = first_extract.dt
+                    for target_date in reversed(
+                        list(extract_day - extract_day.subtract(days=7))
+                    ):  # Fall back to most recent available schedule within 7 days
+                        try:
+                            schedule_extract = get_schedule_extracts_for_day(
+                                target_date
+                            )[first_extract.config.base64_validation_url]
+
+                            scope.set_context(
+                                "Schedule Extract", json.loads(schedule_extract.json())
+                            )
+
+                            gtfs_zip = download_gtfs_schedule_zip(
+                                fs,
+                                schedule_extract=schedule_extract,
+                                dst_dir=tmp_dir,
+                                pbar=pbar,
+                            )
+
+                            break
+                        except (KeyError, FileNotFoundError):
+                            print(
+                                f"no schedule data found for {first_extract.path} and day {target_date}"
+                            )
+                    else:
+                        raise ScheduleDataNotFound(
+                            f"no recent schedule data found for {first_extract.path}"
+                        )
+
+                    return validate_and_upload(
+                        fs=fs,
+                        jar_path=jar_path,
+                        dst_path_rt=dst_path_rt,
+                        tmp_dir=tmp_dir,
+                        hour=hour,
+                        gtfs_zip=gtfs_zip,
+                        verbose=verbose,
+                        pbar=pbar,
+                    )
+
+                # these are the only two types of errors we expect; let any others bubble up
+                except (ScheduleDataNotFound, subprocess.CalledProcessError) as e:
+                    stderr = None
+
+                    fingerprint: List[Any] = [
+                        type(e),
+                        # convert back to url manually, I don't want to mess around with the hourly class
+                        base64.urlsafe_b64decode(hour.base64_url.encode()).decode(),
+                    ]
+                    if isinstance(e, subprocess.CalledProcessError):
+                        fingerprint.append(e.returncode)
+                        stderr = e.stderr.decode("utf-8")
+
+                        # get the end of stderr, just enough to fit in MAX_STRING_LENGTH defined above
+                        scope.set_context(
+                            "Process", {"stderr": e.stderr.decode("utf-8")[-2000:]}
+                        )
+
+                        # we could also use a custom exception for this
+                        if "Unexpected end of ZLIB input stream" in stderr:
+                            fingerprint.append("Unexpected end of ZLIB input stream")
+
+                    scope.fingerprint = fingerprint
+                    sentry_sdk.capture_exception(e, scope=scope)
+
+                    if verbose:
+                        log(
+                            f"{str(e)} thrown for {hour.path}",
+                            fg=typer.colors.RED,
+                            pbar=pbar,
+                        )
+                        if isinstance(e, subprocess.CalledProcessError):
+                            log(
+                                e.stderr.decode("utf-8"),
+                                fg=typer.colors.YELLOW,
+                                pbar=pbar,
+                            )
+
+                    return [
+                        RTFileProcessingOutcome(
+                            step=hour.step,
+                            success=False,
+                            extract=extract,
+                            exception=e,
+                            process_stderr=stderr,
+                        )
+                        for extract in hour.extracts
+                    ]
+
+            if hour.step == RTProcessingStep.parse:
+                return parse_and_upload(
+                    fs=fs,
+                    dst_path_rt=dst_path_rt,
+                    tmp_dir=tmp_dir,
+                    hour=hour,
+                    verbose=verbose,
+                    pbar=pbar,
+                )
+
+            raise RuntimeError("we should not be here")
 
 
 @app.command()
@@ -836,37 +836,35 @@ def main(
     # gcfs does not seem to play nicely with multiprocessing right now, so use threads :(
     # https://github.com/fsspec/gcsfs/issues/379
 
-    with tempfile.TemporaryDirectory() as tmp_dir:
-        with ThreadPoolExecutor(max_workers=threads) as pool:
-            futures: Dict[Future, RTHourlyAggregation] = {
-                pool.submit(
-                    parse_and_validate,
-                    hour=hour,
-                    jar_path=jar_path,
-                    tmp_dir=tmp_dir,
-                    verbose=verbose,
-                    pbar=pbar,
-                ): hour
-                for hour in aggregations_to_process
-            }
-
-            for future in concurrent.futures.as_completed(futures):
-                hour = futures[future]
-                if pbar:
-                    pbar.update(1)
-                try:
-                    outcomes.extend(future.result())
-                except KeyboardInterrupt:
-                    raise
-                except Exception as e:
-                    log(
-                        f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}",
-                        err=True,
-                        fg=typer.colors.RED,
-                        pbar=pbar,
-                    )
-                    sentry_sdk.capture_exception(e)
-                    exceptions.append((e, hour.path, traceback.format_exc()))
+    with ThreadPoolExecutor(max_workers=threads) as pool:
+        futures: Dict[Future, RTHourlyAggregation] = {
+            pool.submit(
+                parse_and_validate,
+                hour=hour,
+                jar_path=jar_path,
+                verbose=verbose,
+                pbar=pbar,
+            ): hour
+            for hour in aggregations_to_process
+        }
+
+        for future in concurrent.futures.as_completed(futures):
+            hour = futures[future]
+            if pbar:
+                pbar.update(1)
+            try:
+                outcomes.extend(future.result())
+            except KeyboardInterrupt:
+                raise
+            except Exception as e:
+                log(
+                    f"WARNING: exception {type(e)} {str(e)} bubbled up to top for {hour.path}\n{traceback.format_exc()}",
+                    err=True,
+                    fg=typer.colors.RED,
+                    pbar=pbar,
+                )
+                sentry_sdk.capture_exception(e)
+                exceptions.append((e, hour.path, traceback.format_exc()))
 
     if pbar:
         del pbar
3 changes: 1 addition & 2 deletions jobs/gtfs-rt-parser-v2/test_gtfs_rt_parser.py
@@ -88,10 +88,9 @@ def test_no_vehicle_positions_for_date():
 
 
 def test_no_vehicle_positions_for_url():
-    base64url = "nope"
     result = runner.invoke(
         app,
-        ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", base64url],
+        ["parse", "vehicle_positions", "2024-09-14T18:00:00", "--base64url", "nope"],
         catch_exceptions=False,
     )
     assert result.exit_code == 0
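
The diff above only inlines a literal into the CLI test; the directory-isolation fix itself could be covered by a small concurrency test along these lines (a sketch assuming a pytest-style suite; worker is a hypothetical stand-in for parse_and_validate, not part of the real test file):

import tempfile
from concurrent.futures import ThreadPoolExecutor


def worker() -> str:
    # Mirrors the new behavior: the scratch directory is created per call,
    # not shared across the pool.
    with tempfile.TemporaryDirectory() as tmp_dir:
        return tmp_dir


def test_each_call_gets_its_own_tmp_dir():
    with ThreadPoolExecutor(max_workers=8) as pool:
        dirs = [f.result() for f in [pool.submit(worker) for _ in range(8)]]
    assert len(set(dirs)) == 8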
Expand Down
