Skip to content

Commit 5a10fa2

Browse files
committed
refine compress, add test
1 parent f8b2af1 commit 5a10fa2

File tree

6 files changed

+164
-91
lines changed

6 files changed

+164
-91
lines changed

aliyun/log/compress.py

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
2+
import zlib
3+
from enum import Enum
4+
from .logexception import LogException
5+
from .util import Util
6+
7+
import lz4.block
8+
9+
10+
def lz_decompress(raw_size, data):
    """Decompress an LZ4 block whose uncompressed length is supplied separately.

    :param raw_size: size in bytes of the original (uncompressed) payload
    :param data: LZ4 block bytes to decode
    :return: the result of ``lz4.block.decompress``
    """
    # The size is passed explicitly through ``uncompressed_size`` instead of
    # being read from the block itself.
    decoded = lz4.block.decompress(data, uncompressed_size=raw_size)
    return decoded
12+
13+
14+
def lz_compresss(data):
    """Compress ``data`` with ``lz4.block`` and return the output minus its first 4 bytes.

    :param data: bytes to compress
    :return: ``lz4.block.compress(data)`` with the leading four bytes removed
    """
    block = lz4.block.compress(data)
    # Presumably the 4 dropped bytes are the size prefix written by lz4.block;
    # lz_decompress receives the raw size as an explicit argument instead.
    return block[4:]
16+
17+
18+
class CompressType(Enum):
    """Compression schemes the client can apply to a body."""

    UNCOMPRESSED = 1
    LZ4 = 2

    @staticmethod
    def default_compress_type():
        """Return the scheme used when the caller does not pick one (LZ4)."""
        return CompressType.LZ4

    def __str__(self):
        """Return the textual name of the scheme: 'lz4' for LZ4, '' otherwise."""
        return 'lz4' if self is CompressType.LZ4 else ''
30+
31+
32+
class Compressor(object):
    """Static helpers that compress request bodies and decompress response bodies."""

    @staticmethod
    def compress(data, compress_type):
        # type: (bytes, CompressType) -> bytes
        """Compress ``data`` according to ``compress_type``.

        :param data: raw bytes to encode
        :param compress_type: a ``CompressType`` member
        :return: LZ4-compressed bytes for ``LZ4``; ``data`` unchanged for
            ``UNCOMPRESSED``
        :raise LogException: ``UnknownCompressType`` for any other value
        """
        if compress_type == CompressType.LZ4:
            return lz_compresss(data)

        if compress_type == CompressType.UNCOMPRESSED:
            return data

        # Format the value instead of concatenating it: compress_type is not a
        # str here, and ``str + non-str`` raises TypeError, which would hide
        # the intended LogException.
        raise LogException('UnknownCompressType',
                           'Unknown compress type: {0!r}'.format(compress_type))

    @staticmethod
    def decompress(data, raw_size, compress_type):
        # type: (bytes, int, CompressType) -> bytes
        """Decompress ``data`` according to ``compress_type``.

        :param data: encoded bytes
        :param raw_size: size in bytes of the uncompressed payload (used by LZ4)
        :param compress_type: a ``CompressType`` member
        :return: decoded bytes for ``LZ4``; ``data`` unchanged for
            ``UNCOMPRESSED``
        :raise LogException: ``UnknownCompressType`` for any other value
        """
        if compress_type == CompressType.LZ4:
            return lz_decompress(raw_size, data)

        if compress_type == CompressType.UNCOMPRESSED:
            return data

        # Same reasoning as in compress(): never concatenate a non-str value.
        raise LogException('UnknownCompressType',
                           'Unknown compress type: {0!r}'.format(compress_type))

    @staticmethod
    def decompress_response(header, response):
        # type: (dict, bytes) -> bytes
        """Decode a response body based on its ``x-log-compresstype`` header.

        The header value is compared case-insensitively. ``lz4`` bodies are
        decoded using the size found in ``x-log-bodyrawsize``; an empty or
        missing compress type returns the body untouched; ``gzip`` and
        ``deflate`` bodies are passed to ``zlib.decompress``.

        :param header: response headers
        :param response: response body bytes
        :return: the decoded body
        :raise LogException: ``UnknownCompressType`` when the header names any
            other scheme
        """
        # NOTE(review): Util.h_v_td presumably returns the header value or the
        # given default ('') when the key is absent -- confirm in util.py.
        compress_type_str = Util.h_v_td(
            header, 'x-log-compresstype', '').lower()

        if compress_type_str == 'lz4':
            raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize'))
            return Compressor.decompress(response, raw_size, CompressType.LZ4)

        if compress_type_str == '':
            return response

        if compress_type_str in ('gzip', 'deflate'):
            return zlib.decompress(response)

        raise LogException('UnknownCompressType',
                           'Unknown compress type: ' + compress_type_str,
                           resp_header=header, resp_body=response)

