Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async support #7

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 80 additions & 64 deletions recombee_api_client/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Union
from enum import Enum

import requests
import httpx
from hashlib import sha1
from urllib.parse import quote

Expand Down Expand Up @@ -36,41 +36,49 @@ class RecombeeClient:
:param region: region of the Recombee cluster where the database is located
"""
BATCH_MAX_SIZE = 10000
CLIENT_CLS = httpx.Client

def __init__(self, database_id: str, token: str, protocol: str = 'https', options: dict = None, region: Region = None):
self.database_id = database_id
self.token = token
self.protocol = protocol

self.base_uri = self.__get_base_uri(options=options or {}, region=region)
self.base_uri = self._get_base_uri(options=options or {}, region=region)
self.client = self.CLIENT_CLS()

def send(self, request: Request) -> Union[dict, str, list]:
"""
:param request: Request to be sent to Recombee recommender
"""

def _send(self, request: Request) -> Union[dict, str, list]:
if isinstance(request, Batch) and len(request.requests) > self.BATCH_MAX_SIZE:
return self.__send_multipart_batch(request)
return self._send_multipart_batch(request)

timeout = request.timeout / 1000
uri = self.__process_request_uri(request)
uri = self.__sign_url(uri)
uri = self._process_request_uri(request)
uri = self._sign_url(uri)
protocol = 'https' if request.ensure_https else self.protocol
uri = protocol + '://' + self.base_uri + uri

if request.method == 'put':
return self._put(request, uri, timeout)
elif request.method == 'get':
return self._get(request, uri, timeout)
elif request.method == 'post':
return self._post(request, uri, timeout)
elif request.method == 'delete':
return self._delete(request, uri, timeout)

def send(self, request: Request) -> Union[dict, str, list]:
"""
:param request: Request to be sent to Recombee recommender
"""
try:
if request.method == 'put':
return self.__put(request, uri, timeout)
elif request.method == 'get':
return self.__get(request, uri, timeout)
elif request.method == 'post':
return self.__post(request, uri, timeout)
elif request.method == 'delete':
return self.__delete(request, uri, timeout)
except requests.exceptions.Timeout:
response = self._send(request)
except httpx.TimeoutException:
raise ApiTimeoutException(request)

self._check_errors(response, request)
return response.json()

@staticmethod
def __get_regional_base_uri(region: Region) -> str:
def _get_regional_base_uri(region: Region) -> str:
uri = {
Region.AP_SE: 'rapi-ap-se.recombee.com',
Region.CA_EAST: 'rapi-ca-east.recombee.com',
Expand All @@ -83,106 +91,114 @@ def __get_regional_base_uri(region: Region) -> str:
return uri

@staticmethod
def __get_base_uri(options: dict, region: str) -> str:
def _get_base_uri(options: dict, region: str) -> str:
base_uri = os.environ.get('RAPI_URI') or options.get('base_uri')
if region is not None:
if base_uri:
raise ValueError('base_uri and region cannot be specified at the same time')
base_uri = RecombeeClient.__get_regional_base_uri(region)
base_uri = RecombeeClient._get_regional_base_uri(region)

return base_uri or 'rapi.recombee.com'

@staticmethod
def __get_http_headers(additional_headers: dict = None) -> dict:
def _get_http_headers(additional_headers: dict = None) -> dict:
headers = {'User-Agent': 'recombee-python-api-client/4.1.0'}
if additional_headers:
headers.update(additional_headers)
return headers

def __put(self, request: Request, uri: str, timeout: int):
response = requests.put(uri,
data=json.dumps(request.get_body_parameters()),
headers=self.__get_http_headers({'Content-Type': 'application/json'}),
timeout=timeout)
self.__check_errors(response, request)
return response.json()
def _put(self, request: Request, uri: str, timeout: int):
return self.client.put(uri,
data=json.dumps(request.get_body_parameters()),
headers=self._get_http_headers({'Content-Type': 'application/json'}),
timeout=timeout)

def __get(self, request: Request, uri: str, timeout: int):
response = requests.get(uri,
headers=self.__get_http_headers(),
timeout=timeout)
self.__check_errors(response, request)
return response.json()
def _get(self, request: Request, uri: str, timeout: int):
return self.client.get(uri,
headers=self._get_http_headers(),
timeout=timeout)

def __post(self, request: Request, uri: str, timeout: int):
response = requests.post(uri,
data=json.dumps(request.get_body_parameters()),
headers=self.__get_http_headers({'Content-Type': 'application/json'}),
timeout=timeout)
self.__check_errors(response, request)
return response.json()
def _post(self, request: Request, uri: str, timeout: int):
return self.client.post(uri,
data=json.dumps(request.get_body_parameters()),
headers=self._get_http_headers({'Content-Type': 'application/json'}),
timeout=timeout)

def __delete(self, request: Request, uri: str, timeout: int):
response = requests.delete(uri,
data=json.dumps(request.get_body_parameters()),
headers=self.__get_http_headers({'Content-Type': 'application/json'}),
timeout=timeout)
self.__check_errors(response, request)
return response.json()
def _delete(self, request: Request, uri: str, timeout: int):
return self.client.delete(uri,
data=json.dumps(request.get_body_parameters()),
headers=self._get_http_headers({'Content-Type': 'application/json'}),
timeout=timeout)

def __check_errors(self, response, request: Request):
def _check_errors(self, response, request: Request):
status_code = response.status_code
if status_code == 200 or status_code == 201:
return
raise ResponseException(request, status_code, response.text)

@staticmethod
def __get_list_chunks(l: list, n: int) -> list:
def _get_list_chunks(l: list, n: int) -> list:
"""Yield successive n-sized chunks from l."""

for i in range(0, len(l), n):
yield l[i:i + n]

def __send_multipart_batch(self, batch: Batch) -> list:
requests_parts = [rqs for rqs in self.__get_list_chunks(batch.requests, self.BATCH_MAX_SIZE)]
def _send_multipart_batch(self, batch: Batch) -> list:
requests_parts = [rqs for rqs in self._get_list_chunks(batch.requests, self.BATCH_MAX_SIZE)]
responses = [self.send(Batch(rqs)) for rqs in requests_parts]
return sum(responses, [])

def __process_request_uri(self, request: Request) -> str:
def _process_request_uri(self, request: Request) -> str:
uri = request.path
uri += self.__query_parameters_to_url(request)
uri += self._query_parameters_to_url(request)
return uri

def __query_parameters_to_url(self, request: Request) -> str:
def _query_parameters_to_url(self, request: Request) -> str:
ps = ''
query_params = request.get_query_parameters()
for name in query_params:
val = query_params[name]
ps += '&' if ps.find('?') != -1 else '?'
ps += "%s=%s" % (name, self.__format_query_parameter_value(val))
ps += "%s=%s" % (name, self._format_query_parameter_value(val))
return ps

@staticmethod
def __format_query_parameter_value(value) -> str:
def _format_query_parameter_value(value) -> str:
if isinstance(value, list):
return ','.join([quote(str(v)) for v in value])
return quote(str(value))

# Sign request with HMAC, request URI must be exactly the same
# We have 30s to complete request with this token
def __sign_url(self, req_part: str) -> str:
def _sign_url(self, req_part: str) -> str:
uri = '/' + self.database_id + req_part
time_part = self.__hmac_time(uri)
sign = self.__hmac_sign(uri, time_part)
time_part = self._hmac_time(uri)
sign = self._hmac_sign(uri, time_part)
res = uri + time_part + '&hmac_sign=' + sign
return res

def __hmac_time(self, uri: str) -> str:
def _hmac_time(self, uri: str) -> str:
res = '&' if uri.find('?') != -1 else '?'
res += "hmac_timestamp=%s" % int(time.time())
return res

def __hmac_sign(self, uri: str, time_part: str) -> str:
def _hmac_sign(self, uri: str, time_part: str) -> str:
url = uri + time_part
sign = hmac.new(str.encode(self.token), str.encode(url), sha1).hexdigest()
return sign


class AsyncRecombeeClient(RecombeeClient):
CLIENT_CLS = httpx.AsyncClient

async def send(self, request: Request) -> Union[dict, str, list]:
"""
:param request: Request to be sent to Recombee recommender
"""
try:
response = await self._send(request)
except httpx.TimeoutException:
raise ApiTimeoutException(request)

self._check_errors(response, request)
return response.json()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
packages=find_packages(exclude=['contrib', 'docs', 'tests']),


install_requires=['requests'],
install_requires=['httpx'],

python_requires='>=3.4',

Expand Down