import json
import os
import re

import numpy as np
import openai
from jina import Document, DocumentArray

def load_dataset(path='./test_db/', metric='cosine', n_dim=4096, max_connection=16, ef_search=50):
    """Load or create the AnnLite-backed DocumentArray index."""
    da = DocumentArray(
        storage='annlite',
        config={
            'data_path': path,
            'n_dim': n_dim,
            'metric': metric,
            'max_connection': max_connection,
            'ef_search': ef_search,
        },
    )
    return da

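# Usage sketch (assumes AnnLite is installed and './test_db/' is writable;
# n_dim must match the embedding model, e.g. 1536 for text-embedding-ada-002):
#   da = load_dataset(path='./test_db/', n_dim=1536)
#   print(len(da), 'documents in the index')
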
def get_doc_name_for_readwise(full_text: str):
    author_name = re.findall(r"- Author: \[\[(.*)\]\]", full_text)[0]
    title = re.findall(r"- Full Title: (.*)", full_text)[0]
    category = re.findall(r"- Category: #(.*)", full_text)[0]
    if 'book' in category:
        highlight = f"the book '{title}' by {author_name}"
    elif 'article' in category:
        highlight = f"the article '{title}'"
    else:
        highlight = f"a thread by {author_name}"
    return highlight

def extract_note_title(full_path):
    sections = full_path.split('/')
    filename = sections[-1]
    # strip the .md extension
    title = filename.split('.md')[0]
    return title

def remove_links_2(note: str):
    # remove markdown link remnants of the form ([...))
    pattern = r'\(\[.*\)\)'
    note = re.sub(pattern, '', note)
    # remove bare URLs
    pattern = r'https?://\S+'
    note = re.sub(pattern, '', note)
    # remove Kindle location markers from books; the non-breaking space inside
    # '(Location 123)' is stripped first so the pattern below matches
    note = note.replace(u'\xa0', '')
    pattern = r'\(Location\d+\)'
    note = re.sub(pattern, '', note)
    note = note.replace('\n', '')
    note = note.replace('[[orange]]', '')
    note = remove_double_space(note)
    return note

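# Example (hypothetical Readwise highlight; the link remnant and the URL
# inside it are stripped, then runs of spaces are collapsed):
#   remove_links_2("Some insight ([Location 42](https://readwise.io/...))")
#   -> 'Some insight '
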
def get_start_and_end_of_text(text):
    # the body starts right after the first heading line, if there is one
    try:
        body_start = re.search(r"#\s+(.*)\n", text).end()
    except AttributeError:
        body_start = 0
    # if neither '# Stop Indexing' nor '## References' exists, take the rest of the file
    try:
        body_end = re.search(r"#\s+Stop Indexing|##\s+References", text).start()
    except AttributeError:
        body_end = None
    return body_start, body_end

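# Usage sketch: slice the indexable body out of a note's markdown source.
#   start, end = get_start_and_end_of_text(text)
#   body = text[start:end]
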
def remove_special_chars(s):
    # strip markdown emphasis/quote markers: *, _, >, =
    pattern = r"[*_>=]+"
    result = re.sub(pattern, "", s)
    return result


def remove_double_space(s):
    # collapse runs of spaces into a single space
    pattern = r" +"
    result = re.sub(pattern, " ", s)
    return result

def split_note_into_sentences(note):
    sentences = []
    for line in note.split('\n'):
        line = line.strip()
        sub_sentences = line.split('. ')
        for sub_sentence in sub_sentences:
            if sub_sentence != '':
                # a sentence should contain a space, otherwise it is just a word
                if " " in sub_sentence:
                    sentences.append(sub_sentence.strip())
    return sentences

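# Example: lines are split on '. ', and single-word fragments are dropped.
#   split_note_into_sentences('First idea. Second idea.\nword')
#   -> ['First idea', 'Second idea.']
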
def extract_text(s):
    # remove embedded media of the form ![[...]] from notes
    pattern = r"!\[\[(.+?)\]\]"
    result = re.sub(pattern, "", s)
    result = remove_special_chars(result)
    result = remove_double_space(result)
    return result

def encode_query(openai_key: str, query: str, n_dim: int):
    """Embed a query string and return it as a Jina Document.

    Uses OpenAI's text-embedding-ada-002 model (1536 dimensions).

    Args:
        openai_key (str): OpenAI API key.
        query (str): text to embed.
        n_dim (int): expected embedding dimension; must be 1536.

    Returns:
        Document: Jina Document carrying the query text and its embedding.
    """
    if n_dim == 1536:
        openai.api_key = openai_key
        embeds = openai.Embedding.create(input=query, model="text-embedding-ada-002")['data'][0]['embedding']
    else:
        raise ValueError("Unknown embedding dimension sent")
    embeds = np.array(embeds)
    embedded_query = Document(text=query)
    embedded_query.embedding = embeds.reshape(n_dim, )
    return embedded_query

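# Usage sketch (requires a valid OpenAI API key; assumes `da` is the AnnLite
# store from load_dataset, created with n_dim=1536):
#   doc = encode_query(os.environ['OPENAI_API_KEY'], 'what is flow state?', 1536)
#   matches = da.find(doc.embedding, limit=5)
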
def remove_special_characters(text):
    """
    Remove all special characters from a string, leaving only letters, digits, and whitespace.

    Parameters:
    - text: the string to process

    Returns:
    - The input string with all special characters removed.
    """
    return re.sub(r'[^A-Za-z0-9\s]', '', text)

def get_tokens(doc: Document):
    """Return all words in a document, lower-cased.

    Args:
        doc (Document): document whose text and chunk texts are tokenised.

    Returns:
        list: lower-cased tokens from the document's own text and all of its chunks.
    """
    # the document's own text (the note name) comes first
    note = remove_special_characters(doc.text.lower()).split()
    note_body = []
    for chunk in doc.chunks:
        note_body.extend(remove_special_characters(chunk.text.lower()).split())
    note.extend(note_body)
    return note

def is_file_empty(filename):
    # check that the file exists before inspecting its size
    if not os.path.exists(filename):
        raise FileNotFoundError(f"File '{filename}' does not exist.")
    return os.stat(filename).st_size == 0

def save_json(filename: str, my_dict: dict):
    with open(filename, "w") as f:
        json.dump(my_dict, f)


def load_json(filename: str):
    my_dict = {}
    # load a dictionary from a JSON file; an empty file yields an empty dict
    if not is_file_empty(filename):
        with open(filename, "r") as f:
            my_dict = json.load(f)
    return my_dict

def find_document_by_name(database: DocumentArray, note_to_find: str):
    """Linear scan over the database; returns (document, index) or (None, None)."""
    counter = 0
    with database:
        while counter < len(database):
            try:
                old_note = database[counter]
                if note_to_find == old_note.text:
                    return old_note, counter
                counter += 1
            except Exception as e:
                print(e)
                counter += 1
    return None, None

def remove_old_note(database: DocumentArray, modified_note):
    modified_note_text = modified_note.text
    old_note, counter = fast_find_document_by_name(database, modified_note_text)
    try:
        if old_note is not None:
            with database:
                # delete once, by index; deleting by value and then again by
                # index would remove two documents
                del database[counter]
    except Exception as e:
        print(f'Note: {modified_note_text} might not be fully deleted')
        print(e)

def load_bm25_index(filepath: str):
    bm25_index = load_json(filepath)
    return bm25_index

def fast_find_document_by_name(database: DocumentArray, modified_note: str):
    # pull all texts in one call and let list.index do the scan, instead of
    # fetching Documents one by one as find_document_by_name does
    note_names = database.texts
    try:
        counter = note_names.index(modified_note)
        old_note = database[counter]
        return old_note, counter
    except ValueError:
        # document not found
        return None, None

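# Usage sketch (hypothetical note title): both finders return (document, index)
# or (None, None), so the index can be used for deletion.
#   note, idx = fast_find_document_by_name(da, "the book 'Deep Work' by Cal Newport")
#   if note is not None:
#       del da[idx]
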
def add_highlight(path: str):
    '''Only adds notes that are not of type #Concept or #MOC and that have a status of ✅ or 🌲.
    Tags are lower case with underscores between words.
    '''
    # TODO: This needs to be improved! Include tags and make it faster!
    if '🔖 Readwise' in path:
        with open(path) as f:
            full_text = f.read()
        highlight_readwise = get_doc_name_for_readwise(full_text)
        highlight = extract_note_title(path)
        note_index_start = full_text.find('## Highlights') + len('## Highlights')
        notes_text = full_text[note_index_start:]
        notes_list = notes_text.split('\n\n\n')
        notes_list_processed = []
        for note in notes_list:
            note = remove_links_2(note)
            if note != '':
                notes_list_processed.append(note)
        return notes_list_processed, highlight, highlight_readwise
    with open(path) as f:
        text = f.read()
    body_start, body_end = get_start_and_end_of_text(text)
    # the title should simply be the name of the file, not the headline
    title = extract_note_title(path)
    body = text[body_start:body_end]
    # remove special characters (bold, etc.) and embedded media from the text
    body = extract_text(body)
    highlight = title
    notes_list = split_note_into_sentences(body)
    return notes_list, highlight, None
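

if __name__ == '__main__':
    # Minimal end-to-end sketch, not part of the original module: the note path
    # and the './test_db/' index are hypothetical, and OPENAI_API_KEY must be
    # set. Parses one Readwise note, then embeds a query and searches the index.
    sentences, title, readwise_name = add_highlight('vault/🔖 Readwise/example.md')
    print(f'{title}: {len(sentences)} highlights from {readwise_name}')
    da = load_dataset(path='./test_db/', n_dim=1536)
    query_doc = encode_query(os.environ['OPENAI_API_KEY'], 'focus and attention', 1536)
    # .find with a raw embedding returns the nearest Documents from the store
    for match in da.find(query_doc.embedding, limit=3):
        print(match.text)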