-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfunction_app.py
228 lines (185 loc) · 7.81 KB
/
function_app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import azure.functions as func
import logging
from dotenv import load_dotenv
# import numpy
import pandas as pd
from io import StringIO, BytesIO
import os
import fitz # PyMuPDF for PDF processing
import json
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions
import whisper # OpenAI Whisper for audio transcription
import tempfile
import ffmpeg
# IMAGE
from PIL import Image # Import for image processing
import pytesseract # Import for OCR
# Load variables from a .env file into the process environment so that the
# os.environ lookups further down (e.g. the blob connection string) can see them.
load_dotenv()
# Create the Azure Functions app; every HTTP route registered on it defaults to
# function-level authentication (AuthLevel.FUNCTION).
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
@app.route(route="http_trigger")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:
    """
    HTTP Trigger function that greets the caller by name.
    The name is taken from the `name` query parameter; when that is absent,
    the request body is parsed as JSON and its `name` field is used instead.
    Responses:
    - 200 OK: Personalized greeting when a name was supplied, otherwise a
      generic message explaining how to pass one.
    """
    logging.info('Python HTTP trigger function processed a request.')
    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            # Missing or malformed JSON body: fall back to the generic reply.
            req_body = None
        # Only a JSON object can carry a "name" field. A valid but non-object
        # body (list, string, number, null) has no .get() and used to raise
        # AttributeError here.
        if isinstance(req_body, dict):
            name = req_body.get('name')
    if name:
        return func.HttpResponse(f"Hello, {name}. This HTTP triggered function executed successfully.")
    return func.HttpResponse(
        "This HTTP triggered function executed successfully. Pass a name in the query string or in the request body for a personalized response.",
        status_code=200
    )
# Azure Blob Storage connection string, read from the environment.
# os.environ.get returns None when the variable is not set.
BLOB_STORAGE_CONNECTION_STRING = os.environ.get('BLOB_STORAGE_CONNECTION_STRING')
# Name of the blob container that the file-listing and file-content
# endpoints in this module read from.
container_name = 'business-labs'
@app.route(route="get_files_list")
def get_files_list(req: func.HttpRequest) -> func.HttpResponse:
    """
    HTTP Trigger function that lists the blobs stored in the configured
    Azure Blob Storage container.
    Responses:
    - 200 OK: JSON array with the name of every blob in the container.
    - 500 Internal Server Error: Error message if the listing fails.
    Example Request:
    - GET /api/get_files_list
    Example Response:
    - ["file1.csv", "file2.pdf"]
    """
    try:
        service = BlobServiceClient.from_connection_string(BLOB_STORAGE_CONNECTION_STRING)
        container = service.get_container_client(container_name)
        # Collect blob names in the order the service returns them.
        names = []
        for blob in container.list_blobs():
            names.append(blob.name)
        payload = json.dumps(names, indent=4, sort_keys=True)
        return func.HttpResponse(payload, mimetype="application/json", status_code=200)
    except Exception as e:
        logging.error(f"Failed to list files: {e}")
        return func.HttpResponse(f"Error: {e}", status_code=500)
async def get_audio_transcription(audio_path):
    """
    Transcribes audio file content to text using OpenAI Whisper model.
    The input is first converted to a WAV file at "<audio_path>.wav" with
    ffmpeg; that intermediate file is removed before returning, whether the
    transcription succeeded or failed.
    Parameters:
    - audio_path (str): Path to the audio file.
    Returns:
    - str: Transcribed text from the audio file.
    Raises:
    - Exception: Any error raised by the conversion or the transcription is
      logged and re-raised unchanged.
    """
    # Converting to WAV for Whisper compatibility
    temp_audio_file = f"{audio_path}.wav"
    try:
        # Convert audio file to WAV format using ffmpeg
        ffmpeg.input(audio_path).output(temp_audio_file).run(overwrite_output=True)
        # Use Whisper model to transcribe the converted audio file.
        # NOTE: the "base" model is loaded on every call.
        model = whisper.load_model("base")
        result = model.transcribe(temp_audio_file)
        return result['text']
    except Exception as e:
        logging.error(f"Error in audio transcription: {e}")
        raise
    finally:
        # Clean up the temporary WAV file on every path; previously it was
        # left behind whenever conversion or transcription raised.
        if os.path.exists(temp_audio_file):
            os.remove(temp_audio_file)
