-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtest_put_get_source_compression.py
More file actions
293 lines (246 loc) · 11 KB
/
test_put_get_source_compression.py
File metadata and controls
293 lines (246 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
from pathlib import Path
import pytest
from tests.compatibility import NEW_DRIVER_ONLY, OLD_DRIVER_ONLY
from tests.e2e.put_get.put_get_helper import (
as_file_uri,
create_temporary_stage,
)
from tests.utils import shared_test_data_dir
@pytest.mark.parametrize(
    "expected_compression,filename",
    [
        ("GZIP", "test_data.csv.gz"),
        ("BZIP2", "test_data.csv.bz2"),
        ("BROTLI", "test_data.csv.br"),
        ("ZSTD", "test_data.csv.zst"),
        ("DEFLATE", "test_data.csv.deflate"),
    ],
)
def test_should_auto_detect_standard_compression_types_when_source_compression_set_to_auto_detect(
    connection, expected_compression, filename
):
    """PUT with SOURCE_COMPRESSION=AUTO_DETECT reports the file's own compression type.

    For every parametrized type the source and target keep the same file name and
    compression. Behavior difference BD#2: on the old driver a DEFLATE file is
    expected to be reported with source compression NONE and uploaded as
    ``<filename>.gz`` with GZIP target compression.
    """
    # Given Snowflake client is logged in
    with connection.cursor() as cursor:
        # And File with standard type (GZIP, BZIP2, BROTLI, ZSTD, DEFLATE)
        stage_name, source_path = create_stage_and_get_compression_file(
            cursor, f"TEST_STAGE_{expected_compression}", expected_compression
        )
        # When File is uploaded with SOURCE_COMPRESSION set to AUTO_DETECT
        cursor.execute(
            f"PUT 'file://{as_file_uri(source_path)}' @{stage_name} SOURCE_COMPRESSION=AUTO_DETECT"
        )
        put_row = cursor.fetchone()
        # Then Target compression has correct type and all PUT results are correct
        if expected_compression != "DEFLATE":
            assert_put_compression_result(
                put_row, filename, expected_compression, filename, expected_compression
            )
        elif OLD_DRIVER_ONLY("BD#2"):
            # Old driver: DEFLATE input is reported as uncompressed and gzipped on upload.
            assert_put_compression_result(put_row, filename, "NONE", f"{filename}.gz", "GZIP")
        elif NEW_DRIVER_ONLY("BD#2"):
            assert_put_compression_result(
                put_row, filename, expected_compression, filename, expected_compression
            )
@pytest.mark.parametrize(
    "compression,filename",
    [
        ("GZIP", "test_data.csv.gz"),
        ("BZIP2", "test_data.csv.bz2"),
        ("BROTLI", "test_data.csv.br"),
        ("ZSTD", "test_data.csv.zst"),
        ("DEFLATE", "test_data.csv.deflate"),
        ("RAW_DEFLATE", "test_data.csv.raw_deflate"),
    ],
)
def test_should_upload_compressed_files_with_source_compression_set_to_explicit_types(
    connection, compression, filename
):
    """PUT with an explicit SOURCE_COMPRESSION keeps the name and reports that type.

    Behavior difference BD#3: the old driver is expected to reject an explicit
    BROTLI source compression with error 253007 ("Feature is not supported"),
    in which case the test ends after checking the error.
    """
    # Given Snowflake client is logged in
    with connection.cursor() as cursor:
        # And File with standard type (GZIP, BZIP2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE)
        stage_name, source_path = create_stage_and_get_compression_file(
            cursor, f"TEST_STAGE_{compression}", compression
        )
        # When File is uploaded with SOURCE_COMPRESSION set to explicit type
        put_command = f"PUT 'file://{as_file_uri(source_path)}' @{stage_name} SOURCE_COMPRESSION={compression}"
        is_brotli = compression == "BROTLI"
        if is_brotli and OLD_DRIVER_ONLY("BD#3"):
            with pytest.raises(Exception) as exc_info:
                cursor.execute(put_command)
            assert "253007" in str(exc_info.value)
            assert "Feature is not supported" in str(exc_info.value)
            return
        # Non-BROTLI types always upload; BROTLI uploads only on the new driver.
        if not is_brotli or NEW_DRIVER_ONLY("BD#3"):
            cursor.execute(put_command)
            put_row = cursor.fetchone()
        # Then Target compression has correct type and all PUT results are correct
        assert_put_compression_result(put_row, filename, compression, filename, compression)
def test_should_not_compress_file_when_source_compression_set_to_auto_detect_and_auto_compress_set_to_false(
    connection,
):
    """An uncompressed file PUT with AUTO_DETECT and AUTO_COMPRESS=FALSE is uploaded as-is.

    Source and target are both expected to be ``test_data.csv`` with compression NONE.
    """
    # Given Snowflake client is logged in
    with connection.cursor() as cursor:
        # And Uncompressed file
        # Use a prefix specific to this scenario (AUTO_DETECT + no auto-compress) rather than
        # the one used by the SOURCE_COMPRESSION=NONE variant, so the two tests' stages are
        # distinguishable, matching how the AUTO_COMPRESS=TRUE pair names its stages.
        stage_name, test_file_path = create_stage_and_get_compression_file(
            cursor, "TEST_STAGE_AUTO_DETECT_NO_AUTO_COMPRESS", "NONE"
        )
        filename = "test_data.csv"
        # When File is uploaded with SOURCE_COMPRESSION set to AUTO_DETECT and AUTO_COMPRESS set to FALSE
        put_command = (
            f"PUT 'file://{as_file_uri(test_file_path)}' @{stage_name} "
            "SOURCE_COMPRESSION=AUTO_DETECT AUTO_COMPRESS=FALSE"
        )
        cursor.execute(put_command)
        result = cursor.fetchone()
        # Then File is not compressed
        assert_put_compression_result(result, filename, "NONE", filename, "NONE")
def test_should_not_compress_file_when_source_compression_set_to_none_and_auto_compress_set_to_false(
    connection,
):
    """An uncompressed file PUT with SOURCE_COMPRESSION=NONE and AUTO_COMPRESS=FALSE stays uncompressed.

    Source and target are both expected to be ``test_data.csv`` with compression NONE.
    """
    plain_name = "test_data.csv"
    # Given Snowflake client is logged in
    with connection.cursor() as cursor:
        # And Uncompressed file
        stage_name, plain_path = create_stage_and_get_compression_file(
            cursor, "TEST_STAGE_NONE_NO_AUTO_COMPRESS", "NONE"
        )
        # When File is uploaded with SOURCE_COMPRESSION set to NONE and AUTO_COMPRESS set to FALSE
        file_uri = as_file_uri(plain_path)
        cursor.execute(f"PUT 'file://{file_uri}' @{stage_name} SOURCE_COMPRESSION=NONE AUTO_COMPRESS=FALSE")
        upload_row = cursor.fetchone()
        # Then File is not compressed
        assert_put_compression_result(upload_row, plain_name, "NONE", plain_name, "NONE")
def test_should_compress_uncompressed_file_when_source_compression_set_to_auto_detect_and_auto_compress_set_to_true(
    connection,
):
    """An uncompressed file PUT with AUTO_DETECT and AUTO_COMPRESS=TRUE is gzipped on upload.

    The source is expected as ``test_data.csv`` (NONE) and the target as
    ``test_data.csv.gz`` (GZIP).
    """
    # Given Snowflake client is logged in
    with connection.cursor() as cursor:
        # And Uncompressed file
        stage_name, plain_path = create_stage_and_get_compression_file(cursor, "TEST_STAGE_AUTO_COMPRESS", "NONE")
        # When File is uploaded with SOURCE_COMPRESSION set to AUTO_DETECT and AUTO_COMPRESS set to TRUE
        put_options = "SOURCE_COMPRESSION=AUTO_DETECT AUTO_COMPRESS=TRUE"
        cursor.execute(f"PUT 'file://{as_file_uri(plain_path)}' @{stage_name} {put_options}")
        upload_row = cursor.fetchone()
        # Then Target compression has GZIP type and all PUT results are correct
        source_name = "test_data.csv"
        assert_put_compression_result(upload_row, source_name, "NONE", source_name + ".gz", "GZIP")
def test_should_compress_uncompressed_file_when_source_compression_set_to_none_and_auto_compress_set_to_true(
    connection,
):
    """An uncompressed file PUT with SOURCE_COMPRESSION=NONE and AUTO_COMPRESS=TRUE is gzipped on upload.

    The source is expected as ``test_data.csv`` (NONE) and the target as
    ``test_data.csv.gz`` (GZIP).
    """
    source_name = "test_data.csv"
    gzipped_name = f"{source_name}.gz"
    # Given Snowflake client is logged in
    with connection.cursor() as cursor:
        # And Uncompressed file
        stage_name, plain_path = create_stage_and_get_compression_file(
            cursor, "TEST_STAGE_NONE_AUTO_COMPRESS", "NONE"
        )
        # When File is uploaded with SOURCE_COMPRESSION set to NONE and AUTO_COMPRESS set to TRUE
        file_uri = as_file_uri(plain_path)
        cursor.execute(f"PUT 'file://{file_uri}' @{stage_name} SOURCE_COMPRESSION=NONE AUTO_COMPRESS=TRUE")
        # Then Target compression has GZIP type and all PUT results are correct
        assert_put_compression_result(cursor.fetchone(), source_name, "NONE", gzipped_name, "GZIP")
def test_should_return_error_for_unsupported_compression_type(connection):
    """PUT with AUTO_DETECT on an LZMA (.xz) file must raise an error.

    Behavior difference BD#4 selects the expected message: the new driver reports
    "Unsupported compression type", the old driver error 253007
    ("Feature is not supported").
    """
    # Given Snowflake client is logged in
    with connection.cursor() as cursor:
        # And File compressed with unsupported format
        stage_name, lzma_path = create_stage_and_get_compression_file(cursor, "TEST_STAGE_UNSUPPORTED", "LZMA")
        # When File is uploaded with SOURCE_COMPRESSION set to AUTO_DETECT
        # (command is built outside pytest.raises so only execute() errors are captured)
        upload_sql = f"PUT 'file://{as_file_uri(lzma_path)}' @{stage_name} SOURCE_COMPRESSION=AUTO_DETECT"
        # Then Unsupported compression error is thrown
        with pytest.raises(Exception) as exc_info:
            cursor.execute(upload_sql)
        error_text = str(exc_info.value)
        if NEW_DRIVER_ONLY("BD#4"):
            assert "Unsupported compression type" in error_text
        if OLD_DRIVER_ONLY("BD#4"):
            assert "253007" in error_text
            assert "Feature is not supported" in error_text
def get_compression_test_file_path(compression_type: str) -> Path:
    """
    Resolve the shared fixture file that is compressed with ``compression_type``.
    The lookup is case-insensitive; files live under ``<shared test data>/compression``.
    Args:
        compression_type: One of GZIP, BZIP2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE, NONE, LZMA
    Returns:
        Path: Location of the fixture file for that compression type
    Raises:
        ValueError: If no fixture file exists for ``compression_type``
    """
    file_name_by_compression = {
        "GZIP": "test_data.csv.gz",
        "BZIP2": "test_data.csv.bz2",
        "BROTLI": "test_data.csv.br",
        "ZSTD": "test_data.csv.zst",
        "DEFLATE": "test_data.csv.deflate",
        "RAW_DEFLATE": "test_data.csv.raw_deflate",
        "NONE": "test_data.csv",
        "LZMA": "test_data.csv.xz",
    }
    normalized = compression_type.upper()
    if normalized not in file_name_by_compression:
        raise ValueError(f"Unsupported compression type: {compression_type}")
    fixture_dir = shared_test_data_dir() / "compression"
    return fixture_dir / file_name_by_compression[normalized]
def create_stage_and_get_compression_file(cursor, stage_prefix: str, compression_type: str):
    """
    Prepare a PUT scenario: a new temporary stage plus the matching fixture file.
    The stage is created before the file path is resolved, so an unknown
    ``compression_type`` raises ValueError only after the stage already exists.
    Args:
        cursor: Database cursor used to create the stage
        stage_prefix: Prefix passed to ``create_temporary_stage`` for the stage name
        compression_type: One of GZIP, BZIP2, BROTLI, ZSTD, DEFLATE, RAW_DEFLATE, NONE, LZMA
    Returns:
        tuple: (stage_name, test_file_path)
    """
    # Tuple elements are evaluated left to right: stage first, then the path lookup.
    return (
        create_temporary_stage(cursor, stage_prefix),
        get_compression_test_file_path(compression_type),
    )
def assert_put_compression_result(
    result,
    expected_source: str,
    expected_source_compression: str,
    expected_target: str,
    expected_target_compression: str,
):
    """
    Assert that a PUT result row matches the expected names, compressions and status.
    The row is read positionally: index 0 is the source name, 1 the target name,
    4 the source compression, 5 the target compression and 6 the status, which
    must be 'UPLOADED'.
    Args:
        result: PUT command result row (e.g. from ``cursor.fetchone()``); ``None`` fails the assertion
        expected_source: Expected source filename
        expected_source_compression: Expected source compression type
        expected_target: Expected target filename
        expected_target_compression: Expected target compression type
    Raises:
        AssertionError: If the row is missing or any checked column differs
    """
    # fetchone() yields None when no row was returned; fail with a clear message
    # instead of an opaque "'NoneType' object is not subscriptable" TypeError.
    assert result is not None, "PUT returned no result row"
    assert result[0] == expected_source, f"Source should be '{expected_source}', got '{result[0]}'"
    assert result[1] == expected_target, f"Target should be '{expected_target}', got '{result[1]}'"
    assert result[4] == expected_source_compression, (
        f"Source compression should be '{expected_source_compression}', got '{result[4]}'"
    )
    assert result[5] == expected_target_compression, (
        f"Target compression should be '{expected_target_compression}', got '{result[5]}'"
    )
    assert result[6] == "UPLOADED", f"Status should be 'UPLOADED', got '{result[6]}'"