
Commit d5ed349

Merge branch 'master' into ce-use-compound-datasets

2 parents: 4ffab3c + e1e9134

88 files changed: +2125 / -1232 lines. Only a subset of the changed files is shown below.

.github/actions/prepare-breadbox/action.yaml

Lines changed: 1 addition & 1 deletion

@@ -12,7 +12,7 @@ runs:
     - name: "Install and configure Poetry"
       uses: snok/install-poetry@v1
       with:
-        version: 2.2.1
+        version: 1.8.2
         virtualenvs-create: true
         virtualenvs-in-project: true

breadbox-client/bump_version_and_publish.py

Lines changed: 3 additions & 1 deletion

@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+# force build
 """
 Breadbox Version Bumping and Publishing Script
 =============================================
@@ -181,7 +182,8 @@ def publish():

         # Build and publish the package
         print(" Building and publishing package...")
-        subprocess.run(["poetry", "publish", "--build", "--repository", "public-python"], check=True)
+        subprocess.run(["poetry", "build"], check=True)
+        subprocess.run(["poetry", "publish", "--repository", "public-python"], check=True)
         print(" Package published successfully!")
     except Exception as e:
         print(f"Error publishing package: {str(e)}")

breadbox-client/pyproject.toml

Lines changed: 11 additions & 3 deletions

@@ -1,16 +1,24 @@
 [tool.poetry]
 name = "breadbox-client"
-version = "4.3.11"
+version = "4.3.19"
 description = "A client library for accessing Breadbox"

 authors = []

 readme = "README.md"
 packages = [
-    {include = "breadbox_client"},
     {include = "breadbox_facade"},
 ]
-include = ["CHANGELOG.md", "breadbox_client/py.typed", "breadbox_client/**/*"]
+
+include = [
+    { path = "CHANGELOG.md", format = ["sdist", "wheel"] },
+    # because the breadbox_client directory is generated, it's listed in .gitignore
+    # however, this causes poetry to also ignore these files. There doesn't seem to
+    # be a way to override that behavior so explicitly tell it to add all the files
+    # to the wheel and sdist files
+    { path = "breadbox_client/py.typed", format = ["sdist", "wheel"] },
+    { path = "breadbox_client/**/*.py", format = ["sdist", "wheel"] }
+]

 [tool.poetry.dependencies]
 python = "^3.9"

breadbox/breadbox/compute/celery.py

Lines changed: 30 additions & 26 deletions

@@ -1,11 +1,13 @@
 import psutil
 from celery import Celery, Task, signals
 import os
+import uuid
 from logging import getLogger
 from fastapi import HTTPException

 from breadbox.logging import GCPExceptionReporter
 from breadbox.celery_task.utils import check_celery
+from breadbox.utils.debug_event_log import log_event, _get_log_filename

 from ..config import Settings, get_settings
 from pydantic import ValidationError
@@ -57,32 +59,6 @@ def _get_rss():
     return process.memory_info().rss


-# from typing import Optional, Any, Dict
-#
-# @signals.task_prerun.connect
-# def task_prerun_handler(
-#     sender: Optional[Any] = None, task_id: Optional[str] = None, task: Optional[Task] = None, args: Optional[Any] = None, kwargs: Optional[Dict[str, Any]] = None, **extras
-# ):
-#     print(
-#         f"[BEFORE] Running task {sender.name} rss:{_get_rss()} ({task_id}) with args={args}, kwargs={kwargs}"
-#     )
-#
-#
-# @signals.task_postrun.connect
-# def task_postrun_handler(
-#     sender: Optional[Any] = None,
-#     task_id: Optional[str] = None,
-#     task: Optional[Task] = None,
-#     args: Optional[Any] = None,
-#     kwargs: Optional[Dict[str, Any]] = None,
-#     retval: Any = None,
-#     state: Any = None,
-#     **extras,
-# ):
-#     print(
-#         f"[AFTER] Finished task {sender.name} rss:{_get_rss()} ({task_id}) with result={retval}, state={state}"
-#     )
-
 try:
     settings = get_settings()
 except ValidationError:
@@ -107,5 +83,33 @@ def _get_rss():
     )
     app.conf.update(**storage_configuration)  # pyright: ignore

+# Set up task logging using Celery signals
+@signals.task_prerun.connect
+def task_prerun_handler(task_id, task, *args, **kwargs):
+    log_filename = _get_log_filename()
+    if log_filename:
+        # Generate a readable task name
+        task_name = task.name if hasattr(task, "name") else str(task)
+        # Log task start
+        log_event(log_filename, "start", task_id, {"n": f"Task {task_name}"})
+
+
+@signals.task_success.connect
+def task_success_handler(result, **kwargs):
+    log_filename = _get_log_filename()
+    if log_filename:
+        task_id = kwargs["sender"].request.id
+        # Log task success
+        log_event(log_filename, "end", task_id, {"s": "success"})
+
+
+@signals.task_failure.connect
+def task_failure_handler(task_id, exception, **kwargs):
+    log_filename = _get_log_filename()
+    if log_filename:
+        # Log task failure
+        log_event(log_filename, "end", task_id, {"s": "error", "e": str(exception)})
+
+
 if __name__ == "__main__":
     app.start()
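Note that these handlers only write anything when the DEBUG_EVENT_LOG environment variable (read by _get_log_filename in breadbox/breadbox/utils/debug_event_log.py, shown below) points at a writable file. A minimal sketch of enabling the log for a worker process, with a hypothetical path:

    import os

    # Hypothetical log path; the variable name comes from debug_event_log.py below.
    os.environ["DEBUG_EVENT_LOG"] = "/tmp/breadbox_events.jsonl"

    # Each task run then appends a start/end pair of JSON lines, roughly:
    #   {"p": "start", "id": "<task-id>", "t": "...", "m": <rss bytes>, "n": "Task <name>"}
    #   {"p": "end", "id": "<task-id>", "t": "...", "m": <rss bytes>, "s": "success"}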

breadbox/breadbox/service/search.py

Lines changed: 34 additions & 27 deletions

@@ -3,6 +3,8 @@
 from typing import Dict, List

 import pandas as pd
+from sqlalchemy import insert
+
 from ..crud.metadata import cast_tabular_cell_value_type
 from ..crud import dimension_types as types_crud

@@ -33,11 +35,12 @@ class PropertyValuePair:
 class MetadataCacheEntry:
     properties_to_index_df: pd.DataFrame
     columns_metadata: Dict[str, ColumnMetadata]
-    label_by_given_id: Dict[str, str]
-    rows_by_index: Dict[str, Dict[str, str]]
+
+    def get_label_for_given_id(self, given_id):
+        return self.properties_to_index_df.loc[given_id, "label"]

     def get_properties_dict(self, given_id: str):
-        return self.rows_by_index.get(given_id)
+        return self.properties_to_index_df.loc[given_id].to_dict()


 class MetadataCache:
@@ -74,19 +77,13 @@ def get(self, dimension_type_name: str):

         columns_metadata = dict(dimension_type.dataset.columns_metadata)

-        label_by_given_id = get_dimension_type_metadata_col(
-            self.db, dimension_type_name=dimension_type.name, col_name="label"
-        )
-
         rows_by_index = {}
         for record in properties_to_index_df.to_records():
            rows_by_index[record.index] = record

         entry = MetadataCacheEntry(
             properties_to_index_df=properties_to_index_df,
             columns_metadata=columns_metadata,
-            label_by_given_id=label_by_given_id,
-            rows_by_index=rows_by_index,
         )
         self.cache[dimension_type_name] = entry

@@ -140,36 +137,46 @@ def refresh_search_index_for_dimension_type(
     _delete_search_index_records(db, dimension_type)
     log.info("_delete_search_index_records complete")

-    dimension_search_index_rows = []
-
     cache_entry = metadata_cache.get(dimension_type.name)

-    for given_id in cache_entry.properties_to_index_df.index:
-        for record in get_property_value_pairs_for_given_id(
-            db=db,
-            dimension_type_name=dimension_type.name,
-            given_id=given_id,
-            metadata_cache=metadata_cache,
-        ):
-            # if given_id in cache_entry.dimension_id_by_given_id:
-            dimension_search_index_rows.append(
-                DimensionSearchIndex(
-                    # dimension_id=cache_entry.dimension_id_by_given_id[given_id],
+    def row_generator():
+        for given_id in cache_entry.properties_to_index_df.index:
+            for record in get_property_value_pairs_for_given_id(
+                db=db,
+                dimension_type_name=dimension_type.name,
+                given_id=given_id,
+                metadata_cache=metadata_cache,
+            ):
+                # if given_id in cache_entry.dimension_id_by_given_id:
+                yield dict(
                     property=record.property,
                     value=record.value,
                     group_id=dimension_type.dataset.group_id,
                     dimension_type_name=dimension_type.name,
                     dimension_given_id=given_id,
-                    label=cache_entry.label_by_given_id[given_id],
+                    label=cache_entry.get_label_for_given_id(given_id),
                 )
-            )
+
+    dimension_search_index_row_count = 0
+    for batch in _make_batches(row_generator(), batch_size=1000):
+        db.execute(insert(DimensionSearchIndex), batch)
+        dimension_search_index_row_count += len(batch)
+        log.info(f"Wrote batch of {len(batch)} search index records for {dimension_type_name}")

     log.info(
-        f"refresh_search_index_for_dimension_type generated {len(dimension_search_index_rows)} search index records for {len(cache_entry.properties_to_index_df.index)} rows in {dimension_type_name}. Writing..."
+        f"Finished writing all {dimension_search_index_row_count} search index records for {len(cache_entry.properties_to_index_df.index)} rows in {dimension_type_name}"
     )

-    db.bulk_save_objects(dimension_search_index_rows)
-    log.info("refresh_search_index_for_dimension_type complete")
+
+def _make_batches(iterable, batch_size):
+    batch = []
+    for item in iterable:
+        batch.append(item)
+        if len(batch) >= batch_size:
+            yield batch
+            batch = []
+    if len(batch) > 0:
+        yield batch


 def _get_datatypes_referencing(db, dimension_type_name):
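The rewrite above replaces db.bulk_save_objects(...) on a fully materialized list with a generator drained in fixed-size batches through a SQLAlchemy Core insert(), so at most 1000 row dicts are held in memory at a time. A self-contained sketch of the batching behavior (standard library only):

    def make_batches(iterable, batch_size):
        # Same shape as the _make_batches helper above: buffer items and
        # yield them as fixed-size lists, flushing the remainder at the end.
        batch = []
        for item in iterable:
            batch.append(item)
            if len(batch) >= batch_size:
                yield batch
                batch = []
        if len(batch) > 0:
            yield batch


    if __name__ == "__main__":
        # 10 items in batches of 3 -> batch sizes 3, 3, 3, 1
        print([len(b) for b in make_batches(range(10), batch_size=3)])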

breadbox/breadbox/startup.py

Lines changed: 65 additions & 0 deletions

@@ -1,7 +1,12 @@
 import os
 import pathlib
+import time
+import uuid

 from fastapi.routing import APIRouter
+from starlette.middleware.base import BaseHTTPMiddleware
+from starlette.types import ASGIApp, Receive, Scope, Send
+from .utils.debug_event_log import log_event, _get_log_filename

 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
@@ -61,6 +66,9 @@ def create_app(settings: Settings):
         scheme_override=scheme_override,
         host_override=host_override,
     )
+
+    # Add request logging middleware
+    app.add_middleware(RequestLoggingMiddleware)

     return app

@@ -70,6 +78,63 @@ def ensure_directories_exist(settings):
         os.mkdir(settings.filestore_location)


+class RequestLoggingMiddleware:
+    def __init__(self, app: ASGIApp):
+        self.app = app
+
+    async def __call__(self, scope: Scope, receive: Receive, send: Send):
+        if scope["type"] != "http":
+            await self.app(scope, receive, send)
+            return
+
+        log_filename = _get_log_filename()
+        if not log_filename:
+            # If logging is disabled, just pass through
+            await self.app(scope, receive, send)
+            return
+
+        path = scope.get("path", "unknown")
+        method = scope.get("method", "unknown")
+        request_id = str(uuid.uuid4())
+
+        # Log request start
+        span_name = f"{method} {path}"
+        log_event(log_filename, "start", request_id, {"n": span_name})
+
+        # Capture the original send function to intercept the response
+        original_send = send
+        response_status = None
+
+        async def wrapped_send(message):
+            nonlocal response_status
+
+            if message["type"] == "http.response.start":
+                # Capture the status code when headers are sent
+                response_status = message.get("status", 0)
+
+            # Check if this is the final response body message
+            is_final_message = (
+                message["type"] == "http.response.body" and
+                not message.get("more_body", False)
+            )
+
+            # Pass through to the original send
+            await original_send(message)
+
+            # If this is the final message, log the completion
+            if is_final_message:
+                log_event(log_filename, "end", request_id, {"s": response_status})
+
+        # Use the wrapped send function and catch any exceptions
+        try:
+            await self.app(scope, receive, wrapped_send)
+        except Exception as e:
+            # Log the exception as an end event
+            log_event(log_filename, "end", request_id, {"s": 500, "error": str(e)})
+            # Re-raise the exception to maintain the original behavior
+            raise
+
+
 class OverrideMiddleWare:
     def __init__(self, app, scheme_override=None, host_override=None):
         self.app = app
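RequestLoggingMiddleware is written as a raw ASGI callable rather than a BaseHTTPMiddleware subclass, so it can watch the message stream for the final http.response.body message. A self-contained sketch of that wrapped-send pattern against a toy ASGI app (no FastAPI required):

    import asyncio


    async def toy_app(scope, receive, send):
        # Minimal ASGI app: one response, sent as headers then a final body chunk.
        await send({"type": "http.response.start", "status": 200, "headers": []})
        await send({"type": "http.response.body", "body": b"ok", "more_body": False})


    async def main():
        status = None

        async def wrapped_send(message):
            nonlocal status
            if message["type"] == "http.response.start":
                status = message.get("status", 0)
            if message["type"] == "http.response.body" and not message.get("more_body", False):
                # This is the point where the middleware above logs its "end" event.
                print(f"request finished with status {status}")

        await toy_app({"type": "http", "method": "GET", "path": "/"}, None, wrapped_send)


    asyncio.run(main())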
breadbox/breadbox/utils/debug_event_log.py

Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+import os
+import datetime
+
+import psutil
+import json
+
+
+def _get_log_filename():
+    return os.environ.get("DEBUG_EVENT_LOG")
+
+def log_event(filename, phase, op_id, context=None):
+    process = psutil.Process(os.getpid())
+    rss = process.memory_info().rss
+    with open(filename, "a") as f:
+        record = {
+            "p": phase,
+            "id": op_id,
+            "t": datetime.datetime.now().isoformat(timespec="seconds"),
+            "m": rss,
+        }
+        if context:
+            record.update(context)
+        f.write(json.dumps(record) + "\n")
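Because each call appends one JSON object per line, the log can be post-processed by pairing "start" and "end" records on their shared id. A hedged sketch (not part of this commit) of a reader that computes per-operation wall time and RSS growth from those fields:

    import json
    from datetime import datetime


    def summarize(filename):
        # Pair start/end records on "id"; field names (p, id, t, m, n, s)
        # match what log_event above writes.
        starts = {}
        with open(filename) as f:
            for line in f:
                rec = json.loads(line)
                if rec["p"] == "start":
                    starts[rec["id"]] = rec
                elif rec["p"] == "end" and rec["id"] in starts:
                    start = starts.pop(rec["id"])
                    elapsed = (
                        datetime.fromisoformat(rec["t"])
                        - datetime.fromisoformat(start["t"])
                    ).total_seconds()
                    yield {
                        "name": start.get("n", "?"),
                        "seconds": elapsed,
                        "rss_delta": rec["m"] - start["m"],
                        "status": rec.get("s"),
                    }


    if __name__ == "__main__":
        for row in summarize("/tmp/breadbox_events.jsonl"):  # hypothetical path
            print(row)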

breadbox/pyproject.toml

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "breadbox"
-version = "4.3.11"
+version = "4.3.19"
 description = "A persistent service that stores and fetches datasets"
 authors = ["Jessica Cheng <[email protected]>"]
 packages = [{include = "breadbox"}]
