-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_put_get_basic_operations.py
More file actions
275 lines (228 loc) · 10.6 KB
/
test_put_get_basic_operations.py
File metadata and controls
275 lines (228 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import gzip
import tempfile
from pathlib import Path
from tests.compatibility import NEW_DRIVER_ONLY, OLD_DRIVER_ONLY
from tests.e2e.put_get.put_get_helper import (
as_file_uri,
create_temporary_stage,
create_temporary_stage_and_upload_file,
get_file_from_stage,
list_stage_contents,
)
from tests.utils import shared_test_data_dir
def test_should_select_data_from_file_uploaded_to_stage(connection):
    """A file PUT to a stage can be read back with a positional SELECT."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    with connection.cursor() as cursor:
        # Given: the test CSV is uploaded (auto-compressed) to a fresh stage
        stage_name, _ = create_temporary_stage_and_upload_file(
            cursor,
            "TEST_STAGE_SELECT",
            csv_path,
            auto_compress=True,
            overwrite=True,
        )
        # When: the staged file's first three columns are selected
        cursor.execute(f"SELECT $1, $2, $3 FROM @{stage_name}")
        # Then: the returned row matches the file's single data row
        assert cursor.fetchone() == ("1", "2", "3")
def test_should_list_file_uploaded_to_stage(connection):
    """An uploaded file appears in LS output under its compressed name."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    base_name = csv_path.name
    with connection.cursor() as cursor:
        # Given: the test CSV is uploaded (auto-compressed) to a fresh stage
        stage_name, _ = create_temporary_stage_and_upload_file(
            cursor, "TEST_STAGE_LS", csv_path, auto_compress=True, overwrite=True
        )
        # When: the stage contents are listed
        listing = list_stage_contents(cursor, stage_name)
        # Then: exactly one entry exists, named after the gzipped file
        assert len(listing) == 1
        assert base_name + ".gz" in listing[0][0]
def test_should_get_file_uploaded_to_stage(connection):
    """A staged file can be downloaded with GET and decompresses to its original content."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    base_name = csv_path.name
    with connection.cursor() as cursor:
        # Given: the test CSV is uploaded (auto-compressed) to a fresh stage
        stage_name, _ = create_temporary_stage_and_upload_file(
            cursor, "TEST_STAGE_GET", csv_path, auto_compress=True, overwrite=True
        )
        with tempfile.TemporaryDirectory() as temp_dir:
            # When: the file is retrieved into a scratch directory
            target_dir = Path(temp_dir)
            result = get_file_from_stage(cursor, stage_name, base_name, target_dir)
            # Then: the GET succeeded and the .gz file landed locally
            assert result[2] == "DOWNLOADED"
            local_copy = target_dir / (base_name + ".gz")
            assert local_copy.exists()
            # And: decompressing it yields the original CSV row
            with gzip.open(local_copy, "rt") as f:
                assert f.read().strip() == "1,2,3"
def test_should_return_correct_rowset_for_put(connection):
    """The row returned by PUT reports names, sizes, compression and status."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    # Given: an authenticated Snowflake session
    with connection.cursor() as cursor:
        # When: the file is uploaded with auto-compression enabled
        _, row = create_temporary_stage_and_upload_file(
            cursor,
            "TEST_STAGE_PUT_ROWSET",
            csv_path,
            auto_compress=True,
            overwrite=True,
        )
        # Then: source/target filenames and the uncompressed size are reported
        assert row[0] == "test_data.csv"
        assert row[1] == "test_data.csv.gz"
        assert row[2] == 6
        # Compressed size differs between driver generations (tracked as BD#1)
        if OLD_DRIVER_ONLY("BD#1"):
            assert row[3] == 48
        elif NEW_DRIVER_ONLY("BD#1"):
            assert row[3] == 32
        # And: compression codecs, status and an empty message round out the row
        assert row[4] == "NONE"
        assert row[5] == "GZIP"
        assert row[6] == "UPLOADED"
        assert row[7] == ""
def test_should_return_correct_rowset_for_get(connection):
    """The row returned by GET reports filename, size, status and message."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    base_name = csv_path.name
    with connection.cursor() as cursor:
        # Given: the test CSV is uploaded (auto-compressed) to a fresh stage
        stage_name, _ = create_temporary_stage_and_upload_file(
            cursor,
            "TEST_STAGE_GET_ROWSET",
            csv_path,
            auto_compress=True,
            overwrite=True,
        )
        with tempfile.TemporaryDirectory() as temp_dir:
            # When: the file is retrieved into a scratch directory
            result = get_file_from_stage(cursor, stage_name, base_name, Path(temp_dir))
            # Then: the compressed filename is reported
            assert result[0] == "test_data.csv.gz"
            # Downloaded size differs between driver generations (tracked as BD#1)
            if OLD_DRIVER_ONLY("BD#1"):
                assert result[1] == 42
            elif NEW_DRIVER_ONLY("BD#1"):
                assert result[1] == 26
            # And: status is DOWNLOADED with an empty message
            assert result[2] == "DOWNLOADED"
            assert result[3] == ""
def test_should_return_correct_column_metadata_for_put(connection):
    """cursor.description after a PUT exposes the expected column names and types."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    # Given: an authenticated Snowflake session
    with connection.cursor() as cursor:
        # When: the file is uploaded with auto-compression enabled
        _, row = create_temporary_stage_and_upload_file(
            cursor,
            "TEST_STAGE_PUT_COLUMN_METADATA",
            csv_path,
            auto_compress=True,
            overwrite=True,
        )
        # Then: the cursor exposes eight result columns for PUT
        columns = cursor.description
        assert columns is not None, "cursor.description should not be None after PUT"
        assert len(columns) == 8, "PUT command should return 8 columns"
        assert row[6] == "UPLOADED"
        # And: each column has the documented name and type code (TEXT=2, FIXED=0)
        expected_columns = [
            ("source", 2),
            ("target", 2),
            ("source_size", 0),
            ("target_size", 0),
            ("source_compression", 2),
            ("target_compression", 2),
            ("status", 2),
            ("message", 2),
        ]
        pairs = zip(expected_columns, columns)
        for i, ((expected_name, expected_type_code), column) in enumerate(pairs):
            actual_name = column[0].lower()
            actual_type_code = column[1]
            assert actual_name == expected_name, (
                f"Column {i} should be named '{expected_name}', got '{actual_name}'"
            )
            assert actual_type_code == expected_type_code, (
                f"Column '{expected_name}' type_code should be {expected_type_code}, got {actual_type_code}"
            )
def test_should_return_correct_column_metadata_for_get(connection):
    """cursor.description after a GET exposes the expected column names and types."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    base_name = csv_path.name
    with connection.cursor() as cursor:
        # Given: the test CSV is uploaded (auto-compressed) to a fresh stage
        stage_name, _ = create_temporary_stage_and_upload_file(
            cursor,
            "TEST_STAGE_GET_COLUMN_METADATA",
            csv_path,
            auto_compress=True,
            overwrite=True,
        )
        # When: the file is retrieved into a scratch directory
        with tempfile.TemporaryDirectory() as temp_dir:
            result = get_file_from_stage(cursor, stage_name, base_name, Path(temp_dir))
        # Then: the cursor exposes four result columns for GET
        columns = cursor.description
        assert columns is not None, "cursor.description should not be None after GET"
        assert len(columns) == 4, "GET command should return 4 columns"
        assert result[2] == "DOWNLOADED"
        # And: each column has the documented name and type code (TEXT=2, FIXED=0)
        expected_columns = [
            ("file", 2),
            ("size", 0),
            ("status", 2),
            ("message", 2),
        ]
        pairs = zip(expected_columns, columns)
        for i, ((expected_name, expected_type_code), column) in enumerate(pairs):
            actual_name = column[0].lower()
            actual_type_code = column[1]
            assert actual_name == expected_name, (
                f"Column {i} should be named '{expected_name}', got '{actual_name}'"
            )
            assert actual_type_code == expected_type_code, (
                f"Column '{expected_name}' type_code should be {expected_type_code}, got {actual_type_code}"
            )
def test_should_get_file_from_subdirectory_in_stage(connection):
    """A file stored under a stage subdirectory downloads flat into the local dir."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    base_name = csv_path.name
    with connection.cursor() as cursor:
        # Given: the file is uploaded (uncompressed) into a nested stage path
        stage_name = create_temporary_stage(cursor, "TEST_SUBDIR_GET")
        subdir = "nested/subdir"
        file_uri = as_file_uri(csv_path)
        cursor.execute(
            f"PUT 'file://{file_uri}' @{stage_name}/{subdir} AUTO_COMPRESS=FALSE OVERWRITE=TRUE"
        )
        assert cursor.fetchone()[6] == "UPLOADED"
        with tempfile.TemporaryDirectory() as temp_dir:
            # When: everything on the stage is fetched with a recursive GET
            download_dir = Path(temp_dir)
            download_uri = as_file_uri(download_dir)
            cursor.execute(f"GET @{stage_name}/ 'file://{download_uri}/'")
            get_result = cursor.fetchone()
            # Then: the file is downloaded directly into the local directory
            # (stage subdirectories are not mirrored locally)
            assert get_result[2] == "DOWNLOADED"
            downloaded_file = download_dir / base_name
            assert downloaded_file.exists(), (
                f"Expected file at {downloaded_file}, but directory contents: {list(download_dir.rglob('*'))}"
            )
            # And: the content survived the round trip unchanged
            assert downloaded_file.read_text().strip() == "1,2,3"
def test_should_upload_file_to_subdirectory_in_stage(connection):
    """PUT into a nested stage path lists the file under that subdirectory."""
    csv_path = shared_test_data_dir() / "compression" / "test_data.csv"
    base_name = csv_path.name
    # Given: an authenticated Snowflake session
    with connection.cursor() as cursor:
        # When: the file is uploaded (uncompressed) into a nested stage path
        stage_name = create_temporary_stage(cursor, "TEST_SUBDIR_UPLOAD")
        subdir_path = f"@{stage_name}/nested/subdir"
        file_uri = as_file_uri(csv_path)
        cursor.execute(f"PUT 'file://{file_uri}' {subdir_path} AUTO_COMPRESS=FALSE")
        assert cursor.fetchone()[6] == "UPLOADED"
        # Then: the listing shows the file under the nested/subdir prefix
        listed_names = [entry[0] for entry in list_stage_contents(cursor, stage_name)]
        assert any(
            "nested/subdir" in name and base_name in name for name in listed_names
        )