aliyun/log/logclient.py

+22-44
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,13 @@
6262
from .export_response import *
6363
from .common_response import *
6464
from .auth import *
65+
from .compress import CompressType, Compressor
6566

6667
logger = logging.getLogger(__name__)
6768

6869
if six.PY3:
6970
xrange = range
7071

71-
lz4_available = Util.is_lz4_available()
72-
if lz4_available:
73-
from .util import lz_decompress, lz_compresss
7472

7573
CONNECTION_TIME_OUT = 120
7674
MAX_LIST_PAGING_SIZE = 500
@@ -384,13 +382,11 @@ def put_log_raw(self, project, logstore, log_group, compress=None):
384382
raw_body_size = len(body)
385383
headers = {'x-log-bodyrawsize': str(raw_body_size), 'Content-Type': 'application/x-protobuf'}
386384

387-
if compress is None or compress:
388-
if lz4_available:
389-
headers['x-log-compresstype'] = 'lz4'
390-
body = lz_compresss(body)
391-
else:
392-
headers['x-log-compresstype'] = 'deflate'
393-
body = zlib.compress(body)
385+
need_compress = compress is None or compress
386+
if need_compress:
387+
compress_type = CompressType.default_compress_type()
388+
headers['x-log-compresstype'] = str(compress_type)
389+
body = Compressor.compress(body, compress_type)
394390

395391
params = {}
396392
resource = '/logstores/' + logstore + "/shards/lb"
@@ -445,16 +441,12 @@ def put_logs(self, request):
445441
len(body) / 1024.0 / 1024))
446442

447443
headers = {'x-log-bodyrawsize': str(len(body)), 'Content-Type': 'application/x-protobuf'}
448-
is_compress = request.get_compress()
449-
450-
compress_data = None
451-
if is_compress:
452-
if lz4_available:
453-
headers['x-log-compresstype'] = 'lz4'
454-
compress_data = lz_compresss(body)
455-
else:
456-
headers['x-log-compresstype'] = 'deflate'
457-
compress_data = zlib.compress(body)
444+
445+
need_compress = request.get_compress() is None or request.get_compress()
446+
if need_compress:
447+
compress_type = CompressType.default_compress_type()
448+
headers['x-log-compresstype'] = str(compress_type)
449+
body = Compressor.compress(body, compress_type)
458450

459451
params = {}
460452
logstore = request.get_logstore()
@@ -465,11 +457,7 @@ def put_logs(self, request):
465457
else:
466458
resource = '/logstores/' + logstore + "/shards/lb"
467459

468-
if is_compress:
469-
(resp, header) = self._send('POST', project, compress_data, resource, params, headers)
470-
else:
471-
(resp, header) = self._send('POST', project, body, resource, params, headers)
472-
460+
(resp, header) = self._send('POST', project, body, resource, params, headers)
473461
return PutLogsResponse(header, resp)
474462

