import gc

import pandas as pd
from torch import cuda
from haystack.schema import Document
from haystack.document_stores import ElasticsearchDocumentStore
from haystack.nodes import BM25Retriever, EmbeddingRetriever, TransformersDocumentClassifier

# Release any cached GPU memory left over from a previous run.
cuda.empty_cache()

def csv_to_doc(path: str, **kwargs) -> list[dict]:
    """
    Read a CSV file and convert its rows into Haystack-compatible document records.

    If 'source' is passed in kwargs, only the content column is kept; otherwise the
    title, subject and content columns are combined into each document's metadata.
    """
    df = pd.read_csv(path)
    if 'source' in kwargs:
        # TODO - mention in readme.md to remove this code if all lines have to be used.
        # df = df.head(50)
        content = kwargs['content']
        df['meta'] = ''
        df = df[[content, 'meta']]
        df.columns = ['content', 'meta']
        return df.to_dict('records')
    title = kwargs['title']
    subject = kwargs['subject']
    content = kwargs['content']
    # Strip version-style numbers (e.g. "1.2") from the title column.
    df[title] = df[title].replace(r'\d+\.\d+', '', regex=True)
    df = df[[title, subject, content]]
    df = df.fillna('No content')
    df['meta'] = df.apply(
        lambda x: {'topic': x[title], 'subject': x[subject], 'content': x[content]},
        axis=1,
    ).to_list()
    df = df[[content, 'meta']]
    df.columns = ['content', 'meta']
    return df.to_dict('records')
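
# A minimal usage sketch for csv_to_doc. The CSV path and the column names
# ('Title', 'Subject', 'Body') are illustrative assumptions, not defined by this module:
#
#   docs = csv_to_doc('data/articles.csv', title='Title', subject='Subject', content='Body')
#   # docs -> [{'content': '...', 'meta': {'topic': ..., 'subject': ..., 'content': ...}}, ...]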

def add_to_docstore(docs: list[dict], index: str, delete_docs: bool = False) -> ElasticsearchDocumentStore:
    """
    Initialize an Elasticsearch document store and write the documents to the given index.
    """
    doc_store = ElasticsearchDocumentStore(index=index)
    if delete_docs:
        doc_store.delete_documents(index=index)
    doc_store.write_documents(docs)
    return doc_store
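
# Sketch of indexing the records produced by csv_to_doc. Assumes a local
# Elasticsearch instance on Haystack's default host/port; the index name is illustrative:
#
#   doc_store = add_to_docstore(docs, index='articles', delete_docs=True)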

def classify_docs(labels: list[str], doc_store, index: str):
    """
    Use a zero-shot classification model to label the documents in an index.
    The predicted label is added to each document's metadata.
    """
    classifier = TransformersDocumentClassifier(
        task='zero-shot-classification',
        model_name_or_path='MoritzLaurer/DeBERTa-v3-base-mnli-fever-anli',
        labels=labels,
        use_gpu=True,
    )
    docs = doc_store.get_all_documents(index=index)
    docs = classifier.predict(docs)
    doc_store.write_documents(docs, index=index)
    return doc_store
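
# Sketch: tag every indexed document with one of a few candidate labels.
# The label names here are made up for illustration:
#
#   doc_store = classify_docs(['billing', 'technical', 'general'], doc_store, index='articles')
#   # Each document's meta now carries a 'classification' entry, which
#   # load_zeroshot_docs below filters on via "classification.label".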

def run_pipeline(pipeline, docs: list[Document]) -> pd.DataFrame:
    """
    Run the QA generation Haystack pipeline in chunks, keeping only QA pairs whose
    answer confidence exceeds the 0.75 threshold.

    Args:
        pipeline: QA generation pipeline
        docs: Documents from the Elasticsearch doc store

    Returns:
        df (pd.DataFrame): DataFrame containing the generated QA pairs and their context documents
    """
    generated_ques = []
    generated_ans = []
    doc_contexts = []
    chunk_size = 200
    for chunk in range(0, len(docs), chunk_size):
        chunked_docs = docs[chunk:chunk + chunk_size]
        print(f'LEN: {len(chunked_docs)}')
        print(f'\nWorking on docs: {chunk}:{chunk + chunk_size}')
        results = pipeline.run(documents=chunked_docs, debug=True)
        for query_content, answer_content, document_content in zip(
                results['queries'], results['answers'], results['documents']):
            answer = answer_content[0]
            document = document_content[0]
            # Keep only QA pairs the reader is reasonably confident about.
            if answer.score > 0.75:
                generated_ques.append(query_content)
                generated_ans.append(answer.answer)
                doc_contexts.append(document.content)
        # Checkpoint the accumulated results to CSV after every chunk.
        df = pd.DataFrame(data={'generated_question': generated_ques,
                                'generated_answer': generated_ans,
                                'document_context': doc_contexts})
        df.to_csv(f'data/generated_{chunk}_{chunk + chunk_size}_QA.csv', index=False)
        print(f'\nSaved: {chunk}:{chunk + chunk_size}')
    del pipeline
    gc.collect()
    return df
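
# Sketch of building the QA generation pipeline this function expects. This assumes
# Haystack v1's QuestionAnswerGenerationPipeline; the reader model is an illustrative choice:
#
#   from haystack.nodes import QuestionGenerator, FARMReader
#   from haystack.pipelines import QuestionAnswerGenerationPipeline
#
#   qag_pipeline = QuestionAnswerGenerationPipeline(
#       QuestionGenerator(),
#       FARMReader(model_name_or_path='deepset/roberta-base-squad2'),
#   )
#   df_qa = run_pipeline(qag_pipeline, load_all_docs('articles'))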

def load_all_docs(topic: str) -> list[Document]:
    """Load every document stored under the given index."""
    doc_store = ElasticsearchDocumentStore(index=topic)
    docs = doc_store.get_all_documents()
    return docs

def load_bm25_docs(topic: str, retrieval_query: str) -> list[Document]:
    """Load the documents a BM25 retriever returns for the given query."""
    doc_store = ElasticsearchDocumentStore(index=topic)
    retriever = BM25Retriever(document_store=doc_store)
    docs = retriever.retrieve(query=retrieval_query)
    return docs

def load_embedded_docs(topic: str, emb_retrieval_query: str) -> list[Document]:
    """Load the documents an embedding retriever returns for the given query."""
    doc_store = ElasticsearchDocumentStore(index=topic, similarity='dot_product', embedding_dim=768)
    retriever = EmbeddingRetriever(document_store=doc_store,
                                   embedding_model='sentence-transformers/msmarco-distilbert-base-v4',
                                   model_format='sentence_transformers')
    # Compute (or refresh) embeddings for every document in the index before retrieving.
    doc_store.update_embeddings(retriever=retriever)
    docs = retriever.retrieve(query=emb_retrieval_query)
    del retriever
    gc.collect()
    return docs
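
# Sketch of the three document loaders side by side. Index name and queries are illustrative:
#
#   all_docs = load_all_docs('articles')
#   bm25_docs = load_bm25_docs('articles', 'payment failure')
#   emb_docs = load_embedded_docs('articles', 'payment failure')
#
# Note that load_embedded_docs calls update_embeddings on every invocation, which
# re-embeds the whole index and can be slow for large indices.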

def split_labels(labels: str) -> list[str]:
    """Split a comma-separated label string into a list of trimmed labels."""
    return [label.strip() for label in labels.split(',')]
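
# For instance, split_labels('billing, technical , general')
# returns ['billing', 'technical', 'general'].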

def load_zeroshot_docs(topic: str, zero_shot_query: str) -> list[Document]:
    """Load the documents whose zero-shot classification label matches the query labels."""
    # Clean input from dashboard
    zero_shot_labels = split_labels(zero_shot_query)
    # Filter the Elasticsearch data on the label added by classify_docs
    _filter = {"classification.label": zero_shot_labels}
    doc_store = ElasticsearchDocumentStore(index=topic)
    docs = doc_store.get_all_documents(filters=_filter)
    return docs
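
# Sketch: fetch only the documents classify_docs tagged with the given labels.
# Assumes classify_docs has already run on this index; label names are illustrative:
#
#   docs = load_zeroshot_docs('articles', 'billing, technical')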

def prepare_qa_string(df_gen_qa: pd.DataFrame, **kwargs) -> str:
    """Build the status message describing how the QA pairs were generated."""
    if kwargs['retrieval_flag']:
        qa_output_str = (f'BM25 - {len(df_gen_qa)} QA pairs were generated using documents '
                         f'filtered on "{kwargs["retrieval_query"]}". Download the file below.')
    elif kwargs['emb_retrieval_flag']:
        qa_output_str = (f'Embedding retrieval - {len(df_gen_qa)} QA pairs were generated using '
                         f'documents filtered on "{kwargs["emb_retrieval_query"]}". Download the file below.')
    elif kwargs['zero_shot_flag']:
        qa_output_str = (f'Zero-shot label - {len(df_gen_qa)} QA pairs were generated using '
                         f'documents filtered on "{kwargs["zero_shot_query"]}". Download the file below.')
    else:
        qa_output_str = f'{len(df_gen_qa)} QA pairs were generated. Download the file below.'
    return qa_output_str
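
# Sketch of building the dashboard status message. The flag/query keyword names
# follow the branches above; the values here are illustrative:
#
#   msg = prepare_qa_string(df_qa,
#                           retrieval_flag=False,
#                           emb_retrieval_flag=True, emb_retrieval_query='payment failure',
#                           zero_shot_flag=False)
#   # -> 'Embedding retrieval - N QA pairs were generated using documents filtered
#   #     on "payment failure". Download the file below.'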