Skip to content

Commit b030711

Browse files
authored
Merge pull request #166 from chaolee50/master
add ingestion
2 parents ce5adc7 + d30d9dd commit b030711

File tree

3 files changed

+366
-0
lines changed

3 files changed

+366
-0
lines changed

aliyun/log/ingestion_response.py

+169
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
#!/usr/bin/env python
2+
# encoding: utf-8
3+
4+
# Copyright (C) Alibaba Cloud Computing
5+
# All rights reserved.
6+
import json
7+
8+
from .logresponse import LogResponse
9+
10+
__all__ = ['ListIngestionResponse', 'CreateIngestionResponse', 'UpdateIngestionResponse', 'DeleteIngestionResponse', 'GetIngestionResponse',
11+
'StartIngestionResponse', 'StopIngestionResponse']
12+
13+
14+
class ListIngestionResponse(LogResponse):
15+
""" The response of the ListIngestion API from log.
16+
17+
:type resp: dict
18+
:param resp: ListIngestionResponse HTTP response body
19+
20+
:type header: dict
21+
:param header: ListIngestionResponse HTTP response header
22+
"""
23+
24+
def __init__(self, resp, header):
25+
LogResponse.__init__(self, header, resp)
26+
self.count = resp['count']
27+
self.total = resp['total']
28+
self.ingestions = resp.get('results', [])
29+
30+
def get_count(self):
31+
""" Get total count of ingestions from the response
32+
33+
:return: int, the number of total ingestions from the response
34+
"""
35+
return self.count
36+
37+
def get_total(self):
38+
return self.total
39+
40+
def get_ingestions(self):
41+
""" Get all the ingestions from the response
42+
43+
:return: list, all ingestions
44+
"""
45+
return self.ingestions
46+
47+
def log_print(self):
48+
print('ListIngestionResponse:')
49+
print('headers:', self.get_all_headers())
50+
print('count:', self.count)
51+
print('total:', self.total)
52+
print('ingestions:', self.ingestions)
53+
54+
55+
class CreateIngestionResponse(LogResponse):
56+
""" The response of the CreateIngestion API from log.
57+
58+
:type resp: dict
59+
:param resp: CreateIngestionResponse HTTP response body
60+
61+
:type header: dict
62+
:param header: CreateIngestionResponse HTTP response header
63+
"""
64+
65+
def __init__(self, header, resp=''):
66+
LogResponse.__init__(self, header, resp)
67+
68+
def log_print(self):
69+
print('CreateIngestionResponse:')
70+
print('headers:', self.get_all_headers())
71+
72+
73+
class UpdateIngestionResponse(LogResponse):
74+
""" The response of the UpdateIngestion API from log.
75+
76+
:type resp: dict
77+
:param resp: UpdateIngestionResponse HTTP response body
78+
79+
:type header: dict
80+
:param header: UpdateIngestionResponse HTTP response header
81+
"""
82+
83+
def __init__(self, header, resp=''):
84+
LogResponse.__init__(self, header, resp)
85+
86+
def log_print(self):
87+
print('UpdateIngestionResponse:')
88+
print('headers:', self.get_all_headers())
89+
90+
91+
class DeleteIngestionResponse(LogResponse):
92+
""" The response of the DeleteIngestion API from log.
93+
94+
:type resp: dict
95+
:param resp: DeleteIngestionResponse HTTP response body
96+
97+
:type header: dict
98+
:param header: DeleteIngestionResponse HTTP response header
99+
"""
100+
101+
def __init__(self, header, resp=''):
102+
LogResponse.__init__(self, header, resp)
103+
104+
def log_print(self):
105+
print('DeleteIngestionResponse:')
106+
print('headers:', self.get_all_headers())
107+
108+
109+
class GetIngestionResponse(LogResponse):
110+
""" The response of the GetIngestion API from log.
111+
112+
:type resp: dict
113+
:param resp: GetIngestionResponse HTTP response body
114+
115+
:type header: dict
116+
:param header: GetIngestionResponse HTTP response header
117+
"""
118+
119+
def __init__(self, resp, header):
120+
LogResponse.__init__(self, header, resp)
121+
self.ingestion_config = resp
122+
123+
def get_ingestion(self):
124+
""" Get ingestion from the response
125+
126+
:return: dict, ingestion
127+
"""
128+
return self.ingestion_config
129+
130+
def log_print(self):
131+
print('GetIngestionResponse:')
132+
print('headers:', self.get_all_headers())
133+
print('ingestion:', json.dumps(self.ingestion_config))
134+
135+
136+
class StartIngestionResponse(LogResponse):
137+
""" The response of the StartIngestion API from log.
138+
139+
:type resp: dict
140+
:param resp: StartIngestionResponse HTTP response body
141+
142+
:type header: dict
143+
:param header: StartIngestionResponse HTTP response header
144+
"""
145+
146+
def __init__(self, header, resp=''):
147+
LogResponse.__init__(self, header, resp)
148+
149+
def log_print(self):
150+
print('StartIngestionResponse:')
151+
print('headers:', self.get_all_headers())
152+
153+
154+
class StopIngestionResponse(LogResponse):
155+
""" The response of the StopIngestion API from log.
156+
157+
:type resp: dict
158+
:param resp: StopIngestionResponse HTTP response body
159+
160+
:type header: dict
161+
:param header: StopIngestionResponse HTTP response header
162+
"""
163+
164+
def __init__(self, header, resp=''):
165+
LogResponse.__init__(self, header, resp)
166+
167+
def log_print(self):
168+
print('StopIngestionResponse:')
169+
print('headers:', self.get_all_headers())