475463
def list_logstores(self, request):
@@ -654,12 +642,12 @@ def get_log(self, project, logstore, from_time, to_time, topic=None,
654642
params['forward'] = forward
655643
body_str = six.b(json.dumps(params))
656644
headers["x-log-bodyrawsize"] = str(len(body_str))
657-
accept_encoding = "lz4" if lz4_available else "deflate"
645+
accept_encoding = str(CompressType.default_compress_type())
658646
headers['Accept-Encoding'] = accept_encoding
659647

660648
(resp, header) = self._send("POST", project, body_str, resource, None, headers, respons_body_type=accept_encoding)
661649

662-
raw_data = Util.uncompress_response(header, resp)
650+
raw_data = Compressor.decompress_response(header, resp)
663651
exJson = self._loadJson(200, header, raw_data, requestId=Util.h_v_td(header, 'x-log-requestid', ''))
664652
exJson = Util.convert_unicode_to_str(exJson)
665653
ret = GetLogsResponse(exJson, header)
@@ -1128,11 +1116,10 @@ def pull_logs(self, project_name, logstore_name, shard_id, cursor, count=None, e
11281116
"""
11291117

11301118
headers = {}
1131-
if compress is None or compress:
1132-
if lz4_available:
1133-
headers['Accept-Encoding'] = 'lz4'
1134-
else:
1135-
headers['Accept-Encoding'] = 'gzip'
1119+
1120+
need_compress = compress is None or compress
1121+
if need_compress:
1122+
headers['Accept-Encoding'] = str(CompressType.default_compress_type())
11361123
else:
11371124
headers['Accept-Encoding'] = ''
11381125

@@ -1155,18 +1142,9 @@ def pull_logs(self, project_name, logstore_name, shard_id, cursor, count=None, e
11551142
raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize'))
11561143
if raw_size <= 0:
11571144
return PullLogResponse(None, header)
1158-
compress_type = Util.h_v_td(header, 'x-log-compresstype', '').lower()
1159-
if compress_type == 'lz4':
1160-
if lz4_available:
1161-
raw_data = lz_decompress(raw_size, resp)
1162-
return PullLogResponse(raw_data, header)
1163-
else:
1164-
raise LogException("ClientHasNoLz4", "There's no Lz4 lib available to decompress the response", resp_header=header, resp_body=resp)
1165-
elif compress_type in ('gzip', 'deflate'):
1166-
raw_data = zlib.decompress(resp)
1167-
return PullLogResponse(raw_data, header)
1168-
else:
1169-
return PullLogResponse(resp, header)
1145+
1146+
raw_data = Compressor.decompress_response(header, resp)
1147+
return PullLogResponse(raw_data, header)
11701148

11711149
def pull_log(self, project_name, logstore_name, shard_id, from_time, to_time, batch_size=None, compress=None, query=None):
11721150
""" batch pull log data from log service using time-range

aliyun/log/putlogsrequest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class PutLogsRequest(LogRequest):
2929
:param hashKey: put data with set hash, the data will be send to shard whose range contains the hashKey
3030
3131
:type compress: bool
32-
:param compress: if need to compress the logs
32+
:param compress: if need to compress the logs, default is True
3333
3434
:type logtags: list
3535
:param logtags: list of key:value tag pair , [(tag_key_1,tag_value_1) , (tag_key_2,tag_value_2)]

aliyun/log/util.py

+1-45
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,7 @@
1717
import re
1818
import logging
1919
import json
20-
import struct
21-
import zlib
2220

23-
try:
24-
import lz4
25-
if hasattr(lz4, 'loads') and hasattr(lz4, 'dumps'):
26-
def lz_decompress(raw_size, data):
27-
return lz4.loads(struct.pack('<I', raw_size) + data)
28-
29-
def lz_compresss(data):
30-
return lz4.dumps(data)[4:]
31-
else:
32-
import lz4.block
33-
def lz_decompress(raw_size, data):
34-
return lz4.block.decompress(data, uncompressed_size=raw_size)
35-
36-
def lz_compresss(data):
37-
return lz4.block.compress(data)[4:]
38-
lz4_available = True
39-
except ImportError:
40-
lz4_available = False
4121

4222
logger = logging.getLogger(__name__)
4323

@@ -91,11 +71,6 @@ def get_host_ip(logHost):
9171
if s:
9272
s.close()
9373

94-
@staticmethod
95-
def compress_data(data):
96-
import zlib
97-
return zlib.compress(data, 6)
98-
9974
@staticmethod
10075
def cal_md5(content):
10176
return hashlib.md5(content).hexdigest().upper()
@@ -164,9 +139,6 @@ def convert_unicode_to_str(data):
164139

165140
return data
166141

167-
@staticmethod
168-
def is_lz4_available():
169-
return lz4_available
170142

171143
@staticmethod
172144
def h_v_t(header, key):
@@ -208,23 +180,7 @@ def v_or_d(v, default=None):
208180
else return v
209181
"""
210182
return v if v is not None else default
211-
212-
@staticmethod
213-
def uncompress_response(header, response):
214-
"""
215-
Supported compress type [lz4, gzip, deflate]
216-
"""
217-
compress_type = Util.h_v_td(header, 'x-log-compresstype', '').lower()
218-
if compress_type == 'lz4':
219-
raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize'))
220-
if Util.is_lz4_available():
221-
return lz_decompress(raw_size, response)
222-
else:
223-
raise LogException("ClientHasNoLz4", "There's no Lz4 lib available to decompress the response", resp_header=header, resp_body=response)
224-
elif compress_type in ('gzip', 'deflate'):
225-
return zlib.decompress(response)
226-
else:
227-
raise LogException('UnknownCompressType', 'Unknown compress type: ' + compress_type, resp_header=header, resp_body=response)
183+
228184

229185
ZERO = timedelta(0)
230186

tests/compress_test.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import time
2+
from aliyun.log.compress import Compressor, CompressType
3+
from aliyun.log import LogClient, PutLogsRequest, LogItem, LogGroup
4+
5+
6+
def test_lz4():
    """Round-trip a small payload through LZ4 compression and back."""
    text = b'sadsadsa189634o2??ASBKHD'
    packed = Compressor.compress(text, CompressType.LZ4)
    # The original length has to be handed to decompress explicitly.
    uncompressed = Compressor.decompress(packed, len(text), CompressType.LZ4)

    assert text == uncompressed, "The decompressed data does not match the original"
14+
15+
16+
# setup: every setting is an empty string here -- presumably to be filled in
# with real values before running the service-backed tests below.
project = logstore = ''
access_key_id = access_key_secret = ''
endpoint = ''
client = LogClient(endpoint, access_key_id, access_key_secret)
23+
24+
25+
def test_pull_logs():
    """Pull from the first shard's 'begin' cursor with compress=None, True and False."""
    shard_infos = client.list_shards(project, logstore).get_shards_info()
    shard_id = shard_infos[0]['shardID']
    begin_cursor = client.get_cursor(
        project, logstore, shard_id, 'begin').get_cursor()

    # Only the compress=None response is printed.
    client.pull_logs(project, logstore, shard_id, begin_cursor,
                     compress=None).log_print()

    for flag in (True, False):
        client.pull_logs(project, logstore, shard_id, begin_cursor,
                         compress=flag)
33+
34+
35+
def test_put_log_raw():
    """Send one ten-field log group with compress set to True, False and None."""
    log_group = LogGroup()
    entry = log_group.Logs.add()
    entry.Time = int(time.time())

    for idx in range(10):
        label = 'test' + str(idx)
        field = entry.Contents.add()
        field.Key = client._get_unicode(label)
        field.Value = client._get_binary(label)

    for flag in (True, False, None):
        client.put_log_raw(project, logstore, log_group, flag)
46+
47+
48+
def test_put_logs():
    """Send the same single-item request with compress set to True, None and False."""
    for flag in (True, None, False):
        items = [LogItem(contents=[('ghello', 'test')])]
        request = PutLogsRequest(project, logstore, 'test',
                                 '', items, compress=flag)
        client.put_logs(request)
55+
56+
57+
def test_get_log():
    """Query the last 100 seconds with `_get_logs_v2_enabled` on, then off."""
    from_time = int(time.time()) - 100
    to_time = int(time.time())

    # Same time window for both runs; only the client flag differs.
    for v2_enabled in (True, False):
        client._get_logs_v2_enabled = v2_enabled
        resp = client.get_log(project, logstore, from_time, to_time, query='*')
        resp.log_print()

tests/logclient/test_get_logs.py

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ def setUp(self):
1818
self.endpoint, self.access_key_id, self.access_key_secret)
1919
self.to_time = int(time.time())
2020
self.from_time = self.to_time - 3600
21-
print('lz4 is available:', Util.is_lz4_available())
2221

2322
def tearDown(self):
2423
pass

0 commit comments

Comments
 (0)