-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwrite_to_airia_datastore.py
39 lines (32 loc) · 1.16 KB
/
write_to_airia_datastore.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import io
import json
import requests
# Function for uploading files
def url_creation(file_bytes, file_name: str = "document.txt"):
    """Upload an in-memory text document to the configured Airia data source.

    The content is sent as the multipart ``file`` part (content type
    ``text/plain``) of a POST request to the hard-coded data source endpoint.

    Args:
        file_bytes: Content of the document to upload (the caller in this
            file passes UTF-8 encoded bytes).
        file_name: File name reported for the uploaded part.

    Returns:
        On HTTP 200, whatever the name ``input`` resolves to at call time
        (presumably the text injected by the host runtime, so the original
        text is passed through to the next step -- TODO confirm). On any
        other status code, a string describing the failure.

    Raises:
        requests.RequestException: If the request cannot be completed
            (connection failure, timeout, ...).
    """
    url = "https://prodaus.api.airia.ai/v2/DataSource/a60d4YOUR-DATASOURCE-ID5d1a6/files"
    headers = {
        "X-API-Key": "ak-YOUR-AIRIA-KEY",  # Replace with actual API key
    }
    # NOTE(review): earlier revisions also built a payload with "userId",
    # "projectId" and "asyncOutput" here but never sent it with the request.
    # Confirm against the Airia API whether those fields are required.

    # Set the file as a plain text document.
    files = {'file': (file_name, file_bytes, 'text/plain')}

    # Without a timeout, requests waits forever on a stalled connection.
    response = requests.post(url, headers=headers, files=files, timeout=60)

    if response.status_code != 200:
        return f"Failed to upload the file. Status code: {response.status_code}, Response: {response.text}"

    # The response body is not needed; success is signalled by returning the
    # original `input` unchanged.
    return input
# Script entry: upload the incoming text as "document.txt" and expose the
# result of the upload through `output`.
# NOTE(review): `input` must be a string here (it is `.encode`d), so it is
# presumably injected by the host runtime rather than being the builtin
# function -- confirm against the platform documentation.
encoded_document = input.encode('utf-8')

# The bytes are handed to the uploader directly; `output` receives either the
# pass-through value or the failure message returned by `url_creation`.
output = url_creation(encoded_document, file_name="document.txt")