From 16d4a1a56da95bb221058e56fdfa30dcb8c6e4d1 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Sun, 4 Jun 2023 22:29:08 +0200 Subject: [PATCH 1/9] added notion data source, still missing database indexing --- app/data_source/sources/notion/__init__.py | 0 app/data_source/sources/notion/notion.py | 206 +++++++++++++++++++++ app/indexing/index_documents.py | 29 ++- app/static/data_source_icons/notion.png | Bin 0 -> 7161 bytes ui/src/components/data-source-panel.tsx | 9 + 5 files changed, 227 insertions(+), 17 deletions(-) create mode 100644 app/data_source/sources/notion/__init__.py create mode 100644 app/data_source/sources/notion/notion.py create mode 100644 app/static/data_source_icons/notion.png diff --git a/app/data_source/sources/notion/__init__.py b/app/data_source/sources/notion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/data_source/sources/notion/notion.py b/app/data_source/sources/notion/notion.py new file mode 100644 index 0000000..1db1095 --- /dev/null +++ b/app/data_source/sources/notion/notion.py @@ -0,0 +1,206 @@ +import logging +from datetime import datetime +from enum import Enum +from typing import Dict, List + +import requests +from data_source.api.base_data_source import BaseDataSource, BaseDataSourceConfig, ConfigField, HTMLInputType +from data_source.api.basic_document import BasicDocument, DocumentType +from data_source.api.exception import InvalidDataSourceConfig +from queues.index_queue import IndexQueue +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +logger = logging.getLogger(__name__) + +# Notion API Status codes https://developers.notion.com/reference/status-codes + +HTTP_OK = 200 +HTTP_BAD_REQUEST = 400 +HTTP_UNAUTHORIZED = 401 +HTTP_FORBIDDEN = 403 +HTTP_NOT_FOUND = 404 +HTTP_CONFLICT = 409 +HTTP_TOO_MANY_REQUESTS = 429 + +# 5xx Server Errors +HTTP_INTERNAL_SERVER_ERROR = 500 +HTTP_SERVICE_UNAVAILABLE = 503 +HTTP_GATEWAY_TIMEOUT = 504 + +RETRY_AFTER_STATUS_CODES = frozenset( + { + HTTP_TOO_MANY_REQUESTS, + HTTP_INTERNAL_SERVER_ERROR, + HTTP_SERVICE_UNAVAILABLE, + HTTP_GATEWAY_TIMEOUT, + } +) + + +def _notion_retry_session(token, retries=5, backoff_factor=2.0, status_forcelist=RETRY_AFTER_STATUS_CODES): + """Creates a retry session""" + session = requests.Session() + retry = Retry( + total=retries, + connect=retries, + read=retries, + status=retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + raise_on_redirect=False, + raise_on_status=False, + respect_retry_after_header=True, + ) + adapter = HTTPAdapter() + adapter.max_retries = retry + session.mount("http://", adapter) + session.mount("https://", adapter) + session.headers.update({"Notion-Version": "2022-06-28", "Authorization": f"Bearer {token}"}) + return session + + +class NotionObject(str, Enum): + page = "page" + database = "database" + + +class NotionClient: + def __init__(self, token): + self.api_url = "https://api.notion.com/v1" + self.session = _notion_retry_session(token) + + def auth_check(self): + url = f"{self.api_url}/users/me" + response = self.session.get(url) + response.raise_for_status() + + def get_user(self, user_id): + url = f"{self.api_url}/users/{user_id}" + response = self.session.get(url) + return response.json() + + def list_objects(self, notion_object: NotionObject): + url = f"{self.api_url}/search" + filter_data = { + "filter": {"value": notion_object, "property": "object"}, + "sort": {"direction": "ascending", "timestamp": "last_edited_time"}, + } + response = self.session.post(url, 
json=filter_data) + results = response.json()["results"] + while response.json()["has_more"] is True: + response = self.session.post(url, json=filter_data, params={"start_cursor": response.json()["next_cursor"]}) + results.extend(response.json()["results"]) + return results + + def list_pages(self): + return self.list_objects(NotionObject.page) + + def list_databases(self): + return self.list_objects(NotionObject.database) + + def list_blocks(self, block_id: str): + url = f"{self.api_url}/blocks/{block_id}/children" + response = self.session.get(url) + results = response.json()["results"] + while response.json()["has_more"] is True: + response = self.session.get(url, params={"start_cursor": response.json()["next_cursor"]}) + results.extend(response.json()["results"]) + return results + + +class NotionConfig(BaseDataSourceConfig): + token: str + + +class NotionDataSource(BaseDataSource): + @staticmethod + def get_config_fields() -> List[ConfigField]: + """ + list of the config fields which should be the same fields as in MagicConfig, for dynamic UI generation + """ + return [ + ConfigField( + label="Notion Integration Token", + name="token", + placeholder="secret_AZefAeAZqsfDAZE", + input_type=HTMLInputType.PASSWORD, + ) + ] + + @staticmethod + async def validate_config(config: Dict) -> None: + """ + Validate the configuration and raise an exception if it's invalid, + You should try to actually connect to the data source and verify that it's working + """ + try: + parsed_config = NotionConfig(**config) + notion_client = NotionClient(token=parsed_config.token) + notion_client.auth_check() + except Exception as e: + raise InvalidDataSourceConfig from e + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + notion_config = NotionConfig(**self._raw_config) + self._notion_client = NotionClient( + token=notion_config.token, + ) + self.data_source_id = "DUMMY_SOURCE_ID" + + @staticmethod + def _parse_rich_text(self, rich_text: list): + return "\n".join([text["plain_text"] for text in rich_text]) + + @staticmethod + def _parse_content_from_blocks(self, notion_blocks): + return "\n".join( + [ + self._parse_rich_text(block["paragraph"]["rich_text"]) + for block in notion_blocks + if block["type"] == "paragraph" + ] + ) + + def _parse_content_from_page(self, page): + metadata_list = [ + f"{prop}: {self._parse_rich_text(page['properties'][prop].get('rich_text',''))}" + for prop in page["properties"] + if prop != "Name" + ] + title = f"Title: {self._parse_rich_text(page['properties']['Name']['title'])}" + metadata = "\n".join([title] + metadata_list) + page_blocks = self._notion_client.list_blocks(page["id"]) + blocks_content = self._parse_content_from_blocks(page_blocks) + author = self._notion_client.get_user(page["created_by"]["id"]) + return { + "id": page["id"], + "author": author["name"], + "author_image_url": author["avatar_url"], + "url": page["url"], + "title": title, + "location": title, + "content": metadata + blocks_content, + "timestamp": page["created_time"], + } + + def _fetch_page(self, page): + page_data = self._parse_content_from_page(page) + logger.info(f"Indexing page {page_data['id']}") + document = BasicDocument( + data_source_id=self._data_source_id, + document_type=DocumentType.DOCUMENT, + **page_data, + ) + IndexQueue.get_instance().put_single(document) + + def _feed_new_documents(self) -> None: + pages = self._notion_client.list_pages() + for page in pages: + last_updated_at = datetime.strptime(page["last_edited_time"], "%Y-%m-%dT%H:%M:%S.%fZ") + if 
last_updated_at < self._last_index_time: + # skipping already indexed pages + continue + + self._add_task(self.fetch_page, page)
diff --git a/app/indexing/index_documents.py b/app/indexing/index_documents.py index 8722983..c4b3bec 100644 --- a/app/indexing/index_documents.py +++ b/app/indexing/index_documents.py @@ -7,12 +7,10 @@ from db_engine import Session from indexing.bm25_index import Bm25Index from indexing.faiss_index import FaissIndex +from langchain.schema import Document as PDFDocument from models import bi_encoder -from parsers.pdf import split_PDF_into_paragraphs from paths import IS_IN_DOCKER from schemas import Document, Paragraph -from langchain.schema import Document as PDFDocument - logger = logging.getLogger(__name__) @@ -25,7 +23,6 @@ def get_enum_value_or_none(enum: Optional[Enum]) -> Optional[str]: class Indexer: - @staticmethod def basic_to_document(document: BasicDocument, parent: Document = None) -> Document: paragraphs = Indexer._split_into_paragraphs(document.content) @@ -43,11 +40,8 @@ def basic_to_document(document: BasicDocument, parent: Document = None) -> Docum location=document.location, url=document.url, timestamp=document.timestamp, - paragraphs=[ - Paragraph(content=content) - for content in paragraphs - ], - parent=parent + paragraphs=[Paragraph(content=content) for content in paragraphs], + parent=parent, ) @staticmethod @@ -57,10 +51,11 @@ def index_documents(documents: List[BasicDocument]): ids_in_data_source = [document.id_in_data_source for document in documents] with Session() as session: - documents_to_delete = session.query(Document).filter( - Document.id_in_data_source.in_(ids_in_data_source)).all() + documents_to_delete = ( + session.query(Document).filter(Document.id_in_data_source.in_(ids_in_data_source)).all() + ) if documents_to_delete: - logging.info(f'removing documents that were updated and need to be re-indexed.') + logging.info(f"removing documents that were updated and need to be re-indexed.") Indexer.remove_documents(documents_to_delete, session) for document in documents_to_delete: # Currently bulk deleting doesn't cascade. So we need to delete them one by one. @@ -120,15 +115,15 @@ def _split_into_paragraphs(text, minimum_length=256): if text is None: return [] paragraphs = [] - current_paragraph = '' - for paragraph in re.split(r'\n\s*\n', text): + current_paragraph = "" + for paragraph in re.split(r"\n\s*\n", text): if len(current_paragraph) > 0: - current_paragraph += ' ' + current_paragraph += " " current_paragraph += paragraph.strip() if len(current_paragraph) > minimum_length: paragraphs.append(current_paragraph) - current_paragraph = '' + current_paragraph = "" if len(current_paragraph) > 0: paragraphs.append(current_paragraph) @@ -138,7 +133,7 @@ def _split_into_paragraphs(text, minimum_length=256): def _add_metadata_for_indexing(paragraph: Paragraph) -> str: result = paragraph.content if paragraph.document.title is not None: - result += '; ' + paragraph.document.title + result += "; " + paragraph.document.title return result @staticmethod
diff --git a/app/static/data_source_icons/notion.png b/app/static/data_source_icons/notion.png new file mode 100644 index 0000000000000000000000000000000000000000..956fcfff9de53beb8ee7ce154d64fdfe10fcabab GIT binary patch literal 7161 [base85 binary payload omitted: notion.png data-source icon, 7161 bytes]
diff --git a/ui/src/components/data-source-panel.tsx b/ui/src/components/data-source-panel.tsx ) } + { + this.state.selectedDataSource.value === 'notion' && ( + + 1. {'Go to notion.so/my-integrations -> Create an integration and copy its token -> Give it read access capabilities (users included)'} + 2. {'Add your integration to a high-authority page so that it has access to all of its children.'} + 3. {'Paste the token'} + + ) + }
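The search and block-children calls in this first revision each repeat the same has_more/next_cursor loop and call response.json() several times per iteration. Below is a minimal sketch of a shared cursor-pagination helper in the same spirit; it is an illustration only, not part of the patch, and the paginate name plus the session, api_url and filter_data variables referenced in the usage comment are assumed to exist as defined in NotionClient above.

from typing import Callable, List, Optional

import requests


def paginate(request: Callable[[Optional[str]], requests.Response]) -> List[dict]:
    """Collect every result from a cursor-paginated Notion list endpoint.

    `request` performs one API call for a given start cursor (None for the
    first page) and returns the raw response.
    """
    results: List[dict] = []
    cursor: Optional[str] = None
    while True:
        body = request(cursor).json()  # parse each page exactly once
        results.extend(body.get("results", []))
        if not body.get("has_more"):
            return results
        cursor = body["next_cursor"]  # cursor for the next request


# Hypothetical usage against the /search endpoint used by NotionClient.list_objects,
# passing start_cursor in the JSON body the way the pagination fix later in this series does:
# results = paginate(
#     lambda cursor: session.post(
#         f"{api_url}/search",
#         json={**filter_data, **({"start_cursor": cursor} if cursor else {})},
#     )
# )
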
From 6ce188155d5c35ba2c9a2bd677439e2ebe383bbe Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Sun, 4 Jun 2023 23:12:30 +0200 Subject: [PATCH 2/9] tested successfully page indexing --- app/data_source/sources/notion/notion.py | 29 ++++++++++++------------ 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/app/data_source/sources/notion/notion.py b/app/data_source/sources/notion/notion.py index 1db1095..ae0a173 100644 --- a/app/data_source/sources/notion/notion.py +++ b/app/data_source/sources/notion/notion.py @@ -149,11 +149,9 @@ def __init__(self, *args, **kwargs): ) self.data_source_id = "DUMMY_SOURCE_ID" - @staticmethod def _parse_rich_text(self, rich_text: list): return "\n".join([text["plain_text"] for text in rich_text]) - @staticmethod def _parse_content_from_blocks(self, notion_blocks): return "\n".join( [ @@ -163,13 +161,17 @@ def _parse_content_from_blocks(self, notion_blocks): ] ) + def _parse_title(self, page): + title_prop = next(prop for prop in page["properties"] if page["properties"][prop]["type"] == "title") + return self._parse_rich_text(page["properties"][title_prop]["title"]) + def _parse_content_from_page(self, page): metadata_list = [ f"{prop}: {self._parse_rich_text(page['properties'][prop].get('rich_text',''))}" for prop in page["properties"] if prop != "Name" ] - title = f"Title: {self._parse_rich_text(page['properties']['Name']['title'])}" + title = f"Title: {self._parse_title(page)}" metadata = "\n".join([title] + metadata_list) page_blocks = self._notion_client.list_blocks(page["id"]) blocks_content = self._parse_content_from_blocks(page_blocks) @@ -182,19 +184,9 @@ def _parse_content_from_page(self, page): "title": title, "location": title, "content": metadata + blocks_content, - "timestamp": page["created_time"], + "timestamp": datetime.strptime(page["last_edited_time"], "%Y-%m-%dT%H:%M:%S.%fZ"), } - def _fetch_page(self, page): - page_data = self._parse_content_from_page(page) - logger.info(f"Indexing page {page_data['id']}") - document = BasicDocument( - data_source_id=self._data_source_id, - document_type=DocumentType.DOCUMENT, - **page_data, - ) - IndexQueue.get_instance().put_single(document) - def _feed_new_documents(self) -> None: pages = self._notion_client.list_pages() for page in pages: @@ -203,4 +195,11 @@ def _feed_new_documents(self) -> None: # skipping already indexed pages continue - self._add_task(self.fetch_page, page) + page_data = self._parse_content_from_page(page) + logger.info(f"Indexing page {page_data['id']}") + document = BasicDocument( + data_source_id=self._data_source_id, + type=DocumentType.DOCUMENT, + **page_data, + ) + IndexQueue.get_instance().put_single(document) From 755265f7b149bb90e5b12c81baf976a858d529fd Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Mon, 5 Jun 2023 15:03:09 +0200 Subject: [PATCH 3/9] successfully built --- deploy.sh | 8 ++++---- docker-compose.yaml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/deploy.sh b/deploy.sh index 7b775e0..c78579c 100755 --- a/deploy.sh +++ b/deploy.sh @@ -10,9 +10,9 @@ cd .. mkdir -p $HOME/.gerev/.buildx-cache -sudo docker buildx create --use -sudo docker buildx build --platform linux/amd64,linux/arm64 \ +docker buildx create --use +docker buildx build --platform linux/amd64,linux/arm64 \ --cache-from type=local,src=$HOME/.gerev/.buildx-cache \ --cache-to type=local,dest=$HOME/.gerev/.buildx-cache \ - -t gerev/gerev:$VERSION . 
\ - -t gerev/gerev:latest --push + -t us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops/gerev:$VERSION . \ + -t us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops/gerev:latest --push diff --git a/docker-compose.yaml b/docker-compose.yaml index 87efb9a..7772da1 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,6 @@ services: gerev: - image: gerev:latest + image: us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops:latest ports: - 80:80 volumes: From 7455538499a3da79bf33a1ac25e9d1b9c0900ebd Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Mon, 5 Jun 2023 15:22:57 +0200 Subject: [PATCH 4/9] fixed docker compose missing gerev name --- docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index 7772da1..8e00348 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,6 @@ services: gerev: - image: us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops:latest + image: us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops/gerev:latest ports: - 80:80 volumes: From 54eff691b18728a0c689e17bc06751d1a407f5be Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Mon, 5 Jun 2023 17:50:31 +0200 Subject: [PATCH 5/9] fixed pagination and unkown authors --- app/data_source/sources/notion/notion.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/data_source/sources/notion/notion.py b/app/data_source/sources/notion/notion.py index ae0a173..a29239b 100644 --- a/app/data_source/sources/notion/notion.py +++ b/app/data_source/sources/notion/notion.py @@ -89,7 +89,7 @@ def list_objects(self, notion_object: NotionObject): response = self.session.post(url, json=filter_data) results = response.json()["results"] while response.json()["has_more"] is True: - response = self.session.post(url, json=filter_data, params={"start_cursor": response.json()["next_cursor"]}) + response = self.session.post(url, json={"start_cursor": response.json()["next_cursor"], **filter_data}) results.extend(response.json()["results"]) return results @@ -178,8 +178,8 @@ def _parse_content_from_page(self, page): author = self._notion_client.get_user(page["created_by"]["id"]) return { "id": page["id"], - "author": author["name"], - "author_image_url": author["avatar_url"], + "author": author.get("name", ""), + "author_image_url": author.get("avatar_url", ""), "url": page["url"], "title": title, "location": title, From 9482acd26d11a679299e5a18bcbcb33dd838e7b0 Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Tue, 6 Jun 2023 11:25:10 +0200 Subject: [PATCH 6/9] fixed notion data source --- app/data_source/sources/slack/slack.py | 97 ++++++++++++++------------ 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/app/data_source/sources/slack/slack.py b/app/data_source/sources/slack/slack.py index 332700b..d9e7593 100644 --- a/app/data_source/sources/slack/slack.py +++ b/app/data_source/sources/slack/slack.py @@ -3,16 +3,15 @@ import time from dataclasses import dataclass from http.client import IncompleteRead -from typing import Optional, Dict, List +from typing import Dict, List, Optional +from data_source.api.base_data_source import BaseDataSource, BaseDataSourceConfig, ConfigField, HTMLInputType +from data_source.api.basic_document import BasicDocument, DocumentType +from queues.index_queue import IndexQueue from retry import retry from slack_sdk import WebClient from slack_sdk.errors import SlackApiError -from data_source.api.base_data_source 
import BaseDataSource, ConfigField, HTMLInputType, BaseDataSourceConfig -from data_source.api.basic_document import DocumentType, BasicDocument -from queues.index_queue import IndexQueue - logger = logging.getLogger(__name__) @@ -37,9 +36,7 @@ class SlackDataSource(BaseDataSource): @staticmethod def get_config_fields() -> List[ConfigField]: - return [ - ConfigField(label="Bot User OAuth Token", name="token", type=HTMLInputType.PASSWORD) - ] + return [ConfigField(label="Bot User OAuth Token", name="token", type=HTMLInputType.PASSWORD)] @staticmethod async def validate_config(config: Dict) -> None: @@ -49,7 +46,7 @@ async def validate_config(config: Dict) -> None: @staticmethod def _is_valid_message(message: Dict) -> bool: - return 'client_msg_id' in message or 'bot_id' in message + return "client_msg_id" in message or "bot_id" in message def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -59,8 +56,7 @@ def __init__(self, *args, **kwargs): def _list_conversations(self) -> List[SlackConversation]: conversations = self._slack.conversations_list(exclude_archived=True, limit=1000) - return [SlackConversation(id=conv['id'], name=conv['name']) - for conv in conversations['channels']] + return [SlackConversation(id=conv["id"], name=conv["name"]) for conv in conversations["channels"]] def _feed_conversations(self, conversations: List[SlackConversation]) -> List[SlackConversation]: joined_conversations = [] @@ -68,11 +64,11 @@ def _feed_conversations(self, conversations: List[SlackConversation]) -> List[Sl for conv in conversations: try: result = self._slack.conversations_join(channel=conv.id) - if result['ok']: - logger.info(f'Joined channel {conv.name}, adding a fetching task...') + if result["ok"]: + logger.info(f"Joined channel {conv.name}, adding a fetching task...") self.add_task_to_queue(self._feed_conversation, conv=conv) except Exception as e: - logger.warning(f'Could not join channel {conv.name}: {e}') + logger.warning(f"Could not join channel {conv.name}: {e}") return joined_conversations @@ -80,22 +76,21 @@ def _get_author_details(self, author_id: str) -> SlackAuthor: author = self._authors_cache.get(author_id, None) if author is None: author_info = self._slack.users_info(user=author_id) - user = author_info['user'] - name = user.get('real_name') or user.get('name') or user.get('profile', {}).get('display_name') or 'Unknown' - author = SlackAuthor(name=name, - image_url=author_info['user']['profile']['image_72']) + user = author_info["user"] + name = user.get("real_name") or user.get("name") or user.get("profile", {}).get("display_name") or "Unknown" + author = SlackAuthor(name=name, image_url=author_info["user"]["profile"]["image_72"]) self._authors_cache[author_id] = author return author def _feed_new_documents(self) -> None: conversations = self._list_conversations() - logger.info(f'Found {len(conversations)} conversations') + logger.info(f"Found {len(conversations)} conversations") self._feed_conversations(conversations) def _feed_conversation(self, conv: SlackConversation): - logger.info(f'Feeding conversation {conv.name}') + logger.info(f"Feeding conversation {conv.name}") last_msg: Optional[BasicDocument] = None @@ -107,13 +102,13 @@ def _feed_conversation(self, conv: SlackConversation): last_msg = None continue - text = message['text'] - if author_id := message.get('user'): + text = message["text"] + if author_id := message.get("user"): author = self._get_author_details(author_id) - elif message.get('bot_id'): - author = 
SlackAuthor(name=message.get('username'), image_url=message.get('icons', {}).get('image_48')) + elif message.get("bot_id"): + author = SlackAuthor(name=message.get("username"), image_url=message.get("icons", {}).get("image_48")) else: - logger.warning(f'Unknown message author: {message}') + logger.warning(f"Unknown message author: {message}") continue if last_msg is not None: @@ -124,15 +119,22 @@ def _feed_conversation(self, conv: SlackConversation): IndexQueue.get_instance().put_single(doc=last_msg) last_msg = None - timestamp = message['ts'] - message_id = message.get('client_msg_id') or timestamp + timestamp = message["ts"] + message_id = message.get("client_msg_id") or timestamp readable_timestamp = datetime.datetime.fromtimestamp(float(timestamp)) message_url = f"https://slack.com/app_redirect?channel={conv.id}&message_ts={timestamp}" - last_msg = BasicDocument(title=author.name, content=text, author=author.name, - timestamp=readable_timestamp, id=message_id, - data_source_id=self._data_source_id, location=conv.name, - url=message_url, author_image_url=author.image_url, - type=DocumentType.MESSAGE) + last_msg = BasicDocument( + title=author.name, + content=text, + author=author.name, + timestamp=readable_timestamp, + id=message_id, + data_source_id=self._data_source_id, + location=conv.name, + url=message_url, + author_image_url=author.image_url, + type=DocumentType.MESSAGE, + ) if last_msg is not None: IndexQueue.get_instance().put_single(doc=last_msg) @@ -140,19 +142,19 @@ def _feed_conversation(self, conv: SlackConversation): @retry(tries=5, delay=1, backoff=2, logger=logger) def _get_conversation_history(self, conv: SlackConversation, cursor: str, last_index_unix: str): try: - return self._slack.conversations_history(channel=conv.id, oldest=last_index_unix, - limit=1000, cursor=cursor) + return self._slack.conversations_history(channel=conv.id, oldest=last_index_unix, limit=1000, cursor=cursor) except SlackApiError as e: - logger.warning(f'SlackApi error while fetching messages for conversation {conv.name}: {e}') + logger.warning(f"SlackApi error while fetching messages for conversation {conv.name}: {e}") response = e.response - if response['error'] == 'ratelimited': - retry_after_seconds = int(response['headers']['Retry-After']) - logger.warning(f'Rate-limited: Slack API rate limit exceeded,' - f' retrying after {retry_after_seconds} seconds') + if response["error"] == "ratelimited": + retry_after_seconds = int(response["headers"]["Retry-After"]) + logger.warning( + f"Rate-limited: Slack API rate limit exceeded," f" retrying after {retry_after_seconds} seconds" + ) time.sleep(retry_after_seconds) raise e except IncompleteRead as e: - logger.warning(f'IncompleteRead error while fetching messages for conversation {conv.name}') + logger.warning(f"IncompleteRead error while fetching messages for conversation {conv.name}") raise e def _fetch_conversation_messages(self, conv: SlackConversation): @@ -160,19 +162,22 @@ def _fetch_conversation_messages(self, conv: SlackConversation): cursor = None has_more = True last_index_unix = self._last_index_time.timestamp() - logger.info(f'Fetching messages for conversation {conv.name}') + logger.info(f"Fetching messages for conversation {conv.name}") while has_more: try: - response = self._get_conversation_history(conv=conv, cursor=cursor, - last_index_unix=str(last_index_unix)) + response = self._get_conversation_history( + conv=conv, cursor=cursor, last_index_unix=str(last_index_unix) + ) except Exception as e: - logger.warning(f'Error fetching 
all messages for conversation {conv.name},' - f' returning {len(messages)} messages. Error: {e}') + logger.warning( + f"Error fetching all messages for conversation {conv.name}," + f" returning {len(messages)} messages. Error: {e}" + ) return messages logger.info(f'Fetched {len(response["messages"])} messages for conversation {conv.name}') - messages.extend(response['messages']) + messages.extend(response["messages"]) if has_more := response["has_more"]: cursor = response["response_metadata"]["next_cursor"] From b0f9d7605f15c38ee9bbfa1303e16425aa79153d Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Tue, 6 Jun 2023 15:20:15 +0200 Subject: [PATCH 7/9] removed gorgias specific deployment config --- deploy.sh | 8 ++++---- docker-compose.yaml | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/deploy.sh b/deploy.sh index c78579c..f229e53 100755 --- a/deploy.sh +++ b/deploy.sh @@ -10,9 +10,9 @@ cd .. mkdir -p $HOME/.gerev/.buildx-cache -docker buildx create --use -docker buildx build --platform linux/amd64,linux/arm64 \ +sudo docker buildx create --use +sudo docker buildx build --platform linux/amd64,linux/arm64 \ --cache-from type=local,src=$HOME/.gerev/.buildx-cache \ --cache-to type=local,dest=$HOME/.gerev/.buildx-cache \ - -t us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops/gerev:$VERSION . \ - -t us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops/gerev:latest --push + -t gerev/gerev:$VERSION . \ + -t gerev/gerev:latest --push \ No newline at end of file diff --git a/docker-compose.yaml b/docker-compose.yaml index 8e00348..87efb9a 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -1,6 +1,6 @@ services: gerev: - image: us-central1-docker.pkg.dev/gorgias-growth-production/growth-ops/gerev:latest + image: gerev:latest ports: - 80:80 volumes: From d3efadb2915586a9f1ed90c404a236e361e93e5a Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Tue, 6 Jun 2023 15:25:19 +0200 Subject: [PATCH 8/9] added EOL --- deploy.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deploy.sh b/deploy.sh index f229e53..7b775e0 100755 --- a/deploy.sh +++ b/deploy.sh @@ -15,4 +15,4 @@ sudo docker buildx build --platform linux/amd64,linux/arm64 \ --cache-from type=local,src=$HOME/.gerev/.buildx-cache \ --cache-to type=local,dest=$HOME/.gerev/.buildx-cache \ -t gerev/gerev:$VERSION . 
\ - -t gerev/gerev:latest --push \ No newline at end of file + -t gerev/gerev:latest --push From 3c371c8d4c954e0a701bb2a7fd62ecb2c738d30a Mon Sep 17 00:00:00 2001 From: Anas El Mhamdi Date: Tue, 13 Jun 2023 13:26:43 +0200 Subject: [PATCH 9/9] - added all database pages - added more retries - handling of potential errors stopping the indexing process --- app/data_source/sources/notion/notion.py | 73 ++++++++++++++++++------ 1 file changed, 55 insertions(+), 18 deletions(-) diff --git a/app/data_source/sources/notion/notion.py b/app/data_source/sources/notion/notion.py index a29239b..1721196 100644 --- a/app/data_source/sources/notion/notion.py +++ b/app/data_source/sources/notion/notion.py @@ -38,7 +38,7 @@ ) -def _notion_retry_session(token, retries=5, backoff_factor=2.0, status_forcelist=RETRY_AFTER_STATUS_CODES): +def _notion_retry_session(token, retries=10, backoff_factor=2.0, status_forcelist=RETRY_AFTER_STATUS_CODES): """Creates a retry session""" session = requests.Session() retry = Retry( @@ -78,7 +78,10 @@ def auth_check(self): def get_user(self, user_id): url = f"{self.api_url}/users/{user_id}" response = self.session.get(url) - return response.json() + try: + return response.json() + except requests.exceptions.JSONDecodeError: + return {} def list_objects(self, notion_object: NotionObject): url = f"{self.api_url}/search" @@ -101,10 +104,26 @@ def list_databases(self): def list_blocks(self, block_id: str): url = f"{self.api_url}/blocks/{block_id}/children" - response = self.session.get(url) + params = {"page_size": 100} + response = self.session.get(url, params=params) + if not response.json()["results"]: + return [] results = response.json()["results"] while response.json()["has_more"] is True: - response = self.session.get(url, params={"start_cursor": response.json()["next_cursor"]}) + response = self.session.get(url, params={"start_cursor": response.json()["next_cursor"], **params}) + results.extend(response.json()["results"]) + return results + + def list_database_pages(self, database_id: str): + url = f"{self.api_url}/databases/{database_id}/query" + filter_data = {"page_size": 100} + response = self.session.post(url, json=filter_data) + results = response.json()["results"] + while response.json()["has_more"] is True: + response = self.session.post( + url, + json={"start_cursor": response.json()["next_cursor"], **filter_data}, + ) results.extend(response.json()["results"]) return results @@ -155,9 +174,9 @@ def _parse_rich_text(self, rich_text: list): def _parse_content_from_blocks(self, notion_blocks): return "\n".join( [ - self._parse_rich_text(block["paragraph"]["rich_text"]) + self._parse_rich_text(block[block["type"]]["rich_text"]) for block in notion_blocks - if block["type"] == "paragraph" + if block[block["type"]].get("rich_text") ] ) @@ -171,8 +190,8 @@ def _parse_content_from_page(self, page): for prop in page["properties"] if prop != "Name" ] - title = f"Title: {self._parse_title(page)}" - metadata = "\n".join([title] + metadata_list) + title = f"{self._parse_title(page)}" + metadata = "\n".join([f"Title: {title}"] + metadata_list) page_blocks = self._notion_client.list_blocks(page["id"]) blocks_content = self._parse_content_from_blocks(page_blocks) author = self._notion_client.get_user(page["created_by"]["id"]) @@ -188,18 +207,36 @@ def _parse_content_from_page(self, page): } def _feed_new_documents(self) -> None: - pages = self._notion_client.list_pages() + logger.info("Fetching non database pages ...") + single_pages = self._notion_client.list_pages() + 
logger.info(f"Found {len(single_pages)} non database pages ...") + + logger.info("Fetching databases ...") + databases = self._notion_client.list_databases() + logger.info(f"Found {len(databases)} databases ...") + + all_database_pages = [] + for database in databases: + database_pages = self._notion_client.list_database_pages(database["id"]) + logger.info(f"Found {len(database_pages)} pages to index in database {database['id']} ...") + all_database_pages.extend(database_pages) + + pages = single_pages + all_database_pages + logger.info(f"Found {len(pages)} pages in total ...") + for page in pages: last_updated_at = datetime.strptime(page["last_edited_time"], "%Y-%m-%dT%H:%M:%S.%fZ") if last_updated_at < self._last_index_time: # skipping already indexed pages continue - - page_data = self._parse_content_from_page(page) - logger.info(f"Indexing page {page_data['id']}") - document = BasicDocument( - data_source_id=self._data_source_id, - type=DocumentType.DOCUMENT, - **page_data, - ) - IndexQueue.get_instance().put_single(document) + try: + page_data = self._parse_content_from_page(page) + logger.info(f"Indexing page {page_data['id']}") + document = BasicDocument( + data_source_id=self._data_source_id, + type=DocumentType.DOCUMENT, + **page_data, + ) + IndexQueue.get_instance().put_single(document) + except Exception as e: + logger.error(f"Failed to index page {page['id']}", exc_info=e)
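The reworked _parse_content_from_blocks in this final patch pulls rich_text out of whatever type each block declares, instead of handling only paragraph blocks. The standalone sketch below reproduces that logic so it can be sanity-checked without calling the API; the sample_blocks payloads are minimal stand-ins shaped after the fields the parser reads (type and <type>.rich_text[].plain_text), not real Notion responses.

# Minimal stand-ins for Notion block objects, shaped after the fields the parser reads.
sample_blocks = [
    {"type": "heading_2", "heading_2": {"rich_text": [{"plain_text": "Release notes"}]}},
    {"type": "paragraph", "paragraph": {"rich_text": [{"plain_text": "Shipped the Notion source."}]}},
    {"type": "divider", "divider": {}},  # no rich_text, so the filter skips it
]


def parse_content_from_blocks(notion_blocks: list) -> str:
    """Same extraction logic as NotionDataSource._parse_content_from_blocks, inlined for a standalone check."""
    return "\n".join(
        "\n".join(text["plain_text"] for text in block[block["type"]]["rich_text"])
        for block in notion_blocks
        if block[block["type"]].get("rich_text")
    )


print(parse_content_from_blocks(sample_blocks))
# Release notes
# Shipped the Notion source.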