aliyun/log/logclient.py

+158
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from .getlogsresponse import GetLogsResponse
2424
from .getcontextlogsresponse import GetContextLogsResponse
2525
from .index_config_response import *
26+
from .ingestion_response import *
2627
from .listlogstoresresponse import ListLogstoresResponse
2728
from .listtopicsresponse import ListTopicsResponse
2829
from .logclient_core import make_lcrud_methods
@@ -2659,6 +2660,163 @@ def disable_alert(self, project_name, job_name):
26592660
(resp, header) = self._send("PUT", project_name, None, resource, params, headers)
26602661
return LogResponse(header)
26612662

2663+
def list_ingestion(self, project_name):
2664+
""" list ingestion
2665+
Unsuccessful opertaion will cause an LogException.
2666+
2667+
:type project_name: string
2668+
:param project_name: the Project name
2669+
2670+
:return: ListIngestionResponse
2671+
2672+
:raise: LogException
2673+
"""
2674+
2675+
headers = {}
2676+
params = {}
2677+
resource = '/jobs'
2678+
(resp, header) = self._send("GET", project_name, None, resource, params, headers)
2679+
return ListIngestionResponse(resp, header)
2680+
2681+
def create_ingestion(self, project_name, ingestion_config):
2682+
""" create ingestion config
2683+
Unsuccessful opertaion will cause an LogException.
2684+
2685+
:type project_name: string
2686+
:param project_name: the Project name
2687+
2688+
:type ingestion_config: string
2689+
:param ingestion_config: the ingestion config
2690+
2691+
:return: CreateIngestionResponse
2692+
2693+
:raise: LogException
2694+
"""
2695+
2696+
headers = {}
2697+
params = {}
2698+
resource = "/jobs"
2699+
headers['Content-Type'] = 'application/json'
2700+
body = six.b(ingestion_config)
2701+
headers['x-log-bodyrawsize'] = str(len(body))
2702+
2703+
(resp, header) = self._send("POST", project_name, body, resource, params, headers)
2704+
return CreateIngestionResponse(header, resp)
2705+
2706+
def update_ingestion(self, project_name, ingestion_name, ingestion_config):
2707+
""" update ingestion config
2708+
Unsuccessful opertaion will cause an LogException.
2709+
2710+
:type project_name: string
2711+
:param project_name: the Project name
2712+
2713+
:type ingestion_name: string
2714+
:param ingestion_name: the ingestion name
2715+
2716+
:type ingestion_config: string
2717+
:param ingestion_config: the ingestion config
2718+
2719+
:return: UpdateIngestionResponse
2720+
2721+
:raise: LogException
2722+
"""
2723+
2724+
headers = {}
2725+
params = {}
2726+
resource = "/jobs/" + ingestion_name
2727+
headers['Content-Type'] = 'application/json'
2728+
body = six.b(ingestion_config)
2729+
headers['x-log-bodyrawsize'] = str(len(body))
2730+
2731+
(resp, header) = self._send("PUT", project_name, body, resource, params, headers)
2732+
return UpdateIngestionResponse(header, resp)
2733+
2734+
def delete_ingestion(self, project_name, ingestion_name):
2735+
""" delete ingestion config
2736+
Unsuccessful opertaion will cause an LogException.
2737+
2738+
:type project_name: string
2739+
:param project_name: the Project name
2740+
2741+
:type ingestion_name: string
2742+
:param ingestion_name: the ingestion name
2743+
2744+
:return: DeleteIngestionResponse
2745+
2746+
:raise: LogException
2747+
"""
2748+
2749+
headers = {}
2750+
params = {}
2751+
resource = "/jobs/" + ingestion_name
2752+
(resp, header) = self._send("DELETE", project_name, None, resource, params, headers)
2753+
return DeleteIngestionResponse(header, resp)
2754+
2755+
def get_ingestion(self, project_name, ingestion_name):
2756+
""" get ingestion config detail
2757+
Unsuccessful opertaion will cause an LogException.
2758+
2759+
:type project_name: string
2760+
:param project_name: the Project name
2761+
2762+
:type ingestion_name: string
2763+
:param ingestion_name: the ingestion name
2764+
2765+
:return: GetIngestionResponse
2766+
2767+
:raise: LogException
2768+
"""
2769+
2770+
headers = {}
2771+
params = {}
2772+
resource = "/jobs/" + ingestion_name
2773+
(resp, header) = self._send("GET", project_name, None, resource, params, headers)
2774+
return GetIngestionResponse(resp, header)
2775+
2776+
def start_ingestion(self, project_name, ingestion_name):
2777+
""" start ingestion
2778+
Unsuccessful opertaion will cause an LogException.
2779+
2780+
:type project_name: string
2781+
:param project_name: the Project name
2782+
2783+
:type ingestion_name: string
2784+
:param ingestion_name: the ingestion name
2785+
2786+
:return: StartIngestionResponse
2787+
2788+
:raise: LogException
2789+
"""
2790+
2791+
headers = {}
2792+
params = {"action":"START"}
2793+
resource = "/jobs/" + ingestion_name
2794+
2795+
(resp, header) = self._send("PUT", project_name, None, resource, params, headers)
2796+
return StartIngestionResponse(header, resp)
2797+
2798+
def stop_ingestion(self, project_name, ingestion_name):
2799+
""" stop ingestion
2800+
Unsuccessful opertaion will cause an LogException.
2801+
2802+
:type project_name: string
2803+
:param project_name: the Project name
2804+
2805+
:type ingestion_name: string
2806+
:param ingestion_name: the ingestion name
2807+
2808+
:return: StopIngestionResponse
2809+
2810+
:raise: LogException
2811+
"""
2812+
2813+
headers = {}
2814+
params = {"action":"STOP"}
2815+
resource = "/jobs/" + ingestion_name
2816+
2817+
(resp, header) = self._send("PUT", project_name, None, resource, params, headers)
2818+
return StopIngestionResponse(header, resp)
2819+
26622820

