From f3a0b9b49518d414c5f4444dc17c2d94467bf5f7 Mon Sep 17 00:00:00 2001 From: yuansl <yuanshenglong@qiniu.com> Date: Mon, 9 Dec 2024 11:31:37 +0800 Subject: [PATCH 01/12] =?UTF-8?q?DP-5290=20python-sdk/cdn:=20=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E5=9F=9F=E5=90=8D=E5=B8=A6=E5=AE=BD=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81type=E5=8F=82=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - cdn/manager: add DataType class(enum), add optional parameter 'data_type' to `CdnManager.get_bandwidth_data` & `CdnManager.get_flux_data` - see https://jira.qiniu.io/browse/DP-5290 --- examples/cdn_bandwidth.py | 8 +++++++- qiniu/__init__.py | 2 +- qiniu/services/cdn/manager.py | 20 +++++++++++++++++--- 3 files changed, 25 insertions(+), 5 deletions(-) diff --git a/examples/cdn_bandwidth.py b/examples/cdn_bandwidth.py index ad4e97a8..32fbb40b 100644 --- a/examples/cdn_bandwidth.py +++ b/examples/cdn_bandwidth.py @@ -5,7 +5,7 @@ 查询指定域名指定时间段内的带宽 """ import qiniu -from qiniu import CdnManager +from qiniu import CdnManager, DataType # 账户ak,sk @@ -31,3 +31,9 @@ print(ret) print(info) + +ret, info = cdn_manager.get_bandwidth_data( + urls, startDate, endDate, granularity, data_type=DataType.BANDWIDTH) + +print(ret) +print(info) diff --git a/qiniu/__init__.py b/qiniu/__init__.py index 1ae68c00..a18aac2c 100644 --- a/qiniu/__init__.py +++ b/qiniu/__init__.py @@ -21,7 +21,7 @@ build_batch_stat, build_batch_delete, build_batch_restoreAr, build_batch_restore_ar from .services.storage.uploader import put_data, put_file, put_stream from .services.storage.upload_progress_recorder import UploadProgressRecorder -from .services.cdn.manager import CdnManager, create_timestamp_anti_leech_url, DomainManager +from .services.cdn.manager import CdnManager, DataType, create_timestamp_anti_leech_url, DomainManager from .services.processing.pfop import PersistentFop from .services.processing.cmd import build_op, pipe_cmd, op_save from .services.compute.app import AccountClient diff --git a/qiniu/services/cdn/manager.py b/qiniu/services/cdn/manager.py index bf107926..8ba944e5 100644 --- a/qiniu/services/cdn/manager.py +++ b/qiniu/services/cdn/manager.py @@ -5,9 +5,17 @@ from qiniu.compat import is_py2 from qiniu.compat import is_py3 +from enum import Enum import hashlib +class DataType(Enum): + BANDWIDTH = 'bandwidth' + X302BANDWIDTH = '302bandwidth' + X302MBANDWIDTH = '302mbandwidth' + FLOW = 'flow' + X302FLOW = '302flow' + X302MFLOW = '302mflow' def urlencode(str): if is_py2: @@ -60,7 +68,7 @@ def refresh_urls_and_dirs(self, urls, dirs): Returns: 一个dict变量和一个ResponseInfo对象 参考代码 examples/cdn_manager.py - """ + """ req = {} if urls is not None and len(urls) > 0: req.update({"urls": urls}) @@ -89,7 +97,7 @@ def prefetch_urls(self, urls): url = '{0}/v2/tune/prefetch'.format(self.server) return self.__post(url, body) - def get_bandwidth_data(self, domains, start_date, end_date, granularity): + def get_bandwidth_data(self, domains, start_date, end_date, granularity, data_type=None): """ 查询带宽数据,文档 https://developer.qiniu.com/fusion/api/traffic-bandwidth @@ -98,6 +106,7 @@ def get_bandwidth_data(self, domains, start_date, end_date, granularity): start_date: 起始日期 end_date: 结束日期 granularity: 数据间隔 + data_type: 计量数据类型, see class DataType.XXXBANDWIDTH Returns: 一个dict变量和一个ResponseInfo对象 @@ -108,12 +117,14 @@ def get_bandwidth_data(self, domains, start_date, end_date, granularity): req.update({"startDate": start_date}) req.update({"endDate": end_date}) req.update({"granularity": granularity}) + if data_type is not None: + req.update({'type': data_type.value}) # should be one of 'bandwidth', '302bandwidth', '302mbandwidth' body = json.dumps(req) url = '{0}/v2/tune/bandwidth'.format(self.server) return self.__post(url, body) - def get_flux_data(self, domains, start_date, end_date, granularity): + def get_flux_data(self, domains, start_date, end_date, granularity, data_type=None): """ 查询流量数据,文档 https://developer.qiniu.com/fusion/api/traffic-bandwidth @@ -122,6 +133,7 @@ def get_flux_data(self, domains, start_date, end_date, granularity): start_date: 起始日期 end_date: 结束日期 granularity: 数据间隔 + data_type: 计量数据类型, see class DataType.XXXFLOW Returns: 一个dict变量和一个ResponseInfo对象 @@ -132,6 +144,8 @@ def get_flux_data(self, domains, start_date, end_date, granularity): req.update({"startDate": start_date}) req.update({"endDate": end_date}) req.update({"granularity": granularity}) + if data_type is not None: + req.update({'type': data_type.value}) # should be one of 'flow', '302flow', '302mflow' body = json.dumps(req) url = '{0}/v2/tune/flux'.format(self.server) From 1ca9520e869e1f283411787e1b5fd177369bd779 Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Tue, 10 Dec 2024 16:35:00 +0800 Subject: [PATCH 02/12] fix: improve file cache of regions in concurrent scenarios --- qiniu/compat.py | 7 + qiniu/http/regions_provider.py | 215 +++++++++++++----- qiniu/http/response.py | 1 + .../cases/test_http/test_regions_provider.py | 41 +++- 4 files changed, 210 insertions(+), 54 deletions(-) diff --git a/qiniu/compat.py b/qiniu/compat.py index 6a418c99..079aef70 100644 --- a/qiniu/compat.py +++ b/qiniu/compat.py @@ -14,6 +14,13 @@ # because of u'...' Unicode literals. import json # noqa +# ------- +# Platform +# ------- + +is_windows = sys.platform == 'win32' +is_linux = sys.platform == 'linux' +is_macos = sys.platform == 'darwin' # ------- # Pythons diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index 13d1800a..aa22568c 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -6,8 +6,9 @@ import tempfile import os import shutil +import threading -from qiniu.compat import json, b as to_bytes +from qiniu.compat import json, b as to_bytes, is_windows, is_linux, is_macos from qiniu.utils import io_md5, dt2ts from .endpoint import Endpoint @@ -24,7 +25,7 @@ def __iter__(self): """ Returns ------- - list[Region] + Generator[Region, None, None] """ @@ -137,27 +138,112 @@ def __init__(self, message): super(FileAlreadyLocked, self).__init__(message) -class _FileLocker: - def __init__(self, origin_file): - self._origin_file = origin_file +_file_threading_lockers_lock = threading.Lock() +_file_threading_lockers = {} + + +class _FileThreadingLocker: + def __init__(self, fd): + self._fd = fd def __enter__(self): - if os.access(self.lock_file_path, os.R_OK | os.W_OK): - raise FileAlreadyLocked('File {0} already locked'.format(self._origin_file)) - with open(self.lock_file_path, 'w'): - pass + with _file_threading_lockers_lock: + global _file_threading_lockers + threading_lock = _file_threading_lockers.get(self._file_path, threading.Lock()) + # Could use keyword style `acquire(blocking=False)` when min version of python update to >= 3 + if not threading_lock.acquire(False): + raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) + _file_threading_lockers[self._file_path] = threading_lock def __exit__(self, exc_type, exc_val, exc_tb): - os.remove(self.lock_file_path) + with _file_threading_lockers_lock: + global _file_threading_lockers + threading_lock = _file_threading_lockers.get(self._file_path) + if threading_lock and threading_lock.locked(): + threading_lock.release() + del _file_threading_lockers[self._file_path] @property - def lock_file_path(self): - """ - Returns - ------- - str - """ - return self._origin_file + '.lock' + def _file_path(self): + return self._fd.name + + +if is_linux or is_macos: + import fcntl + + # Use subclass of _FileThreadingLocker when min version of python update to >= 3 + class _FileLocker: + def __init__(self, fd): + self._fd = fd + + def __enter__(self): + try: + fcntl.lockf(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except IOError: + # Use `raise ... from ...` when min version of python update to >= 3 + raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) + + def __exit__(self, exc_type, exc_val, exc_tb): + fcntl.lockf(self._fd, fcntl.LOCK_UN) + + @property + def _file_path(self): + return self._fd.name + +elif is_windows: + import msvcrt + + + class _FileLocker: + def __init__(self, fd): + self._fd = fd + + def __enter__(self): + try: + # TODO(lihs): set `_nbyte` bigger? + msvcrt.locking(self._fd, msvcrt.LK_LOCK | msvcrt.LK_NBLCK, 1) + except OSError: + raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) + + def __exit__(self, exc_type, exc_val, exc_tb): + msvcrt.locking(self._fd, msvcrt.LK_UNLCK, 1) + + @property + def _file_path(self): + return self._fd.name + +else: + class _FileLocker: + def __init__(self, fd): + self._fd = fd + + def __enter__(self): + try: + # Atomic file creation + open_flags = os.O_EXCL | os.O_RDWR | os.O_CREAT + fd = os.open(self.lock_file_path, open_flags) + os.close(fd) + except FileExistsError: + raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + os.remove(self.lock_file_path) + except FileNotFoundError: + pass + + @property + def _file_path(self): + return self._fd.name + + @property + def lock_file_path(self): + """ + Returns + ------- + str + """ + return self._file_path + '.lock' # use dataclass instead namedtuple if min version of python update to 3.7 @@ -168,7 +254,8 @@ def lock_file_path(self): 'persist_path', 'last_shrink_at', 'shrink_interval', - 'should_shrink_expired_regions' + 'should_shrink_expired_regions', + 'memo_cache_lock' ] ) @@ -177,11 +264,12 @@ def lock_file_path(self): memo_cache={}, persist_path=os.path.join( tempfile.gettempdir(), - 'qn-regions-cache.jsonl' + 'qn-py-sdk-regions-cache.jsonl' ), last_shrink_at=datetime.datetime.fromtimestamp(0), - shrink_interval=datetime.timedelta(-1), # useless for now - should_shrink_expired_regions=False + shrink_interval=datetime.timedelta(days=1), + should_shrink_expired_regions=False, + memo_cache_lock=threading.Lock() ) @@ -323,7 +411,7 @@ def _parse_persisted_regions(persisted_data): return parsed_data.get('cacheKey'), regions -def _walk_persist_cache_file(persist_path, ignore_parse_error=False): +def _walk_persist_cache_file(persist_path, ignore_parse_error=True): """ Parameters ---------- @@ -394,23 +482,24 @@ def __init__( self.base_regions_provider = base_regions_provider persist_path = kwargs.get('persist_path', None) + last_shrink_at = datetime.datetime.fromtimestamp(0) if persist_path is None: persist_path = _global_cache_scope.persist_path + last_shrink_at = _global_cache_scope.last_shrink_at shrink_interval = kwargs.get('shrink_interval', None) if shrink_interval is None: - shrink_interval = datetime.timedelta(days=1) + shrink_interval = _global_cache_scope.shrink_interval should_shrink_expired_regions = kwargs.get('should_shrink_expired_regions', None) if should_shrink_expired_regions is None: - should_shrink_expired_regions = False + should_shrink_expired_regions = _global_cache_scope.should_shrink_expired_regions - self._cache_scope = CacheScope( - memo_cache=_global_cache_scope.memo_cache, + self._cache_scope = _global_cache_scope._replace( persist_path=persist_path, - last_shrink_at=datetime.datetime.fromtimestamp(0), + last_shrink_at=last_shrink_at, shrink_interval=shrink_interval, - should_shrink_expired_regions=should_shrink_expired_regions, + should_shrink_expired_regions=should_shrink_expired_regions ) def __iter__(self): @@ -423,7 +512,7 @@ def __iter__(self): self.__get_regions_from_base_provider ] - regions = None + regions = [] for get_regions in get_regions_fns: regions = get_regions(fallback=regions) if regions and all(r.is_live for r in regions): @@ -439,7 +528,8 @@ def set_regions(self, regions): ---------- regions: list[Region] """ - self._cache_scope.memo_cache[self.cache_key] = regions + with self._cache_scope.memo_cache_lock: + self._cache_scope.memo_cache[self.cache_key] = regions if not self._cache_scope.persist_path: return @@ -469,8 +559,11 @@ def persist_path(self, value): ---------- value: str """ + if value == self._cache_scope.persist_path: + return self._cache_scope = self._cache_scope._replace( - persist_path=value + persist_path=value, + last_shrink_at=datetime.datetime.fromtimestamp(0) ) @property @@ -586,7 +679,6 @@ def __get_regions_from_base_provider(self, fallback=None): def __flush_file_cache_to_memo(self): for cache_key, regions in _walk_persist_cache_file( persist_path=self._cache_scope.persist_path - # ignore_parse_error=True ): if cache_key not in self._cache_scope.memo_cache: self._cache_scope.memo_cache[cache_key] = regions @@ -609,12 +701,18 @@ def __should_shrink(self): def __shrink_cache(self): # shrink memory cache if self._cache_scope.should_shrink_expired_regions: - kept_memo_cache = {} - for k, regions in self._cache_scope.memo_cache.items(): - live_regions = [r for r in regions if r.is_live] - if live_regions: - kept_memo_cache[k] = live_regions - self._cache_scope = self._cache_scope._replace(memo_cache=kept_memo_cache) + memo_cache_old = self._cache_scope.memo_cache.copy() + # Could use keyword style `acquire(blocking=False)` when min version of python update to >= 3 + if self._cache_scope.memo_cache_lock.acquire(False): + try: + for k, regions in memo_cache_old.items(): + live_regions = [r for r in regions if r.is_live] + if live_regions: + self._cache_scope.memo_cache[k] = live_regions + else: + del self._cache_scope.memo_cache[k] + finally: + self._cache_scope.memo_cache_lock.release() # shrink file cache if not self._cache_scope.persist_path: @@ -625,7 +723,7 @@ def __shrink_cache(self): shrink_file_path = self._cache_scope.persist_path + '.shrink' try: - with _FileLocker(shrink_file_path): + with open(shrink_file_path, 'a') as f, _FileThreadingLocker(f), _FileLocker(f): # filter data shrunk_cache = {} for cache_key, regions in _walk_persist_cache_file( @@ -646,25 +744,36 @@ def __shrink_cache(self): ) # write data - with open(shrink_file_path, 'a') as f: - for cache_key, regions in shrunk_cache.items(): - f.write( - json.dumps( - { - 'cacheKey': cache_key, - 'regions': [_persist_region(r) for r in regions] - } - ) + os.linesep - ) + for cache_key, regions in shrunk_cache.items(): + f.write( + json.dumps( + { + 'cacheKey': cache_key, + 'regions': [_persist_region(r) for r in regions] + } + ) + os.linesep + ) + + # make the cache file available for all users + if is_linux or is_macos: + os.chmod(shrink_file_path, 0o666) # rename file shutil.move(shrink_file_path, self._cache_scope.persist_path) + + # update last shrink time + self._cache_scope = self._cache_scope._replace( + last_shrink_at=datetime.datetime.now() + ) + global _global_cache_scope + if _global_cache_scope.persist_path == self._cache_scope.persist_path: + _global_cache_scope = _global_cache_scope._replace( + last_shrink_at=self._cache_scope.last_shrink_at + ) + except FileAlreadyLocked: + # skip file shrink by another running pass - finally: - self._cache_scope = self._cache_scope._replace( - last_shrink_at=datetime.datetime.now() - ) def get_default_regions_provider( diff --git a/qiniu/http/response.py b/qiniu/http/response.py index 6450438d..cbfcf034 100644 --- a/qiniu/http/response.py +++ b/qiniu/http/response.py @@ -57,6 +57,7 @@ def need_retry(self): ]): return False # https://developer.qiniu.com/fusion/kb/1352/the-http-request-return-a-status-code + # https://developer.qiniu.com/kodo/3928/error-responses if self.status_code in [ 501, 509, 573, 579, 608, 612, 614, 616, 618, 630, 631, 632, 640, 701 ]: diff --git a/tests/cases/test_http/test_regions_provider.py b/tests/cases/test_http/test_regions_provider.py index 7289f5ca..ba84faec 100644 --- a/tests/cases/test_http/test_regions_provider.py +++ b/tests/cases/test_http/test_regions_provider.py @@ -1,7 +1,9 @@ import os import datetime import tempfile +import time import json +from multiprocessing.pool import ThreadPool import pytest @@ -9,7 +11,15 @@ from qiniu.config import QUERY_REGION_HOST, QUERY_REGION_BACKUP_HOSTS from qiniu.http.endpoint import Endpoint from qiniu.http.region import Region -from qiniu.http.regions_provider import QueryRegionsProvider, CachedRegionsProvider, _global_cache_scope, _persist_region +from qiniu.http.regions_provider import ( + CachedRegionsProvider, + FileAlreadyLocked, + QueryRegionsProvider, + _FileThreadingLocker, + _FileLocker, + _global_cache_scope, + _persist_region, +) @pytest.fixture(scope='session') @@ -32,6 +42,16 @@ def query_regions_provider(access_key, bucket_name, query_regions_endpoints_prov yield query_regions_provider +@pytest.fixture(scope='function') +def temp_file_path(rand_string): + p = os.path.join(tempfile.gettempdir(), rand_string(16)) + yield p + try: + os.remove(p) + except FileNotFoundError: + pass + + class TestQueryRegionsProvider: def test_getter(self, query_regions_provider): ret = list(query_regions_provider) @@ -267,3 +287,22 @@ def test_shrink_with_ignore_expired_regions(self, cached_regions_provider): cached_regions_provider.cache_key = 'another-cache-key' list(cached_regions_provider) # trigger __shrink_cache() assert len(cached_regions_provider._cache_scope.memo_cache[origin_cache_key]) > 0 + + def test_file_locker(self, temp_file_path): + handled_cnt = 0 + skipped_cnt = 0 + + + def process_file(_n): + nonlocal handled_cnt, skipped_cnt + try: + with open(temp_file_path, 'w') as f, _FileThreadingLocker(f), _FileLocker(f): + time.sleep(1) + handled_cnt += 1 + except FileAlreadyLocked: + skipped_cnt += 1 + + + ThreadPool(4).map(process_file, range(20)) + assert handled_cnt + skipped_cnt == 20 + assert 0 < handled_cnt <= 4 From 642c71397a29ed7ccf107ee013ba6e0420628c0a Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Tue, 10 Dec 2024 17:10:20 +0800 Subject: [PATCH 03/12] feat: add single flight and apply to QueryRegionsProvider --- qiniu/http/regions_provider.py | 14 ++++- qiniu/http/single_flight.py | 50 +++++++++++++++++ tests/cases/test_http/test_single_flight.py | 59 +++++++++++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 qiniu/http/single_flight.py create mode 100644 tests/cases/test_http/test_single_flight.py diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index aa22568c..a521a471 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -15,6 +15,7 @@ from .region import Region, ServiceName from .default_client import qn_http_client from .middleware import RetryDomainsMiddleware +from .single_flight import SingleFlight class RegionsProvider: @@ -70,6 +71,9 @@ def _get_region_from_query(data, **kwargs): ) +_query_regions_single_flight = SingleFlight() + + class QueryRegionsProvider(RegionsProvider): def __init__( self, @@ -95,7 +99,15 @@ def __init__( self.max_retry_times_per_endpoint = max_retry_times_per_endpoint def __iter__(self): - regions = self.__fetch_regions() + endpoints_md5 = io_md5([ + to_bytes(e.host) for e in self.endpoints_provider + ]) + flight_key = ':'.join([ + endpoints_md5, + self.access_key, + self.bucket_name + ]) + regions = _query_regions_single_flight.do(flight_key, self.__fetch_regions) # change to `yield from` when min version of python update to >= 3.3 for r in regions: yield r diff --git a/qiniu/http/single_flight.py b/qiniu/http/single_flight.py new file mode 100644 index 00000000..28536de0 --- /dev/null +++ b/qiniu/http/single_flight.py @@ -0,0 +1,50 @@ +import threading + + +class _FlightLock: + """ + Do not use dataclass which caused the event created only once + """ + def __init__(self): + self.event = threading.Event() + self.result = None + self.error = None + + +class SingleFlight: + def __init__(self): + self._locks = {} + self._lock = threading.Lock() + + def do(self, key, fn, *args, **kwargs): + # here does not use `with` statement + # because need to wait by another object if it exists, + # and reduce the `acquire` times if it not exists + self._lock.acquire() + if key in self._locks: + flight_lock = self._locks[key] + + self._lock.release() + flight_lock.event.wait() + + if flight_lock.error: + raise flight_lock.error + return flight_lock.result + + flight_lock = _FlightLock() + self._locks[key] = flight_lock + self._lock.release() + + try: + flight_lock.result = fn(*args, **kwargs) + except Exception as e: + flight_lock.error = e + finally: + flight_lock.event.set() + + with self._lock: + del self._locks[key] + + if flight_lock.error: + raise flight_lock.error + return flight_lock.result diff --git a/tests/cases/test_http/test_single_flight.py b/tests/cases/test_http/test_single_flight.py new file mode 100644 index 00000000..48748ecd --- /dev/null +++ b/tests/cases/test_http/test_single_flight.py @@ -0,0 +1,59 @@ +import pytest +import time +from multiprocessing.pool import ThreadPool + +from qiniu.http.single_flight import SingleFlight + +class TestSingleFlight: + def test_single_flight_success(self): + sf = SingleFlight() + + def fn(): + return "result" + + result = sf.do("key1", fn) + assert result == "result" + + def test_single_flight_exception(self): + sf = SingleFlight() + + def fn(): + raise ValueError("error") + + with pytest.raises(ValueError, match="error"): + sf.do("key2", fn) + + def test_single_flight_concurrent(self): + sf = SingleFlight() + share_state = [] + results = [] + + def fn(): + time.sleep(1) + share_state.append('share_state') + return "result" + + def worker(_n): + result = sf.do("key3", fn) + results.append(result) + + ThreadPool(2).map(worker, range(5)) + + assert len(share_state) == 3 + assert all(result == "result" for result in results) + + def test_single_flight_different_keys(self): + sf = SingleFlight() + results = [] + + def fn(): + time.sleep(1) + return "result" + + def worker(n): + result = sf.do("key{}".format(n), fn) + results.append(result) + + ThreadPool(2).map(worker, range(2)) + assert len(results) == 2 + assert all(result == "result" for result in results) From ae5773041eed273370a735cd55430b28c5caf885 Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Wed, 11 Dec 2024 14:19:10 +0800 Subject: [PATCH 04/12] test: fix test cases --- qiniu/http/regions_provider.py | 3 ++- tests/cases/test_http/test_regions_provider.py | 8 +++++++- tests/cases/test_zone/test_qiniu_conf.py | 10 +++++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index a521a471..de7f1350 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -524,7 +524,8 @@ def __iter__(self): self.__get_regions_from_base_provider ] - regions = [] + # set the fallback to None for raise errors when failed + regions = None for get_regions in get_regions_fns: regions = get_regions(fallback=regions) if regions and all(r.is_live for r in regions): diff --git a/tests/cases/test_http/test_regions_provider.py b/tests/cases/test_http/test_regions_provider.py index ba84faec..fb13c17d 100644 --- a/tests/cases/test_http/test_regions_provider.py +++ b/tests/cases/test_http/test_regions_provider.py @@ -275,7 +275,13 @@ def test_shrink_with_expired_regions(self, cached_regions_provider): origin_cache_key = cached_regions_provider.cache_key cached_regions_provider.set_regions([expired_region]) cached_regions_provider.cache_key = 'another-cache-key' - list(cached_regions_provider) # trigger __shrink_cache() + + # trigger __shrink_cache() + cached_regions_provider._cache_scope = cached_regions_provider._cache_scope._replace( + last_shrink_at=datetime.datetime.fromtimestamp(0) + ) + list(cached_regions_provider) + assert len(cached_regions_provider._cache_scope.memo_cache[origin_cache_key]) == 0 def test_shrink_with_ignore_expired_regions(self, cached_regions_provider): diff --git a/tests/cases/test_zone/test_qiniu_conf.py b/tests/cases/test_zone/test_qiniu_conf.py index c6bce5b8..0c05dfaf 100644 --- a/tests/cases/test_zone/test_qiniu_conf.py +++ b/tests/cases/test_zone/test_qiniu_conf.py @@ -51,7 +51,7 @@ def test_config_compatible(self, set_conf_default): 'set_conf_default', [ { - 'default_query_region_host': 'https://fake-uc.phpsdk.qiniu.com' + 'default_query_region_host': 'https://fake-uc.pysdk.qiniu.com' } ], indirect=True @@ -66,9 +66,9 @@ def test_query_region_with_custom_domain(self, access_key, bucket_name, set_conf 'set_conf_default', [ { - 'default_query_region_host': 'https://fake-uc.phpsdk.qiniu.com', + 'default_query_region_host': 'https://fake-uc.pysdk.qiniu.com', 'default_query_region_backup_hosts': [ - 'unavailable-uc.phpsdk.qiniu.com', + 'unavailable-uc.pysdk.qiniu.com', 'uc.qbox.me' ] } @@ -78,13 +78,13 @@ def test_query_region_with_custom_domain(self, access_key, bucket_name, set_conf def test_query_region_with_backup_domains(self, access_key, bucket_name, set_conf_default): zone = Zone() data = zone.bucket_hosts(access_key, bucket_name) - assert data != 'null' + assert data != 'null' and len(data) > 0 @pytest.mark.parametrize( 'set_conf_default', [ { - 'default_uc_host': 'https://fake-uc.phpsdk.qiniu.com', + 'default_uc_host': 'https://fake-uc.pysdk.qiniu.com', 'default_query_region_backup_hosts': [ 'unavailable-uc.phpsdk.qiniu.com', 'uc.qbox.me' From cd942fc67d96b9f9673c9ee885c09f0093e19a01 Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Wed, 11 Dec 2024 14:46:10 +0800 Subject: [PATCH 05/12] fix: python2 compatible and flake8 style --- qiniu/http/regions_provider.py | 5 ++--- tests/cases/test_http/test_regions_provider.py | 15 +++++++-------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index de7f1350..57a34e44 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -205,7 +205,6 @@ def _file_path(self): elif is_windows: import msvcrt - class _FileLocker: def __init__(self, fd): self._fd = fd @@ -235,13 +234,13 @@ def __enter__(self): open_flags = os.O_EXCL | os.O_RDWR | os.O_CREAT fd = os.open(self.lock_file_path, open_flags) os.close(fd) - except FileExistsError: + except IOError: raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) def __exit__(self, exc_type, exc_val, exc_tb): try: os.remove(self.lock_file_path) - except FileNotFoundError: + except IOError: pass @property diff --git a/tests/cases/test_http/test_regions_provider.py b/tests/cases/test_http/test_regions_provider.py index fb13c17d..73dca89d 100644 --- a/tests/cases/test_http/test_regions_provider.py +++ b/tests/cases/test_http/test_regions_provider.py @@ -294,21 +294,20 @@ def test_shrink_with_ignore_expired_regions(self, cached_regions_provider): list(cached_regions_provider) # trigger __shrink_cache() assert len(cached_regions_provider._cache_scope.memo_cache[origin_cache_key]) > 0 - def test_file_locker(self, temp_file_path): - handled_cnt = 0 - skipped_cnt = 0 + def test_file_locker(self, temp_file_path, use_ref): + handled_cnt = use_ref(0) + skipped_cnt = use_ref(0) def process_file(_n): - nonlocal handled_cnt, skipped_cnt try: with open(temp_file_path, 'w') as f, _FileThreadingLocker(f), _FileLocker(f): time.sleep(1) - handled_cnt += 1 + handled_cnt.value += 1 except FileAlreadyLocked: - skipped_cnt += 1 + skipped_cnt.value += 1 ThreadPool(4).map(process_file, range(20)) - assert handled_cnt + skipped_cnt == 20 - assert 0 < handled_cnt <= 4 + assert handled_cnt.value + skipped_cnt.value == 20 + assert 0 < handled_cnt.value <= 4 From b8e29a779226d020b08084a95cffb419a62ce6e8 Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Wed, 11 Dec 2024 14:49:13 +0800 Subject: [PATCH 06/12] docs: update version and changelogs --- CHANGELOG.md | 4 ++++ qiniu/__init__.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5722b56b..40f8fc7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ # Changelog +## 7.16.0 +* 对象存储,优化并发场景的区域查询 +* CDN,查询域名带宽,支持 `data_type` 参数 + ## 7.15.0 * 对象存储,持久化处理支持工作流模版 * 对象存储,修复 Windows 平台兼容性问题 diff --git a/qiniu/__init__.py b/qiniu/__init__.py index a18aac2c..d097fdb8 100644 --- a/qiniu/__init__.py +++ b/qiniu/__init__.py @@ -9,7 +9,7 @@ # flake8: noqa -__version__ = '7.15.0' +__version__ = '7.16.0' from .auth import Auth, QiniuMacAuth From e33c0e19e23ba9a54ffff5d6f10e26df9e508b8e Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Wed, 11 Dec 2024 14:53:05 +0800 Subject: [PATCH 07/12] styles: fix flake8 styles --- qiniu/services/cdn/manager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/qiniu/services/cdn/manager.py b/qiniu/services/cdn/manager.py index 8ba944e5..6700ecaf 100644 --- a/qiniu/services/cdn/manager.py +++ b/qiniu/services/cdn/manager.py @@ -9,6 +9,7 @@ import hashlib + class DataType(Enum): BANDWIDTH = 'bandwidth' X302BANDWIDTH = '302bandwidth' @@ -17,6 +18,7 @@ class DataType(Enum): X302FLOW = '302flow' X302MFLOW = '302mflow' + def urlencode(str): if is_py2: import urllib2 @@ -118,7 +120,7 @@ def get_bandwidth_data(self, domains, start_date, end_date, granularity, data_ty req.update({"endDate": end_date}) req.update({"granularity": granularity}) if data_type is not None: - req.update({'type': data_type.value}) # should be one of 'bandwidth', '302bandwidth', '302mbandwidth' + req.update({'type': data_type.value}) # should be one of 'bandwidth', '302bandwidth', '302mbandwidth' body = json.dumps(req) url = '{0}/v2/tune/bandwidth'.format(self.server) @@ -145,7 +147,7 @@ def get_flux_data(self, domains, start_date, end_date, granularity, data_type=No req.update({"endDate": end_date}) req.update({"granularity": granularity}) if data_type is not None: - req.update({'type': data_type.value}) # should be one of 'flow', '302flow', '302mflow' + req.update({'type': data_type.value}) # should be one of 'flow', '302flow', '302mflow' body = json.dumps(req) url = '{0}/v2/tune/flux'.format(self.server) From b6fd793836a5f3f7c9f61d96c84bce4fbcc6eb14 Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Thu, 12 Dec 2024 11:14:14 +0800 Subject: [PATCH 08/12] fix: file lock on windows --- qiniu/http/regions_provider.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index 57a34e44..cbcb7228 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -211,13 +211,12 @@ def __init__(self, fd): def __enter__(self): try: - # TODO(lihs): set `_nbyte` bigger? - msvcrt.locking(self._fd, msvcrt.LK_LOCK | msvcrt.LK_NBLCK, 1) + msvcrt.locking(self._fd.fileno(), msvcrt.LK_LOCK | msvcrt.LK_NBLCK, 1) except OSError: raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) def __exit__(self, exc_type, exc_val, exc_tb): - msvcrt.locking(self._fd, msvcrt.LK_UNLCK, 1) + msvcrt.locking(self._fd.fileno(), msvcrt.LK_UNLCK, 1) @property def _file_path(self): From 166be8f7b062a747a13ba4c7920254cb2d768264 Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Thu, 12 Dec 2024 20:40:18 +0800 Subject: [PATCH 09/12] fix: windows locker --- qiniu/http/regions_provider.py | 44 ++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index cbcb7228..c4415973 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -208,38 +208,64 @@ def _file_path(self): class _FileLocker: def __init__(self, fd): self._fd = fd + self._lock_fd = None + self._already_locked = False def __enter__(self): try: - msvcrt.locking(self._fd.fileno(), msvcrt.LK_LOCK | msvcrt.LK_NBLCK, 1) + self._lock_fd = open(self._lock_file_path, 'w') + msvcrt.locking(self._lock_fd.fileno(), msvcrt.LK_LOCK | msvcrt.LK_NBLCK, 1) except OSError: + self._already_locked = True raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) def __exit__(self, exc_type, exc_val, exc_tb): - msvcrt.locking(self._fd.fileno(), msvcrt.LK_UNLCK, 1) + if self._already_locked: + if self._lock_fd: + self._lock_fd.close() + return + + try: + msvcrt.locking(self._lock_fd.fileno(), msvcrt.LK_UNLCK, 1) + finally: + self._lock_fd.close() + os.remove(self._lock_file_path) @property def _file_path(self): return self._fd.name + @property + def _lock_file_path(self): + """ + Returns + ------- + str + """ + return self._file_path + '.lock' + else: class _FileLocker: def __init__(self, fd): self._fd = fd + self._already_locked = False def __enter__(self): try: # Atomic file creation open_flags = os.O_EXCL | os.O_RDWR | os.O_CREAT - fd = os.open(self.lock_file_path, open_flags) + fd = os.open(self._lock_file_path, open_flags) os.close(fd) - except IOError: + except OSError: + self._already_locked = True raise FileAlreadyLocked('File {0} already locked'.format(self._file_path)) def __exit__(self, exc_type, exc_val, exc_tb): + if self._already_locked: + return try: - os.remove(self.lock_file_path) - except IOError: + os.remove(self._lock_file_path) + except OSError: pass @property @@ -247,7 +273,7 @@ def _file_path(self): return self._fd.name @property - def lock_file_path(self): + def _lock_file_path(self): """ Returns ------- @@ -770,6 +796,10 @@ def __shrink_cache(self): os.chmod(shrink_file_path, 0o666) # rename file + if is_windows: + # windows must close first, or will raise permission error + # be careful to do something with the file after this + f.close() shutil.move(shrink_file_path, self._cache_scope.persist_path) # update last shrink time From dcc0da4128d2f9f9529d2aa25fe389a4f022a268 Mon Sep 17 00:00:00 2001 From: lihsai0 <lihsai0@gmail.com> Date: Mon, 16 Dec 2024 11:50:58 +0800 Subject: [PATCH 10/12] feat: no raise error if regions cache shrink failed --- qiniu/http/regions_provider.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index c4415973..e75b6457 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -540,7 +540,10 @@ def __init__( def __iter__(self): if self.__should_shrink: - self.__shrink_cache() + try: + self.__shrink_cache() + except Exception as err: + logging.warning('failed to shrink cache', err) get_regions_fns = [ self.__get_regions_from_memo, From 132c9d2893c80646405a40818ea9bc61560b6a84 Mon Sep 17 00:00:00 2001 From: Jie Liu <eirture@gmail.com> Date: Fri, 14 Feb 2025 15:50:30 +0800 Subject: [PATCH 11/12] fix: error messages format --- qiniu/http/regions_provider.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index e75b6457..19d85161 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -543,7 +543,7 @@ def __iter__(self): try: self.__shrink_cache() except Exception as err: - logging.warning('failed to shrink cache', err) + logging.warning('failed to shrink cache. error: %s', err) get_regions_fns = [ self.__get_regions_from_memo, @@ -581,7 +581,7 @@ def set_regions(self, regions): 'regions': [_persist_region(r) for r in regions] }) + os.linesep) except Exception as err: - logging.warning('failed to cache regions result to file', err) + logging.warning('failed to cache regions result to file. error: %s', err) @property def persist_path(self): From 322adecbc8804d483a48267afb6c047b4d61f3ad Mon Sep 17 00:00:00 2001 From: Jie Liu <eirture@gmail.com> Date: Mon, 17 Feb 2025 20:44:06 +0800 Subject: [PATCH 12/12] fix: permissions of the region provider cache file --- qiniu/http/regions_provider.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/qiniu/http/regions_provider.py b/qiniu/http/regions_provider.py index 19d85161..c451d166 100644 --- a/qiniu/http/regions_provider.py +++ b/qiniu/http/regions_provider.py @@ -1,5 +1,6 @@ import abc import datetime +import errno import itertools from collections import namedtuple import logging @@ -300,7 +301,8 @@ def _lock_file_path(self): memo_cache={}, persist_path=os.path.join( tempfile.gettempdir(), - 'qn-py-sdk-regions-cache.jsonl' + 'qn-py-sdk', + 'regions-cache.jsonl' ), last_shrink_at=datetime.datetime.fromtimestamp(0), shrink_interval=datetime.timedelta(days=1), @@ -520,8 +522,24 @@ def __init__( persist_path = kwargs.get('persist_path', None) last_shrink_at = datetime.datetime.fromtimestamp(0) if persist_path is None: - persist_path = _global_cache_scope.persist_path - last_shrink_at = _global_cache_scope.last_shrink_at + cache_dir = os.path.dirname(_global_cache_scope.persist_path) + try: + # make sure the cache dir is available for all users. + # we can not use the '/tmp' dir directly on linux, + # because the permission is 0o1777 + if not os.path.exists(cache_dir): + # os.makedirs have no exists_ok parameter in python 2.7 + os.makedirs(cache_dir) + os.chmod(cache_dir, 0o777) + persist_path = _global_cache_scope.persist_path + last_shrink_at = _global_cache_scope.last_shrink_at + except Exception as err: + if isinstance(err, OSError) and err.errno == errno.EEXIST: + persist_path = _global_cache_scope.persist_path + last_shrink_at = _global_cache_scope.last_shrink_at + else: + logging.warning( + 'failed to create cache dir %s. error: %s', cache_dir, err) shrink_interval = kwargs.get('shrink_interval', None) if shrink_interval is None: