It guides the user through selecting the deployment and data type and then uploads the specified files.
"""

+import os
+import mimetypes
from time import perf_counter
import asyncio
import streamlit as st
import requests
from requests.auth import HTTPBasicAuth
import aiohttp
-from aiohttp import BasicAuth
+from aiohttp import BasicAuth, ClientTimeout, FormData
import nest_asyncio
+from tenacity import retry, wait_fixed, stop_after_attempt
+

nest_asyncio.apply()
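Both new imports feed the changes below: tenacity supplies the retry decorator, and it wraps coroutine functions directly, re-awaiting the call after each failure. A minimal standalone sketch of that behaviour (not part of the diff):

```python
import asyncio
from tenacity import retry, wait_fixed, stop_after_attempt

attempts = {"n": 0}

@retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
async def flaky():
    # Fails twice, then succeeds; tenacity waits 2 s between attempts
    # and gives up after 5 tries.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

print(asyncio.run(flaky()))  # prints "ok" on the third attempt
```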
@@ -44,7 +48,10 @@ def get_deployments(user, pwd):
        return []


-async def get_presigned_url(user, pwd, name, bucket, dep_id, data_type, file_name):
+@retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
+async def get_presigned_url(
+    session, user, pwd, name, bucket, dep_id, data_type, file_name, file_type
+):
    """
    Fetches a presigned URL for uploading a file using the provided details.
@@ -61,24 +68,22 @@ async def get_presigned_url(user, pwd, name, bucket, dep_id, data_type, file_nam
        dict: The JSON response containing the presigned URL if the request is successful.
    """
    url = "https://connect-apps.ceh.ac.uk/ami-data-upload/generate-presigned-url/"
-    auth = BasicAuth(user, pwd)

-    data = aiohttp.FormData()
+    data = FormData()
    data.add_field("name", name)
    data.add_field("country", bucket)
    data.add_field("deployment", dep_id)
    data.add_field("data_type", data_type)
    data.add_field("filename", file_name)
+    data.add_field("file_type", file_type)

-    async with aiohttp.ClientSession(
-        auth=auth, timeout=aiohttp.ClientTimeout(total=600)
-    ) as session:
-        async with session.post(url, data=data) as response:
-            response.raise_for_status()
-            return await response.json()
+    async with session.post(url, auth=BasicAuth(user, pwd), data=data) as response:
+        response.raise_for_status()
+        return await response.json()
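The session is now created once by the caller and passed in, with authentication moved onto the individual request. A sketch of how the reworked coroutine might be driven (credentials and field values below are placeholders):

```python
import asyncio
import aiohttp
from aiohttp import ClientTimeout

async def demo():
    # One shared session, reused across every request, instead of a
    # new session per call.
    async with aiohttp.ClientSession(timeout=ClientTimeout(total=1200)) as session:
        resp = await get_presigned_url(
            session, "user", "pwd", "Jane Doe", "gbr",
            "dep000001", "motion_images", "img_0001.jpg", "image/jpeg",
        )
        print(resp)

asyncio.run(demo())
```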

-async def upload_file_to_s3(presigned_url, file_content, file_type):
+@retry(wait=wait_fixed(2), stop=stop_after_attempt(5))
+async def upload_file_to_s3(session, presigned_url, file_content, file_type):
    """
    Uploads a file to S3 using a presigned URL.
@@ -91,17 +96,14 @@ async def upload_file_to_s3(presigned_url, file_content, file_type):
        None
    """
    headers = {"Content-Type": file_type}
-    async with aiohttp.ClientSession(
-        timeout=aiohttp.ClientTimeout(total=600)
-    ) as session:
-        async with session.put(
-            presigned_url, data=file_content, headers=headers
-        ) as response:
-            response.raise_for_status()
+    async with session.put(
+        presigned_url, data=file_content, headers=headers
+    ) as response:
+        response.raise_for_status()
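With the shared session, each PUT reuses pooled connections. For S3-style presigned URLs, a Content-Type header that is part of the signature must match on upload, which is presumably why file_type now travels with the presigned-URL request. A synchronous equivalent of the same PUT (the URL and file are placeholders):

```python
import requests

presigned_url = "https://example-bucket.s3.amazonaws.com/img_0001.jpg?X-Amz-..."  # placeholder
with open("img_0001.jpg", "rb") as f:
    r = requests.put(
        presigned_url,
        data=f.read(),
        headers={"Content-Type": "image/jpeg"},  # must match the signed type
        timeout=600,
    )
r.raise_for_status()
```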
async def upload_files_in_batches(
-    user, pwd, name, bucket, dep_id, data_type, files, batch_size=50
+    user, pwd, name, bucket, dep_id, data_type, files, batch_size=100
):
    """
    Uploads files in batches to S3.
@@ -119,13 +121,32 @@ async def upload_files_in_batches(
    Returns:
        None
    """
-    for i in range(0, len(files), batch_size):
-        end = i + batch_size
-        batch = files[i:end]
-        await upload_files(user, pwd, name, bucket, dep_id, data_type, batch)
+    async with aiohttp.ClientSession(timeout=ClientTimeout(total=1200)) as session:
+        while True:
+            files_to_upload = await check_files(
+                session, user, pwd, name, bucket, dep_id, data_type, files
+            )
+            if not files_to_upload:
+                print("All files have been uploaded successfully.")
+                break
+
+            if len(files_to_upload) <= batch_size:
+                await upload_files(
+                    session, user, pwd, name, bucket, dep_id, data_type, files_to_upload
+                )
+            else:
+                for i in range(0, len(files_to_upload), batch_size):
+                    end = i + batch_size
+                    batch = files_to_upload[i:end]
+                    await upload_files(
+                        session, user, pwd, name, bucket, dep_id, data_type, batch
+                    )
+
+            # Update files list to only include those that still need to be checked
+            files = files_to_upload
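The new loop makes the batch upload self-healing: each pass asks the server which files already exist and re-uploads only the remainder, exiting once nothing is missing. A hypothetical driver (paths and credentials are placeholders; the tuples match what upload_files() consumes):

```python
import asyncio

paths = ["img_0001.jpg", "img_0002.jpg"]  # placeholder local files
files = []
for path in paths:
    file_name, file_type = get_file_info(path)
    with open(path, "rb") as f:
        files.append((file_name, f.read(), file_type))

asyncio.run(
    upload_files_in_batches(
        "user", "pwd", "Jane Doe", "gbr", "dep000001", "motion_images", files
    )
)
```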
-async def upload_files(user, pwd, name, bucket, dep_id, data_type, files):
+
+async def upload_files(session, user, pwd, name, bucket, dep_id, data_type, files):
    """
    Uploads multiple files to S3 by first obtaining presigned URLs and then uploading the files.
@@ -145,15 +166,62 @@ async def upload_files(user, pwd, name, bucket, dep_id, data_type, files):
    for file_name, file_content, file_type in files:
        try:
            presigned_url = await get_presigned_url(
-                user, pwd, name, bucket, dep_id, data_type, file_name
+                session,
+                user,
+                pwd,
+                name,
+                bucket,
+                dep_id,
+                data_type,
+                file_name,
+                file_type,
            )
-            task = upload_file_to_s3(presigned_url, file_content, file_type)
+            task = upload_file_to_s3(session, presigned_url, file_content, file_type)
            tasks.append(task)
        except Exception as e:
            st.error(f"Error getting presigned URL for {file_name}: {e}")
    await asyncio.gather(*tasks)
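Note that asyncio.gather() with default arguments propagates the first upload error and leaves the rest of the batch unreported. A possible variation (not in the diff) that reports every failure:

```python
import asyncio

async def gather_with_reporting(tasks, file_names):
    # return_exceptions=True collects failures instead of raising the
    # first one, so each failed file can be reported individually.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [
        (name, result)
        for name, result in zip(file_names, results)
        if isinstance(result, Exception)
    ]
```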
+async def check_files(session, user, pwd, name, bucket, dep_id, data_type, files):
+    """Check which files already exist in the object store."""
+    files_to_upload = []
+
+    for file_path in files:
+        if not await check_file_exist(
+            session, user, pwd, name, bucket, dep_id, data_type, file_path[0]
+        ):
+            files_to_upload.append(file_path)
+
+    return files_to_upload
+
+
+async def check_file_exist(
+    session, user, pwd, name, bucket, dep_id, data_type, file_path
+):
+    """Check whether a single file already exists in the object store."""
+    url = "https://connect-apps.ceh.ac.uk/ami-data-upload/check-file-exist/"
+    file_name, _ = get_file_info(file_path)
+    data = FormData()
+    data.add_field("name", name)
+    data.add_field("country", bucket)
+    data.add_field("deployment", dep_id)
+    data.add_field("data_type", data_type)
+    data.add_field("filename", file_name)
+
+    async with session.post(url, auth=BasicAuth(user, pwd), data=data) as response:
+        response.raise_for_status()
+        exist = await response.json()
+        return exist["exists"]
+
+
+def get_file_info(file_path):
+    """Get file information: basename and guessed MIME type."""
+    filename = os.path.basename(file_path)
+    file_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream"
+    return filename, file_type
+
+
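get_file_info() guesses the MIME type from the file extension alone, falling back to a generic binary type for unknown extensions. A quick illustration of the stdlib behaviour it relies on:

```python
import mimetypes

print(mimetypes.guess_type("snapshot_0001.jpg")[0])  # image/jpeg
print(mimetypes.guess_type("audio_0001.wav")[0])     # audio/x-wav (platform-dependent)
print(mimetypes.guess_type("readings.dat")[0])       # None -> "application/octet-stream" fallback
```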
def main(user, pwd, deployments):
    """
    The main function to handle the user interface and interaction in the Streamlit app.
@@ -330,3 +398,8 @@ def handle_upload(

if "deployments" in st.session_state:
    main(username, password, st.session_state.deployments)
+
+
+# To run this app, save it as app.py
+# and run the following command in your terminal:
+#     streamlit run app.py