
Commit 09d470f

Authored Sep 17, 2024
Improve data ingestion (#28)
* Improve data ingestion
* Remove langfuse secrets
1 parent 15e97af commit 09d470f

24 files changed · +3918 −147 lines
 

‎.vscode/launch.json

+3-4
@@ -8,10 +8,9 @@
       "type": "node",
       "request": "launch",
       "name": "Slackbot: Debug",
-      "cwd": "${workspaceFolder}/services/slackbot",
-      "program": "${workspaceFolder}/node_modules/ts-node/dist/bin.js",
-      "args": ["${workspaceFolder}/services/slackbot/src/app.ts"],
-      "envFile": "${workspaceFolder}/services/slackbot/.env"
+      "cwd": "${workspaceFolder}",
+      "program": "${workspaceFolder}/node_modules/.bin/nx",
+      "args": ["dev", "slackbot"]
     },
     {
       "type": "node",

‎TROUBLESHOOTING.md

+4
@@ -26,6 +26,10 @@ This error usually happens when the Slack keys (`SLACK_BOT_TOKEN`, `SLACK_APP_TO
 
 If they are correct, try to restart the `slackbot` service by running `docker compose up slackbot -d`. Sometimes users update `.env` but do not restart the service itself, which causes it to keep using outdated variables.
 
+### `429: Too Many Requests`
+
+This error might appear in LiteLLM's logs. It's a bit misleading: most of the time it means you don't have enough credits left. Go to your LLM provider (OpenAI, Anthropic, etc.) and check your credits.
+
 ### Environment variables are outdated
 
 If you use VS Code, sometimes it loads environment variables from the `.env` file automatically. In most cases, this happens because of the Python extension. In our `settings.json`, we set `"python.envFile": ""`, which should prevent that. However, if that doesn't work, try to run the project from a separate terminal (not VS Code).

‎changes.patch

+2,242
Large diffs are not rendered by default.

‎services/api/src/agent/tools/static/semantic_search.ts

+1
@@ -88,6 +88,7 @@ export default async function (context: RunContext) {
       title = "PagerDuty Alert";
       break;
     }
+    case "Jira":
     case "Confluence": {
       url = document.metadata.url;
       title = document.metadata.title;

‎services/data-processor/src/build.py

+9-9
@@ -42,29 +42,29 @@ async def build_index(
 
     store = get_vector_store(index_name, index_type)
 
-    try:
-        if await store.is_index_live():
-            print("Index exists. Delete old one...")
-            await store.delete_index()
-    except Exception as e:
-        print("Could not delete index", e)
-        print("Trying to move forward")
-    await store.create_index()
+    if not await store.is_index_live():
+        await store.create_index()
 
     async def update_status(vendor_name: str, status: str):
         await db.index.update_one(
             {"_id": index_id},
             {"$set": {f"state.integrations.{vendor_name}": status}},
         )
 
+    vector_store = store.get_llama_index_store()
     documents, stats = await get_documents(
+        index=index,
+        vector_store=vector_store,
         organization_id=organization_id,
        data_sources=data_sources,
        on_progress=partial(update_status, status="in_progress"),
        on_complete=partial(update_status, status="completed"),
     )
+    # Delete nodes of documents that are about to be re-indexed
+    if len(documents) > 0:
+        docs_to_delete = list(set([document.ref_doc_id for document in documents]))
+        vector_store.delete(ref_doc_id=docs_to_delete)
 
-    vector_store = store.get_llama_index_store()
     storage_context = StorageContext.from_defaults(vector_store=vector_store)
     embed_model = LiteLLMEmbedding(
         api_base=litellm_url,
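
Net effect of the hunk above: instead of deleting and recreating the whole index, the build now creates the index only if it is missing and clears just the nodes whose source documents are about to be re-ingested. A minimal sketch of that deletion step, assuming a vector store that supports delete(ref_doc_id=...) exactly as used in the diff; everything else here is illustrative, not a verbatim copy of build.py:

# Illustrative sketch only; `vector_store` is assumed to behave like the one in the diff above.
def drop_stale_nodes(vector_store, documents) -> int:
    """Delete the previously stored chunks of every document that is about to be re-indexed."""
    if not documents:
        return 0
    # Many nodes can share one ref_doc_id, so deleting by ref_doc_id clears all
    # old chunks of a document before its fresh nodes are inserted.
    docs_to_delete = list({document.ref_doc_id for document in documents})
    vector_store.delete(ref_doc_id=docs_to_delete)
    return len(docs_to_delete)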

‎services/data-processor/src/loader.py

+96-21
@@ -1,6 +1,7 @@
 import os
 import asyncio
-from typing import List, Optional
+from typing import List, Optional, Any
+from dateutil import parser
 import numpy as np
 from tqdm.auto import tqdm
 from db.integrations import get_integrations_by_organization_id, populate_secrets
@@ -11,11 +12,86 @@
     Settings,
 )
 
+from llama_index.core.schema import Document
+from llama_index.core.vector_stores.types import (
+    BasePydanticVectorStore,
+    VectorStoreQuery,
+    MetadataFilters,
+    MetadataFilter,
+    FilterOperator,
+)
+
+
+async def filter_unchanged_documents(
+    vector_store: BasePydanticVectorStore,
+    documents: List[Document],
+):
+    # Create a dictionary to group documents by ref_doc_id
+    document_ids = [document.doc_id for document in documents]
+
+    result = vector_store.query(
+        VectorStoreQuery(
+            similarity_top_k=100000000000,  # TODO: This is a hack to make sure we get all the documents
+            filters=MetadataFilters(
+                filters=[
+                    MetadataFilter(
+                        key="ref_doc_id",
+                        value=document_ids,
+                        operator=FilterOperator.IN,
+                    )
+                ]
+            ),
+        )
+    )
+    db_nodes = result.nodes
+    if len(db_nodes) == 0:
+        return [], [], documents
+
+    db_nodes_groups = {}
+    for db_node in db_nodes:
+        ref_doc_id = db_node.ref_doc_id
+        if ref_doc_id not in db_nodes_groups:
+            db_nodes_groups[ref_doc_id] = []
+        db_nodes_groups[ref_doc_id].append(db_node)
+
+    new_documents = []
+    unchanged_documents = []
+    changed_documents = []
+
+    for document in documents:
+        # At the moment, if the document doesn't have an updated_at, we re-index it
+        if not document.metadata.get("updated_at"):
+            changed_documents.append(document)
+            continue
+
+        document_id = document.doc_id
+        document_nodes = db_nodes_groups.get(document_id, [])
+        if len(document_nodes) > 0:
+            document_timestamp = parser.isoparse(document.metadata["updated_at"])
+            node_timestamp = parser.isoparse(document_nodes[0].metadata["updated_at"])
+
+            # If the document's updated_at date is greater than the node's updated_at date, it means
+            # the document has been updated, so we need to re-index it
+            if document_timestamp > node_timestamp:
+                changed_documents.append(document)
+            else:
+                unchanged_documents.append(document)
+        else:
+            new_documents.append(document)
+
+    print(f"Found {len(changed_documents)} changed documents")
+    print(f"Found {len(unchanged_documents)} unchanged documents")
+    print(f"Found {len(new_documents)} new documents")
+
+    return changed_documents, unchanged_documents, new_documents
+
 
 async def get_documents(
+    index: Any,
+    vector_store: BasePydanticVectorStore,
     organization_id: str,
     data_sources: Optional[List[str]] = None,
-    total_limit: Optional[int] = 10000,
+    total_limit: Optional[int] = 10000,  # unused at the moment
     on_progress: Optional[callable] = None,
     on_complete: Optional[callable] = None,
 ):
@@ -30,14 +106,11 @@ async def get_documents(
     vendor_names = [integration.vendor.name for integration in integrations]
     print(f"Found {len(integrations)} integrations: {vendor_names}")
 
-    # Calculate the limit per source
-    limit_per_source = round(total_limit / len(integrations))
-
     stats = {}
-    documents = []
-
-    # Settings.transformations
+    total_nodes = []
     progress_bar = tqdm(integrations)
+    n_existing_nodes = index.get("stats") and sum(index["stats"].values()) or 0
+
     for integration in progress_bar:
         vendor_name = integration.vendor.name
         if on_progress:
@@ -52,9 +125,13 @@ async def get_documents(
         # Loader might be an async code, so we need to await it
         try:
             if asyncio.iscoroutinefunction(loader):
-                docs = await loader(integration)
+                raw_docs = await loader(integration)
             else:
-                docs = loader(integration)
+                raw_docs = loader(integration)
+
+            changed_documents, unchanged_documents, new_documents = (
+                await filter_unchanged_documents(vector_store, raw_docs)
+            )
         except Exception as e:
             print(f"Could not load {vendor_name}. Error: {e}")
             continue
@@ -66,19 +143,17 @@ async def get_documents(
         num_cpus = os.cpu_count()
         num_workers = min(4, num_cpus) if num_cpus > 1 else 1
 
-        # counts = [len(doc.text) for doc in docs]
-        # limit = np.percentile(counts, [99])[0]
-        # docs = [doc for doc in docs if len(doc.text) < limit]
-        docs = pipeline.run(documents=docs, num_workers=num_workers)
-
-        # Limit the number of documents per source
-        # docs = docs[:limit_per_source]
+        new_nodes = pipeline.run(documents=new_documents, num_workers=num_workers)
+        changed_nodes = pipeline.run(
+            documents=changed_documents, num_workers=num_workers
+        )
+        nodes = new_nodes + changed_nodes
 
-        print(f"Found {len(docs)} documents for {vendor_name}")
-        documents.extend(docs)
-        stats[integration.vendor.name] = len(docs)
+        print(f"Found total of {len(raw_docs)} documents for {vendor_name}")
+        total_nodes.extend(nodes)
+        stats[integration.vendor.name] = n_existing_nodes + len(new_nodes)
 
         if on_complete:
             await on_complete(vendor_name)
 
-    return documents, stats
+    return total_nodes, stats
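
For orientation, a minimal sketch (not part of the commit) of how the three buckets returned by filter_unchanged_documents are meant to be consumed, mirroring the loop above: unchanged documents are skipped entirely, while new and changed documents are re-chunked and re-embedded. The sketch assumes it lives next to filter_unchanged_documents in the same module; `pipeline` and the worker count are placeholders.

from typing import List

from llama_index.core.schema import Document
from llama_index.core.vector_stores.types import BasePydanticVectorStore


async def ingest_source(
    vector_store: BasePydanticVectorStore,
    raw_docs: List[Document],
    pipeline,  # placeholder for the repo's ingestion pipeline object
):
    changed, unchanged, new = await filter_unchanged_documents(vector_store, raw_docs)

    # Unchanged documents keep their existing nodes; only new and changed
    # documents go through chunking/embedding again.
    nodes = pipeline.run(documents=new, num_workers=4)
    nodes += pipeline.run(documents=changed, num_workers=4)
    return nodes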

‎services/data-processor/src/loaders/confluence.py

+1-1
@@ -1,7 +1,7 @@
 from collections import namedtuple
 import os
 import requests
-from loaders.raw_readers.confluence import ConfluenceReader
+from loaders.readers.confluence import ConfluenceReader
 from atlassian import Confluence
 
 from db.types import Integration

‎services/data-processor/src/loaders/github.py

+5-3
@@ -2,13 +2,13 @@
 from github import Github, Auth, GithubException
 
 # from llama_index.core import SimpleDirectoryReader
-from llama_index.readers.github.repository.github_client import GithubClient
+from loaders.utils.github_client import GithubClient
 from llama_index.readers.github import (
     GitHubIssuesClient,
 )
 from db.types import Integration
-from loaders.raw_readers.github_repo import GithubRepositoryReader
-from loaders.raw_readers.github_issues import GitHubRepositoryIssuesReader
+from loaders.readers.github_repo import GithubRepositoryReader
+from loaders.readers.github_issues import GitHubRepositoryIssuesReader
 
 
 def get_repos(token: str, repos_to_sync=None):
@@ -70,6 +70,8 @@ async def fetch_github_documents(
     # # TODO: this can crash if the repo is huge, because of Github API Rate limit.
     # # Need to find a way to "wait" maybe or to filter garbage.
     code_client = GithubClient(token, fail_on_http_error=False, verbose=True)
+
+    # TODO: updated_at timestamp doesn't seem to work (our code treats same docs as new)
     loader = GithubRepositoryReader(
         github_client=code_client,
         owner=owner,

‎services/data-processor/src/loaders/jira.py

+20-6
@@ -1,7 +1,11 @@
 import requests
-from llama_index.readers.jira import JiraReader
+from datetime import datetime, timezone
+from dateutil import parser
+from loaders.readers.jira import JiraReader
 from db.types import Integration
 
+JQL_QUERY = "issuetype is not EMPTY"
+
 
 def fetch_jira_documents(integration: Integration):
     integration_type = integration.type
@@ -19,9 +23,7 @@ def fetch_jira_documents(integration: Integration):
         loader = JiraReader(
             Oauth2={"cloud_id": cloud_id, "api_token": access_token}
         )
-        documents = loader.load_data(
-            "issuetype is not EMPTY"
-        )  # This "should" fetch all issues
+        documents = loader.load_data(JQL_QUERY)  # This "should" fetch all issues
         total_documents.extend(documents)
     else:
         loader = JiraReader(
@@ -31,12 +33,24 @@ def fetch_jira_documents(integration: Integration):
                 "server_url": integration.metadata["site_url"],
             }
         )
-        documents = loader.load_data("issuetype is not EMPTY")
+        documents = loader.load_data(JQL_QUERY)
         total_documents.extend(documents)
 
     # Adding the global "source" metadata field
     for document in total_documents:
         document.metadata.pop("labels", None)
         document.metadata["source"] = "Jira"
 
-    return documents
+        # Transform 'created_at' and 'updated_at' to UTC with milliseconds
+        created_at = parser.isoparse(document.metadata["created_at"])
+        updated_at = parser.isoparse(document.metadata["updated_at"])
+        document.metadata["created_at"] = (
+            created_at.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+            + "Z"
+        )
+        document.metadata["updated_at"] = (
+            updated_at.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
+            + "Z"
+        )
+
+    return total_documents
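
The loop above normalizes Jira's offset-aware timestamps to UTC ISO-8601 strings with millisecond precision, so they compare cleanly against the updated_at values already stored on nodes. A small standalone example of the same expression (the input timestamp is made up):

from datetime import timezone

from dateutil import parser

raw = "2024-09-17T12:34:56.789+0300"  # hypothetical Jira timestamp with a +03:00 offset

dt = parser.isoparse(raw)
normalized = dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
print(normalized)  # 2024-09-17T09:34:56.789Z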

‎services/data-processor/src/loaders/notion.py

+1-1
@@ -1,6 +1,6 @@
 from db.types import Integration
 from notion_client import Client
-from llama_index.readers.notion import NotionPageReader
+from loaders.readers.notion import NotionPageReader
 
 
 def fetch_notion_documents(integration: Integration):
services/data-processor/src/loaders/pagerduty.py

@@ -1,82 +1,11 @@
 from db.types import Integration
-import httpx
-from llama_index.core import Document
-
-INCIDENT_TEXT_TEMPLATE = """
-Incident title: {title}
-Incident description: {description}
-Incident summary: {summary}
-Incident status: {status}
-Service name: {service_name}
-Created at: {created_at}
-"""
-
-
-async def get_incidents(integration: Integration):
-    access_token = integration.credentials["access_token"]
-    integration_type = integration.type
-    headers = {}
-    if integration_type == "basic":
-        headers["Authorization"] = f"Token token={access_token}"
-    elif integration_type == "oauth":
-        headers["Authorization"] = f"Bearer {access_token}"
-    else:
-        raise ValueError(f"Invalid integration type: {integration_type}")
-
-    limit = 100
-    offset = 0
-    resolved_incidents = []
-    while True:
-        async with httpx.AsyncClient() as client:
-            response = await client.get(
-                "https://api.pagerduty.com/incidents",
-                headers=headers,
-                params={
-                    "date_range": "all",
-                    "statuses[]": "resolved",
-                    "limit": limit,
-                    "offset": offset,
-                },
-            )
-            data = response.json()
-            incidents = data["incidents"]
-            resolved_incidents.extend(incidents)
-            if not data["more"]:
-                break
-            offset += limit
-    return resolved_incidents
+from loaders.readers.pagerduty import PagerDutyReader
 
 
 async def fetch_pagerduty_documents(integration: Integration):
-    incidents = await get_incidents(integration)
-
-    documents = []
-    for incident in incidents:
-        service = incident.get("service", {})
-        service_name = service.get("summary", "Unknown")
-
-        text = INCIDENT_TEXT_TEMPLATE.format(
-            title=incident["title"],
-            description=incident["description"],
-            summary=incident["summary"],
-            status=incident["status"],
-            service_name=service_name,
-            created_at=incident["created_at"],
-        )
-        metadata = {
-            "source": "PagerDuty",
-            "id": incident["id"],
-            "link": incident["html_url"],
-            "status": incident["status"],
-            "urgency": incident["urgency"],
-            "service_id": service.get("id", "Unknown"),
-            "first_trigger_log_entry_id": incident.get(
-                "first_trigger_log_entry", {}
-            ).get("id", "Unknown"),
-            "created_at": incident["created_at"],
-        }
-
-        document = Document(doc_id=incident["id"], text=text, metadata=metadata)
-        documents.append(document)
+    access_token = integration.credentials["access_token"]
+    token_type = integration.type
+    loader = PagerDutyReader(access_token, token_type)
+    documents = await loader.load_data()
 
     return documents

‎services/data-processor/src/loaders/raw_readers/confluence.py ‎services/data-processor/src/loaders/readers/confluence.py

+6-4
@@ -163,6 +163,7 @@ def load_data(
         if not start:
             start = 0
 
+        expand = "body.export_view.value,version"
         pages: List = []
         if space_key:
             pages.extend(
@@ -172,7 +173,7 @@ def load_data(
                    max_num_results=max_num_results,
                    space=space_key,
                    status=page_status,
-                    expand="body.export_view.value",
+                    expand=expand,
                    content_type="page",
                )
            )
@@ -183,7 +184,7 @@ def load_data(
                    cursor=cursor,
                    cql=f'type="page" AND label="{label}"',
                    max_num_results=max_num_results,
-                    expand="body.export_view.value",
+                    expand=expand,
                )
            )
        elif cql:
@@ -193,7 +194,7 @@ def load_data(
                    cursor=cursor,
                    cql=cql,
                    max_num_results=max_num_results,
-                    expand="body.export_view.value",
+                    expand=expand,
                )
            )
        elif page_ids:
@@ -217,7 +218,7 @@ def load_data(
                self._get_data_with_retry(
                    self.confluence.get_page_by_id,
                    page_id=page_id,
-                    expand="body.export_view.value",
+                    expand=expand,
                )
            )
 
@@ -342,6 +343,7 @@ def process_page(self, page, include_attachments, text_maker):
                "page_id": page["id"],
                "status": page["status"],
                "url": self.base_url + page["_links"]["webui"],
+                "updated_at": page["version"]["when"],
            },
        )

‎services/data-processor/src/loaders/raw_readers/github_issues.py ‎services/data-processor/src/loaders/readers/github_issues.py

+1
@@ -183,6 +183,7 @@ async def load_data(
            extra_info = {
                "state": issue["state"],
                "created_at": issue["created_at"],
+                "updated_at": issue["updated_at"],
                # url is the API URL
                "url": issue["url"],
                # source is the HTML URL, more convenient for humans

‎services/data-processor/src/loaders/raw_readers/github_repo.py ‎services/data-processor/src/loaders/readers/github_repo.py

+6
@@ -446,6 +446,10 @@ async def _recurse_tree(
            )
        return blobs_and_full_paths
 
+    async def _get_latest_commit(self, path) -> str:
+        commits = await self._github_client.get_commits(self._owner, self._repo, path)
+        return commits[0]
+
    async def _generate_documents(
        self,
        blobs_and_paths: List[Tuple[GitTreeResponseModel.GitTreeObject, str]],
@@ -472,6 +476,7 @@ async def _generate_documents(
        documents = []
        async for blob_data, full_path in buffered_iterator:
            print_if_verbose(self._verbose, f"generating document for {full_path}")
+            latest_commit = await self._get_latest_commit(full_path)
            assert (
                blob_data.encoding == "base64"
            ), f"blob encoding {blob_data.encoding} not supported"
@@ -525,6 +530,7 @@ async def _generate_documents(
                    "file_path": full_path,
                    "file_name": full_path.split("/")[-1],
                    "url": url,
+                    "updated_at": latest_commit.commit.author.date,
                },
            )
            documents.append(document)
services/data-processor/src/loaders/readers/jira.py

@@ -0,0 +1,117 @@
+from typing import List, Optional, TypedDict
+
+from llama_index.core.readers.base import BaseReader
+from llama_index.core.schema import Document
+
+
+class BasicAuth(TypedDict):
+    email: str
+    api_token: str
+    server_url: str
+
+
+class Oauth2(TypedDict):
+    cloud_id: str
+    api_token: str
+
+
+class JiraReader(BaseReader):
+    """Jira reader. Reads data from Jira issues from passed query.
+
+    Args:
+        Optional basic_auth:{
+            "email": "email",
+            "api_token": "token",
+            "server_url": "server_url"
+        }
+        Optional oauth:{
+            "cloud_id": "cloud_id",
+            "api_token": "token"
+        }
+    """
+
+    def __init__(
+        self,
+        email: Optional[str] = None,
+        api_token: Optional[str] = None,
+        server_url: Optional[str] = None,
+        BasicAuth: Optional[BasicAuth] = None,
+        Oauth2: Optional[Oauth2] = None,
+    ) -> None:
+        from jira import JIRA
+
+        if email and api_token and server_url:
+            if BasicAuth is None:
+                BasicAuth = {}
+            BasicAuth["email"] = email
+            BasicAuth["api_token"] = api_token
+            BasicAuth["server_url"] = server_url
+
+        if Oauth2:
+            options = {
+                "server": f"https://api.atlassian.com/ex/jira/{Oauth2['cloud_id']}",
+                "headers": {"Authorization": f"Bearer {Oauth2['api_token']}"},
+            }
+            self.jira = JIRA(options=options)
+        else:
+            self.jira = JIRA(
+                basic_auth=(BasicAuth["email"], BasicAuth["api_token"]),
+                server=f"https://{BasicAuth['server_url']}",
+            )
+
+    def load_data(self, query: str) -> List[Document]:
+        relevant_issues = self.jira.search_issues(query)
+
+        issues = []
+
+        assignee = ""
+        reporter = ""
+        epic_key = ""
+        epic_summary = ""
+        epic_descripton = ""
+
+        for issue in relevant_issues:
+            # Iterates through only issues and not epics
+            if "parent" in (issue.raw["fields"]):
+                if issue.fields.assignee:
+                    assignee = issue.fields.assignee.displayName
+
+                if issue.fields.reporter:
+                    reporter = issue.fields.reporter.displayName
+
+                if issue.raw["fields"]["parent"]["key"]:
+                    epic_key = issue.raw["fields"]["parent"]["key"]
+
+                if issue.raw["fields"]["parent"]["fields"]["summary"]:
+                    epic_summary = issue.raw["fields"]["parent"]["fields"]["summary"]
+
+                if issue.raw["fields"]["parent"]["fields"]["status"]["description"]:
+                    epic_descripton = issue.raw["fields"]["parent"]["fields"]["status"][
+                        "description"
+                    ]
+
+                issues.append(
+                    Document(
+                        text=f"{issue.fields.summary} \n {issue.fields.description}",
+                        doc_id=issue.id,
+                        extra_info={
+                            "id": issue.id,
+                            "title": issue.fields.summary,
+                            "url": issue.permalink(),
+                            "created_at": issue.fields.created,
+                            "updated_at": issue.fields.updated,
+                            "labels": issue.fields.labels,
+                            "status": issue.fields.status.name,
+                            "assignee": assignee,
+                            "reporter": reporter,
+                            "project": issue.fields.project.name,
+                            "issue_type": issue.fields.issuetype.name,
+                            "priority": issue.fields.priority.name,
+                            "epic_key": epic_key,
+                            "epic_summary": epic_summary,
+                            "epic_description": epic_descripton,
+                        },
+                    )
+                )
+
+        return issues
services/data-processor/src/loaders/readers/notion.py

@@ -0,0 +1,211 @@
+"""Notion reader."""
+
+from datetime import datetime
+import os
+from typing import Any, Dict, List, Optional
+
+import requests  # type: ignore
+from llama_index.core.readers.base import BasePydanticReader
+from llama_index.core.schema import Document
+
+INTEGRATION_TOKEN_NAME = "NOTION_INTEGRATION_TOKEN"
+BLOCK_CHILD_URL_TMPL = "https://api.notion.com/v1/blocks/{block_id}/children"
+DATABASE_URL_TMPL = "https://api.notion.com/v1/databases/{database_id}/query"
+SEARCH_URL = "https://api.notion.com/v1/search"
+
+
+def utc_to_iso(utc_time: str) -> datetime:
+    return datetime.fromisoformat(utc_time.replace("Z", "+00:00"))
+
+
+# TODO: Notion DB reader coming soon!
+class NotionPageReader(BasePydanticReader):
+    """Notion Page reader.
+
+    Reads a set of Notion pages.
+
+    Args:
+        integration_token (str): Notion integration token.
+
+    """
+
+    is_remote: bool = True
+    token: str
+    headers: Dict[str, str]
+
+    def __init__(self, integration_token: Optional[str] = None) -> None:
+        """Initialize with parameters."""
+        if integration_token is None:
+            integration_token = os.getenv(INTEGRATION_TOKEN_NAME)
+            if integration_token is None:
+                raise ValueError(
+                    "Must specify `integration_token` or set environment "
+                    "variable `NOTION_INTEGRATION_TOKEN`."
+                )
+
+        token = integration_token
+        headers = {
+            "Authorization": "Bearer " + token,
+            "Content-Type": "application/json",
+            "Notion-Version": "2022-06-28",
+        }
+
+        super().__init__(token=token, headers=headers)
+
+    @classmethod
+    def class_name(cls) -> str:
+        """Get the name identifier of the class."""
+        return "NotionPageReader"
+
+    def _read_block(self, block_id: str, num_tabs: int = 0) -> str:
+        """Read a block."""
+        done = False
+        result_lines_arr = []
+        cur_block_id = block_id
+        most_recent_time = None
+        while not done:
+            block_url = BLOCK_CHILD_URL_TMPL.format(block_id=cur_block_id)
+            query_dict: Dict[str, Any] = {}
+
+            res = requests.request(
+                "GET", block_url, headers=self.headers, json=query_dict
+            )
+            data = res.json()
+
+            for result in data["results"]:
+                result_type = result["type"]
+                result_obj = result[result_type]
+
+                cur_result_text_arr = []
+                if "rich_text" in result_obj:
+                    for rich_text in result_obj["rich_text"]:
+                        # skip if doesn't have text object
+                        if "text" in rich_text:
+                            text = rich_text["text"]["content"]
+                            prefix = "\t" * num_tabs
+                            cur_result_text_arr.append(prefix + text)
+
+                result_block_id = result["id"]
+                has_children = result["has_children"]
+                if has_children:
+                    children_text, _ = self._read_block(
+                        result_block_id, num_tabs=num_tabs + 1
+                    )
+                    cur_result_text_arr.append(children_text)
+
+                cur_result_text = "\n".join(cur_result_text_arr)
+                result_lines_arr.append(cur_result_text)
+                last_edited_time = result["last_edited_time"]
+
+                if most_recent_time is None or utc_to_iso(
+                    last_edited_time
+                ) > utc_to_iso(most_recent_time):
+                    most_recent_time = last_edited_time
+
+            if data["next_cursor"] is None:
+                done = True
+                break
+            else:
+                cur_block_id = data["next_cursor"]
+
+        block_text = "\n".join(result_lines_arr)
+
+        return block_text, most_recent_time
+
+    def read_page(self, page_id: str) -> str:
+        """Read a page."""
+        return self._read_block(page_id)
+
+    def query_database(
+        self, database_id: str, query_dict: Dict[str, Any] = {"page_size": 100}
+    ) -> List[str]:
+        """Get all the pages from a Notion database."""
+        pages = []
+
+        res = requests.post(
+            DATABASE_URL_TMPL.format(database_id=database_id),
+            headers=self.headers,
+            json=query_dict,
+        )
+        res.raise_for_status()
+        data = res.json()
+
+        pages.extend(data.get("results"))
+
+        while data.get("has_more"):
+            query_dict["start_cursor"] = data.get("next_cursor")
+            res = requests.post(
+                DATABASE_URL_TMPL.format(database_id=database_id),
+                headers=self.headers,
+                json=query_dict,
+            )
+            res.raise_for_status()
+            data = res.json()
+            pages.extend(data.get("results"))
+
+        return [page["id"] for page in pages]
+
+    def search(self, query: str) -> List[str]:
+        """Search Notion page given a text query."""
+        done = False
+        next_cursor: Optional[str] = None
+        page_ids = []
+        while not done:
+            query_dict = {
+                "query": query,
+            }
+            if next_cursor is not None:
+                query_dict["start_cursor"] = next_cursor
+            res = requests.post(SEARCH_URL, headers=self.headers, json=query_dict)
+            data = res.json()
+            for result in data["results"]:
+                page_id = result["id"]
+                page_ids.append(page_id)
+
+            if data["next_cursor"] is None:
+                done = True
+                break
+            else:
+                next_cursor = data["next_cursor"]
+        return page_ids
+
+    def load_data(
+        self, page_ids: List[str] = [], database_id: Optional[str] = None
+    ) -> List[Document]:
+        """Load data from the input directory.
+
+        Args:
+            page_ids (List[str]): List of page ids to load.
+            database_id (str): Database_id from which to load page ids.
+
+        Returns:
+            List[Document]: List of documents.
+
+        """
+        if not page_ids and not database_id:
+            raise ValueError("Must specify either `page_ids` or `database_id`.")
+        docs = []
+        if database_id is not None:
+            # get all the pages in the database
+            page_ids = self.query_database(database_id)
+            for page_id in page_ids:
+                page_text, most_recent_time = self.read_page(page_id)
+                docs.append(
+                    Document(
+                        text=page_text,
+                        id_=page_id,
+                        extra_info={"page_id": page_id, "updated_at": most_recent_time},
+                    )
+                )
+        else:
+            for page_id in page_ids:
+                page_text, most_recent_time = self.read_page(page_id)
+                docs.append(
+                    Document(
+                        text=page_text,
+                        id_=page_id,
+                        extra_info={"page_id": page_id, "updated_at": most_recent_time},
+                    )
+                )
+
+        return docs
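
A short usage sketch of the vendored reader above; the integration token and page IDs are placeholders:

from loaders.readers.notion import NotionPageReader

reader = NotionPageReader(integration_token="secret_xxx")  # placeholder token
docs = reader.load_data(page_ids=["page-id-1", "page-id-2"])  # placeholder page IDs

for doc in docs:
    # Each document carries the most recent last_edited_time seen across its blocks.
    print(doc.doc_id, doc.metadata["updated_at"])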
services/data-processor/src/loaders/readers/pagerduty.py

@@ -0,0 +1,93 @@
+import httpx
+from typing import List
+from llama_index.core.readers.base import BaseReader
+from llama_index.core.schema import Document
+
+INCIDENT_TEXT_TEMPLATE = """
+Incident title: {title}
+Incident description: {description}
+Incident summary: {summary}
+Incident status: {status}
+Service name: {service_name}
+Created at: {created_at}
+"""
+
+
+class PagerDutyReader(BaseReader):
+    access_token: str
+    token_type: str
+
+    def __init__(self, access_token: str, token_type: str):
+        self.access_token = access_token
+        self.token_type = token_type
+
+    @classmethod
+    def class_name(cls) -> str:
+        return "PagerDutyReader"
+
+    async def get_incidents(self) -> List[Document]:
+        headers = {}
+        if self.token_type == "basic":
+            headers["Authorization"] = f"Token token={self.access_token}"
+        elif self.token_type == "oauth":
+            headers["Authorization"] = f"Bearer {self.access_token}"
+
+        limit = 100
+        offset = 0
+        resolved_incidents = []
+        while True:
+            async with httpx.AsyncClient() as client:
+                response = await client.get(
+                    "https://api.pagerduty.com/incidents",
+                    headers=headers,
+                    params={
+                        "date_range": "all",
+                        "statuses[]": "resolved",
+                        "limit": limit,
+                        "offset": offset,
+                    },
+                )
+                data = response.json()
+                incidents = data["incidents"]
+                resolved_incidents.extend(incidents)
+                if not data["more"]:
+                    break
+                offset += limit
+        return resolved_incidents
+
+    async def load_data(self) -> List[Document]:
+        incidents = await self.get_incidents()
+
+        documents = []
+
+        for incident in incidents:
+            service = incident.get("service", {})
+            service_name = service.get("summary", "Unknown")
+
+            text = INCIDENT_TEXT_TEMPLATE.format(
+                title=incident["title"],
+                description=incident["description"],
+                summary=incident["summary"],
+                status=incident["status"],
+                service_name=service_name,
+                created_at=incident["created_at"],
+            )
+            metadata = {
+                "source": "PagerDuty",
+                "title": incident["title"],
+                "id": incident["id"],
+                "link": incident["html_url"],
+                "status": incident["status"],
+                "urgency": incident["urgency"],
+                "service_id": service.get("id", "Unknown"),
+                "first_trigger_log_entry_id": incident.get(
+                    "first_trigger_log_entry", {}
+                ).get("id", "Unknown"),
+                "created_at": incident["created_at"],
+                "updated_at": incident["updated_at"],
+            }
+
+            document = Document(doc_id=incident["id"], text=text, metadata=metadata)
+            documents.append(document)
+
+        return documents

‎services/data-processor/src/loaders/raw_readers/slack.py ‎services/data-processor/src/loaders/readers/slack.py

+58-14
@@ -103,8 +103,11 @@ def _read_message(self, channel_id: str, message_ts: str) -> str:
 
        """Read a message."""
 
+        # TODO: this method reads all the thread messages and creates one document
+        # At the moment, we don't use the usernames + timestamps. This can be a nice improvement.
        messages_text: List[str] = []
        next_cursor = None
+        most_recent_update = None
        while True:
            try:
                # https://slack.com/api/conversations.replies
@@ -128,6 +131,18 @@ def _read_message(self, channel_id: str, message_ts: str) -> str:
                    **conversations_replies_kwargs  # type: ignore
                )
                messages = result["messages"]
+
+                for message in messages:
+                    last_edited = float(
+                        message.get("edited", {}).get("ts", message["ts"])
+                    )
+                    last_edited_utc = datetime.utcfromtimestamp(last_edited)
+                    if (
+                        most_recent_update is None
+                        or last_edited_utc > most_recent_update
+                    ):
+                        most_recent_update = last_edited_utc
+
                messages_text.extend(message["text"] for message in messages)
                if not result["has_more"]:
                    break
@@ -143,7 +158,10 @@ def _read_message(self, channel_id: str, message_ts: str) -> str:
                    time.sleep(int(e.response.headers["retry-after"]))
                else:
                    logger.error(f"Error parsing conversation replies: {e}")
-        return "\n\n".join(messages_text)
+
+        most_recent_update = most_recent_update.isoformat(timespec="milliseconds") + "Z"
+
+        return ("\n\n".join(messages_text), most_recent_update)
 
    def _read_channel(self, channel_id: str, reverse_chronological: bool) -> str:
        from slack_sdk.errors import SlackApiError
@@ -162,6 +180,7 @@ def _read_channel(self, channel_id: str, reverse_chronological: bool) -> str:
                    "channel": channel_id,
                    "cursor": next_cursor,
                    "latest": str(self.latest_date_timestamp),
+                    "include_all_metadata": True,
                }
                if self.earliest_date_timestamp is not None:
                    conversations_history_kwargs["oldest"] = str(
@@ -175,18 +194,34 @@ def _read_channel(self, channel_id: str, reverse_chronological: bool) -> str:
                logger.info(
                    f"{len(conversation_history)} messages found in {channel_id}"
                )
-                result_messages.extend(
-                    (
-                        {
-                            **message,
-                            "text": self._read_message(channel_id, message["ts"]),
-                        }
-                        if message.get("thread_ts")
-                        == message["ts"]  # Message is a parent message of a thread
-                        else message
-                    )
-                    for message in tqdm(conversation_history, desc="Reading messages")
-                )
+
+                for message in tqdm(conversation_history, desc="Reading messages"):
+                    if message.get("thread_ts") == message["ts"]:
+                        # Message is a thread parent message. Let's explore this thread!
+                        text, most_recent_update = self._read_message(
+                            channel_id, message["ts"]
+                        )
+                        result_messages.append(
+                            {
+                                **message,
+                                "text": text,
+                                "updated_at": most_recent_update,
+                            }
+                        )
+                    else:
+                        last_edited = float(
+                            message.get("edited", {}).get("ts", message["ts"])
+                        )
+                        result_messages.append(
+                            {
+                                **message,
+                                "updated_at": datetime.utcfromtimestamp(
+                                    last_edited
+                                ).isoformat(timespec="milliseconds")
+                                + "Z",
+                            }
+                        )
+
                if not result["has_more"]:
                    break
                next_cursor = result["response_metadata"]["next_cursor"]
@@ -222,11 +257,20 @@ def load_data(
            )
            # Remove messages with empty text
            messages = [message for message in messages if message["text"] != ""]
+            # debugging step
+            for message in messages:
+                if "glorious poop" in message["text"]:
+                    print("this is it boys")
+
            documents = [
                Document(
                    id_=message["ts"],
                    text=message["text"],
-                    metadata={"channel_id": channel_id, "ts": message["ts"]},
+                    metadata={
+                        "channel_id": channel_id,
+                        "ts": message["ts"],
+                        "updated_at": message["updated_at"],
+                    },
                )
                for message in messages
            ]
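
The updated_at value computed above falls back to the message's own ts when it was never edited. A tiny standalone example of that expression (the message dict is made up):

from datetime import datetime

message = {"ts": "1726560000.000100", "edited": {"ts": "1726563600.000200"}}  # hypothetical Slack message

last_edited = float(message.get("edited", {}).get("ts", message["ts"]))
updated_at = datetime.utcfromtimestamp(last_edited).isoformat(timespec="milliseconds") + "Z"
print(updated_at)  # 2024-09-17T09:00:00.000Z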

‎services/data-processor/src/loaders/slack.py

+1-2
@@ -6,7 +6,7 @@
 
 from typing import List
 
-from loaders.raw_readers.slack import SlackReader
+from loaders.readers.slack import SlackReader
 
 
 def join_channels(client: WebClient, channel_ids: List[str]):
@@ -27,7 +27,6 @@ def fetch_slack_documents(integration: Integration):
        types=["public_channel", "private_channel"],
    )
    channel_ids = [channel["id"] for channel in channels["channels"]]
-
    id2name = {channel["id"]: channel["name"] for channel in channels["channels"]}
 
    # Try to join the channels, to avoid "not_in_channel" in Slack.

‎services/data-processor/src/loaders/utils/github_client.py

+611
Large diffs are not rendered by default.

‎services/data-processor/src/main.py

+3-5
@@ -49,11 +49,9 @@ async def start_build_index(
 
    # TODO: we re-create the index every time. We need to consider
    # changing this in the future
-    existing_index = await get_index_by_organization_id(organization_id)
-    if existing_index:
-        await delete_index_by_id(existing_index["_id"])
-
-    index = await create_index(organization_id, data_sources, "chromadb")
+    index = await get_index_by_organization_id(organization_id)
+    if not index:
+        index = await create_index(organization_id, data_sources, "chromadb")
 
    background_tasks.add_task(
        build_index,

‎services/data-processor/src/rag/chromadb.py

+1-1
@@ -1,6 +1,6 @@
 import chromadb
 from chromadb.config import Settings
-from llama_index.vector_stores.chroma import (
+from .raw_vector_stores.chromadb import (
     ChromaVectorStore as LIChromaVectorStore,
 )
 from rag.base import BaseVectorStore

‎services/data-processor/src/rag/raw_vector_stores/chromadb.py

+423
Large diffs are not rendered by default.