26632821
make_lcrud_methods(LogClient, 'dashboard', name_field='dashboardName')
26642822
make_lcrud_methods(LogClient, 'alert', name_field='name', root_resource='/jobs', entities_key='results')

tests/sample_ingestion.py

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# encoding: utf-8
2+
import os
3+
from aliyun.log import *
4+
5+
6+
def main():
7+
endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', '')
8+
access_key_id = os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSID', '')
9+
access_key = os.environ.get('ALIYUN_LOG_SAMPLE_ACCESSKEY', '')
10+
project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', '')
11+
12+
client = LogClient(endpoint, access_key_id, access_key)
13+
14+
ingest_config = '{"schedule":{"delay":0,"runImmediately":true,"interval":"5m","type":"FixedRate"},"lastModifiedTime":1596087431,"recyclable":false,"configuration":{"source":{"bucket":"test-lichao","endpoint":"oss-cn-hangzhou-internal.aliyuncs.com","roleARN":"acs:ram::1049446484210612:role/aliyunlogimportossrole","prefix":" 2019/08/20/13/","compressionCodec":"snappy","restoreObjectEnabled":false,"pattern":"","format":{"skipInvalidRows":false,"timeField":"","type":"JSON"},"type":"AliyunOSS","encoding":"UTF-8"},"logstore":"233"},"createTime":1596087431,"displayName":"osstest","name":"ingest-1596087431-683090","description":"","state":"Enabled","type":"Ingestion","status":"SUCCESSED"}'
15+
res = client.create_ingestion(project, ingest_config)
16+
print res.log_print()
17+
18+
res = client.list_ingestion(project)
19+
print res.get_ingestions()
20+
21+
res = client.get_ingestion(project, "ingest-1596087431-683089")
22+
print res.log_print()
23+
24+
res = client.stop_ingestion(project, "ingest-1596087431-683089")
25+
print res.log_print()
26+
27+
res = client.start_ingestion(project, "ingest-1596087431-683089")
28+
print res.log_print()
29+
30+
ingest_config = '{"schedule":{"delay":0,"runImmediately":true,"interval":"5m","type":"FixedRate"},"lastModifiedTime":1596087431,"recyclable":false,"configuration":{"source":{"bucket":"test-lichao","endpoint":"oss-cn-hangzhou-internal.aliyuncs.com","roleARN":"acs:ram::1049446484210612:role/aliyunlogimportossrole","prefix":" 2019/08/20/13/","compressionCodec":"snappy","restoreObjectEnabled":false,"pattern":"","format":{"skipInvalidRows":false,"timeField":"","type":"JSON"},"type":"AliyunOSS","encoding":"UTF-8"},"logstore":"666"},"createTime":1596087431,"displayName":"osstest","name":"ingest-1596087431-683089","description":"","state":"Enabled","type":"Ingestion","status":"SUCCESSED"}'
31+
res = client.update_ingestion(project, "ingest-1596087431-683089", ingest_config)
32+
print res.log_print()
33+
34+
res = client.delete_ingestion(project, "ingest-1596087431-683089")
35+
print res.log_print()
36+
37+
38+
if __name__ == '__main__':
39+
main()

0 commit comments

Comments
 (0)