@app.route(route="get_file_context")
async def get_file_context(req: func.HttpRequest) -> func.HttpResponse:
    """
    HTTP Trigger function to extract and return file content from Azure Blob Storage.
    Supported File Types (the extension is matched case-insensitively):
    - CSV: Returns the first 300 rows of the file as text.
    - PDF: Returns up to the first 5000 words.
    - Audio (MP3/WAV): Returns transcription.
    - Image (JPG/JPEG): Returns the text recognised by OCR.
    Query Parameter:
    - file (str): Required; the name of the file to process.
    Responses:
    - 200 OK: JSON with file content.
    - 400 Bad Request: If the file name is missing.
    - 403 Forbidden: Unsupported file type.
    - 500 Internal Server Error: Error message if processing fails.
    """
    try:
        file_name = req.params.get('file')
        if not file_name:
            return func.HttpResponse(
                "File name parameter is missing.",
                status_code=400
            )
        blob_service_client = BlobServiceClient.from_connection_string(BLOB_STORAGE_CONNECTION_STRING)
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(file_name)
        # Match the extension on a lower-cased copy so "REPORT.PDF" is handled
        # like "report.pdf"; the blob is still fetched by its original name.
        lowered_name = file_name.lower()
        context_data = ''
        if lowered_name.endswith('.csv'):
            blob_data = blob_client.download_blob().content_as_text()
            csv_data = pd.read_csv(StringIO(blob_data))
            context_data = csv_data.head(300).to_string(index=False)
        elif lowered_name.endswith('.pdf'):
            blob_pdf_data = blob_client.download_blob().readall()
            doc = fitz.open(stream=BytesIO(blob_pdf_data), filetype="pdf")
            try:
                full_text = ""
                for page in doc:
                    full_text += page.get_text()
                    # Stop reading pages once there is enough text.
                    if len(full_text.split()) > 5000:
                        break
            finally:
                # Release the document even if text extraction fails.
                doc.close()
            context_data = ' '.join(full_text.split()[:5000])
        elif lowered_name.endswith(('.mp3', '.wav')):
            # delete=False keeps the file on disk after the handle is closed so
            # that the transcription helper can read it by path.
            temp_audio = tempfile.NamedTemporaryFile(delete=False)
            try:
                # Write through the existing handle and close it before the
                # path is handed on; re-opening the file by name while it is
                # still open is not portable.
                with temp_audio:
                    temp_audio.write(blob_client.download_blob().readall())
                context_data = await get_audio_transcription(temp_audio.name)
            finally:
                # Remove the download on every path, including failures.
                os.remove(temp_audio.name)
        elif lowered_name.endswith(('.jpg', '.jpeg')):
            blob_image_data = blob_client.download_blob().readall()
            image = Image.open(BytesIO(blob_image_data))
            context_data = pytesseract.image_to_string(image)
        else:
            return func.HttpResponse(
                'Unsupported file type.',
                mimetype="application/json",
                status_code=403
            )
        return func.HttpResponse(
            json.dumps(context_data),
            mimetype="application/json",
            status_code=200
        )
    except Exception as e:
        logging.error(f"Failed to process file content: {e}")
        return func.HttpResponse(
            f"Error: {str(e)}",
            status_code=500
        )
@app.route(route='trigger-api')
def test_trigger(req: func.HttpRequest) -> func.HttpResponse:
    """
    Minimal HTTP Trigger function for checking that the API endpoint responds.
    Responses:
    - 200 OK: Plain-text confirmation that the API was triggered.
    Example Request:
    - GET /api/trigger-api
    Example Response:
    - "API Triggered successfully!"
    """
    logging.info('Python HTTP trigger function processed a request.')
    confirmation = "API Triggered successfully!"
    return func.HttpResponse(confirmation, status_code=200)
# @app.route(route='context-with-rag')
# def generate_rag_context(req: func.HttpRequest) -> func.HttpResponse:
# logging.info('RAG implementation for context retrieval triggered')
# return