Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit a4e753a

Browse files
committed Sep 17, 2024 ·
Adjust notion reader
1 parent f1daeef commit a4e753a

File tree

2 files changed

+209
-1
lines changed

2 files changed

+209
-1
lines changed
 

‎services/data-processor/src/loaders/notion.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from db.types import Integration
22
from notion_client import Client
3-
from llama_index.readers.notion import NotionPageReader
3+
from loaders.raw_readers.notion import NotionPageReader
44

55

66
def fetch_notion_documents(integration: Integration):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
"""Notion reader."""
2+
3+
import os
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

import requests  # type: ignore
from llama_index.core.readers.base import BasePydanticReader
from llama_index.core.schema import Document
10+
11+
INTEGRATION_TOKEN_NAME = "NOTION_INTEGRATION_TOKEN"
12+
BLOCK_CHILD_URL_TMPL = "https://api.notion.com/v1/blocks/{block_id}/children"
13+
DATABASE_URL_TMPL = "https://api.notion.com/v1/databases/{database_id}/query"
14+
SEARCH_URL = "https://api.notion.com/v1/search"
15+
16+
17+
# TODO: Notion DB reader coming soon!
18+
class NotionPageReader(BasePydanticReader):
    """Notion Page reader.

    Reads a set of Notion pages over the Notion REST API and turns each page
    into a ``Document`` whose text is the concatenated rich text of the page's
    blocks (nested blocks are indented with tabs) and whose metadata carries
    the page id and the most recent block edit time.

    Args:
        integration_token (str): Notion integration token. When omitted, the
            ``NOTION_INTEGRATION_TOKEN`` environment variable is used.

    """

    is_remote: bool = True
    token: str
    headers: Dict[str, str]

    def __init__(self, integration_token: Optional[str] = None) -> None:
        """Initialize with parameters.

        Raises:
            ValueError: If no token is passed and the environment variable
                ``NOTION_INTEGRATION_TOKEN`` is not set.
        """
        if integration_token is None:
            integration_token = os.getenv(INTEGRATION_TOKEN_NAME)
            if integration_token is None:
                raise ValueError(
                    "Must specify `integration_token` or set environment "
                    "variable `NOTION_INTEGRATION_TOKEN`."
                )

        token = integration_token
        headers = {
            "Authorization": "Bearer " + token,
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        }

        super().__init__(token=token, headers=headers)

    @classmethod
    def class_name(cls) -> str:
        """Get the name identifier of the class."""
        return "NotionPageReader"

    @staticmethod
    def _parse_notion_time(timestamp: str) -> datetime:
        """Parse an ISO-8601 timestamp into a timezone-aware UTC datetime."""
        # `datetime.fromisoformat` only accepts a trailing "Z" on Python 3.11+,
        # so spell the UTC offset out explicitly before parsing.
        if timestamp.endswith("Z"):
            timestamp = timestamp[:-1] + "+00:00"
        parsed = datetime.fromisoformat(timestamp)
        if parsed.tzinfo is None:
            # NOTE(review): a timestamp without an offset is assumed to be UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    def _collect_block(
        self, block_id: str, num_tabs: int
    ) -> Tuple[str, Optional[datetime]]:
        """Recursively gather a block's text and its latest edit time.

        Returns:
            A tuple ``(text, latest)`` where ``text`` joins the rich text of
            every child block (one entry per child, descendants indented by
            one extra tab per level) and ``latest`` is the newest
            ``last_edited_time`` seen among the children and their
            descendants, or ``None`` when the block has no children.
        """
        block_url = BLOCK_CHILD_URL_TMPL.format(block_id=block_id)
        prefix = "\t" * num_tabs
        lines: List[str] = []
        latest: Optional[datetime] = None
        cursor: Optional[str] = None

        while True:
            # The pagination cursor is sent as the `start_cursor` query
            # parameter; the URL keeps pointing at the same parent block.
            params: Dict[str, Any] = {}
            if cursor is not None:
                params["start_cursor"] = cursor

            res = requests.request(
                "GET", block_url, headers=self.headers, params=params
            )
            res.raise_for_status()
            data = res.json()

            for result in data["results"]:
                result_obj = result[result["type"]]

                parts: List[str] = []
                if "rich_text" in result_obj:
                    for rich_text in result_obj["rich_text"]:
                        # skip if doesn't have text object
                        if "text" in rich_text:
                            parts.append(prefix + rich_text["text"]["content"])

                edited = self._parse_notion_time(result["last_edited_time"])

                if result["has_children"]:
                    child_text, child_latest = self._collect_block(
                        result["id"], num_tabs + 1
                    )
                    parts.append(child_text)
                    # A nested edit counts towards the enclosing block's
                    # freshness as well.
                    if child_latest is not None and child_latest > edited:
                        edited = child_latest

                lines.append("\n".join(parts))
                if latest is None or edited > latest:
                    latest = edited

            cursor = data.get("next_cursor")
            if cursor is None:
                break

        return "\n".join(lines), latest

    def _read_block(
        self, block_id: str, num_tabs: int = 0
    ) -> Tuple[str, Optional[str]]:
        """Read a block.

        Args:
            block_id (str): Id of the block (or page) whose children to read.
            num_tabs (int): Indentation level applied to the text lines.

        Returns:
            A tuple ``(block_text, most_recent_time)``. ``most_recent_time``
            is a UTC timestamp formatted like ``2024-01-01T00:00:00.000Z``,
            or ``None`` when the block has no children.
        """
        block_text, latest = self._collect_block(block_id, num_tabs)
        if latest is None:
            return block_text, None
        # Drop the tzinfo so isoformat() does not emit "+00:00" in front of
        # the explicit "Z" suffix.
        most_recent_time = (
            latest.replace(tzinfo=None).isoformat(timespec="milliseconds") + "Z"
        )
        return block_text, most_recent_time

    def read_page(self, page_id: str) -> Tuple[str, Optional[str]]:
        """Read a page.

        Returns:
            The same ``(text, most_recent_time)`` tuple as ``_read_block``.
        """
        return self._read_block(page_id)

    def query_database(
        self, database_id: str, query_dict: Optional[Dict[str, Any]] = None
    ) -> List[str]:
        """Get all the pages from a Notion database.

        Args:
            database_id (str): Id of the database to query.
            query_dict (Dict[str, Any]): Request body for the query. Defaults
                to ``{"page_size": 100}``. The caller's dict is not modified.

        Returns:
            List[str]: Ids of every page returned by the query.
        """
        # Work on a private copy: the pagination cursor is written into the
        # payload and must not leak into the caller's dict or later calls.
        payload: Dict[str, Any] = (
            {"page_size": 100} if query_dict is None else dict(query_dict)
        )
        url = DATABASE_URL_TMPL.format(database_id=database_id)
        page_ids: List[str] = []

        while True:
            res = requests.post(url, headers=self.headers, json=payload)
            res.raise_for_status()
            data = res.json()
            page_ids.extend(page["id"] for page in data.get("results") or [])

            if not data.get("has_more"):
                break
            payload["start_cursor"] = data.get("next_cursor")

        return page_ids

    def search(self, query: str) -> List[str]:
        """Search Notion page given a text query.

        Returns:
            List[str]: Ids of every result returned by the search endpoint.
        """
        page_ids: List[str] = []
        next_cursor: Optional[str] = None

        while True:
            query_dict: Dict[str, Any] = {"query": query}
            if next_cursor is not None:
                query_dict["start_cursor"] = next_cursor

            res = requests.post(SEARCH_URL, headers=self.headers, json=query_dict)
            res.raise_for_status()
            data = res.json()
            page_ids.extend(result["id"] for result in data["results"])

            next_cursor = data["next_cursor"]
            if next_cursor is None:
                break

        return page_ids

    def load_data(
        self,
        page_ids: Optional[List[str]] = None,
        database_id: Optional[str] = None,
    ) -> List[Document]:
        """Load Notion pages as documents.

        Args:
            page_ids (List[str]): List of page ids to load.
            database_id (str): Database_id from which to load page ids. When
                given, it takes precedence over ``page_ids``.

        Returns:
            List[Document]: List of documents.

        Raises:
            ValueError: If neither ``page_ids`` nor ``database_id`` is given.

        """
        if not page_ids and not database_id:
            raise ValueError("Must specify either `page_ids` or `database_id`.")

        if database_id:
            # get all the pages in the database
            page_ids = self.query_database(database_id)

        docs = []
        for page_id in page_ids or []:
            page_text, most_recent_time = self.read_page(page_id)
            docs.append(
                Document(
                    text=page_text,
                    id_=page_id,
                    # Single metadata dict on purpose.
                    # NOTE(review): `extra_info` looks like an alias of
                    # `metadata` on Document, so passing both may drop one of
                    # them - confirm against the pinned llama_index version.
                    metadata={
                        "page_id": page_id,
                        "updated_at": most_recent_time,
                    },
                )
            )

        return docs

0 commit comments

Comments
 (0)
Please sign in to comment.