fix: Indexing optimization in structure tool #1338

Merged
83 changes: 50 additions & 33 deletions tools/structure/src/main.py
@@ -38,13 +38,15 @@ def run(
output_dir: str,
) -> None:
prompt_registry_id: str = settings[SettingsKeys.PROMPT_REGISTRY_ID]
enable_challenge: bool = settings.get(SettingsKeys.ENABLE_CHALLENGE, False)
summarize_as_source: bool = settings.get(SettingsKeys.SUMMARIZE_AS_SOURCE, False)
single_pass_extraction_mode: bool = settings.get(
is_challenge_enabled: bool = settings.get(SettingsKeys.ENABLE_CHALLENGE, False)
is_summarization_enabled: bool = settings.get(
SettingsKeys.SUMMARIZE_AS_SOURCE, False
)
is_single_pass_enabled: bool = settings.get(
SettingsKeys.SINGLE_PASS_EXTRACTION_MODE, False
)
challenge_llm: str = settings.get(SettingsKeys.CHALLENGE_LLM_ADAPTER_ID, "")
enable_highlight: bool = settings.get(SettingsKeys.ENABLE_HIGHLIGHT, False)
is_highlight_enabled: bool = settings.get(SettingsKeys.ENABLE_HIGHLIGHT, False)
responder: PromptTool = PromptTool(
tool=self,
prompt_port=self.get_env_or_die(SettingsKeys.PROMPT_PORT),
@@ -97,14 +99,12 @@ def run(
tool_settings = tool_metadata[SettingsKeys.TOOL_SETTINGS]
outputs = tool_metadata[SettingsKeys.OUTPUTS]
tool_settings[SettingsKeys.CHALLENGE_LLM] = challenge_llm
tool_settings[SettingsKeys.ENABLE_CHALLENGE] = enable_challenge
tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION] = (
single_pass_extraction_mode
)
tool_settings[SettingsKeys.SUMMARIZE_AS_SOURCE] = summarize_as_source
tool_settings[SettingsKeys.ENABLE_HIGHLIGHT] = enable_highlight
tool_settings[SettingsKeys.ENABLE_CHALLENGE] = is_challenge_enabled
tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION] = is_single_pass_enabled
tool_settings[SettingsKeys.SUMMARIZE_AS_SOURCE] = is_summarization_enabled
tool_settings[SettingsKeys.ENABLE_HIGHLIGHT] = is_highlight_enabled
_, file_name = os.path.split(input_file)
if summarize_as_source:
if is_summarization_enabled:
file_name = SettingsKeys.SUMMARIZE
tool_data_dir = Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR))
execution_run_data_folder = Path(self.get_env_or_die(ToolEnv.EXECUTION_DATA_DIR))
@@ -130,7 +130,7 @@ def run(
usage_kwargs[UsageKwargs.EXECUTION_ID] = self.execution_id
extracted_text = STHelper.dynamic_extraction(
file_path=input_file,
enable_highlight=enable_highlight,
enable_highlight=is_highlight_enabled,
usage_kwargs=usage_kwargs,
run_id=self.file_execution_id,
tool_settings=tool_settings,
@@ -140,7 +140,7 @@ def run(
)

summarize_file_hash = None
if summarize_as_source:
if is_summarization_enabled:
summarize_file_path, summarize_file_hash = self._summarize_and_index(
tool_settings=tool_settings,
tool_data_dir=tool_data_dir,
@@ -150,54 +150,70 @@ def run(
)
payload[SettingsKeys.FILE_HASH] = summarize_file_hash
payload[SettingsKeys.FILE_PATH] = summarize_file_path

if tool_settings[SettingsKeys.ENABLE_SINGLE_PASS_EXTRACTION]:
# Since indexing is not involved for summary
index_metrics = {"time_taken(s)": 0}
elif is_single_pass_enabled:
self.stream_log("Fetching response for single pass extraction...")
# Since indexing is not involved for single pass
index_metrics = {"time_taken(s)": 0}
structured_output = responder.single_pass_extraction(
payload=payload,
)
else:
# To reindex even if file is already
# indexed to get the output in required path
reindex = True
index_metrics = {}
# Track seen parameter combinations to avoid duplicate indexing
seen_params = set()

for output in outputs:
if summarize_as_source:
# Since indexing is not involved for summary
index_metrics[output[SettingsKeys.NAME]] = {"time_taken(s)": 0}
break
if (reindex or not summarize_as_source) and output[
SettingsKeys.CHUNK_SIZE
] != 0:
# Get current parameter combination
chunk_size = output[SettingsKeys.CHUNK_SIZE]
chunk_overlap = output[SettingsKeys.CHUNK_OVERLAP]
vector_db = tool_settings[SettingsKeys.VECTOR_DB]
embedding = tool_settings[SettingsKeys.EMBEDDING]
x2text = tool_settings[SettingsKeys.X2TEXT_ADAPTER]

# Create a unique key for this parameter combination
param_key = (
f"chunk_size={chunk_size}_"
f"chunk_overlap={chunk_overlap}_"
f"vector_db={vector_db}_"
f"embedding={embedding}_"
f"x2text={x2text}"
)

# Only process if we haven't seen this combination yet and chunk_size is not zero
if chunk_size != 0 and param_key not in seen_params:
seen_params.add(param_key)

indexing_start_time = datetime.datetime.now()
self.stream_log(
f"Indexing document with chunk size '{output[SettingsKeys.CHUNK_SIZE]}' and overlap '{output[SettingsKeys.CHUNK_OVERLAP]}'"
f"Indexing document with: chunk_size={chunk_size}, "
f"chunk_overlap={chunk_overlap}, vector_db={vector_db}, "
f"embedding={embedding}, x2text={x2text}"
)

STHelper.dynamic_indexing(
tool_settings=tool_settings,
run_id=self.file_execution_id,
file_path=tool_data_dir / SettingsKeys.EXTRACT,
tool=self,
execution_run_data_folder=str(execution_run_data_folder),
chunk_overlap=output[SettingsKeys.CHUNK_OVERLAP],
reindex=reindex,
chunk_overlap=chunk_overlap,
reindex=True,
usage_kwargs=usage_kwargs,
enable_highlight=enable_highlight,
chunk_size=output[SettingsKeys.CHUNK_SIZE],
enable_highlight=is_highlight_enabled,
chunk_size=chunk_size,
tool_id=tool_metadata[SettingsKeys.TOOL_ID],
file_hash=file_hash,
extracted_text=extracted_text,
)

index_metrics[output[SettingsKeys.NAME]] = {
SettingsKeys.INDEXING: {
"time_taken(s)": STHelper.elapsed_time(
start_time=indexing_start_time
)
}
}
reindex = False

for output in outputs:
if SettingsKeys.TABLE_SETTINGS in output:
@@ -207,6 +223,7 @@ def run(
)
table_settings[SettingsKeys.INPUT_FILE] = extracted_input_file
table_settings[SettingsKeys.IS_DIRECTORY_MODE] = is_directory_mode
self.stream_log(f"Performing table extraction with: {table_settings}")
output.update({SettingsKeys.TABLE_SETTINGS: table_settings})

self.stream_log(f"Fetching responses for '{len(outputs)}' prompt(s)...")
@@ -220,7 +237,7 @@
self.source_file_name
)

if not summarize_as_source:
if not is_summarization_enabled:
metadata = structured_output[SettingsKeys.METADATA]
# Update the dictionary with modified metadata
structured_output[SettingsKeys.METADATA] = metadata
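
For reference, a minimal, self-contained sketch of the deduplication idea this diff introduces: index the extracted text once per unique combination of chunk_size, chunk_overlap, vector_db, embedding and x2text adapter, rather than once per output. The Output dataclass, the index_document stub and the adapter names in the example run are hypothetical placeholders for illustration, not the structure tool's actual API (the tool itself calls STHelper.dynamic_indexing).

# Sketch of per-parameter-combination indexing, assuming simplified types.
import datetime
from dataclasses import dataclass


@dataclass
class Output:
    name: str
    chunk_size: int
    chunk_overlap: int


def index_document(chunk_size: int, chunk_overlap: int, vector_db: str,
                   embedding: str, x2text: str) -> None:
    # Placeholder for the expensive indexing call
    # (STHelper.dynamic_indexing in the actual tool).
    print(f"indexing: size={chunk_size}, overlap={chunk_overlap}")


def index_outputs(outputs: list[Output], vector_db: str, embedding: str,
                  x2text: str) -> dict:
    index_metrics: dict = {}
    seen_params: set[tuple] = set()  # parameter combinations already indexed

    for output in outputs:
        # chunk_size == 0 means no chunking, so there is nothing to index.
        if output.chunk_size == 0:
            continue

        param_key = (output.chunk_size, output.chunk_overlap,
                     vector_db, embedding, x2text)
        if param_key in seen_params:
            # Another output already indexed with these exact parameters.
            continue
        seen_params.add(param_key)

        start = datetime.datetime.now()
        index_document(output.chunk_size, output.chunk_overlap,
                       vector_db, embedding, x2text)
        elapsed = (datetime.datetime.now() - start).total_seconds()
        index_metrics[output.name] = {"indexing": {"time_taken(s)": elapsed}}

    return index_metrics


if __name__ == "__main__":
    outputs = [
        Output("invoice_number", chunk_size=512, chunk_overlap=64),
        Output("invoice_date", chunk_size=512, chunk_overlap=64),  # same combo
        Output("line_items", chunk_size=1024, chunk_overlap=128),
    ]
    # Adapter names below are placeholder values for the example run.
    print(index_outputs(outputs, "vector_db_1", "embedding_1", "x2text_1"))

Under this scheme, outputs that share indexing parameters reuse the same index, so the second output in the example run triggers no extra indexing call and only two indexing operations are performed for the three prompts.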