Skip to content

Commit

Permalink
update fastapi version, add run_in_processpool api
Browse files Browse the repository at this point in the history
  • Loading branch information
utmhikari committed Dec 6, 2020
1 parent 898c424 commit b5403c1
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 15 deletions.
32 changes: 32 additions & 0 deletions application/concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
Concurrency module
"""
import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import typing
from application import logger
import pprint

LOGGER = logger.get_application_logger()

T = typing.TypeVar('T')

PROCESS_POOL_EXECUTOR = ProcessPoolExecutor()


async def run_in_processpool(func: typing.Callable[..., T],
*args: typing.Any,
**kwargs: typing.Any) -> T:
"""
run in process pool executor
:param func: function
:param args: args
:param kwargs: keyword args
:return:
"""
LOGGER.debug('Run in processpool with func: %s, args: %s, kwargs: %s' %
(pprint.pformat(func), pprint.pformat(args), pprint.pformat(kwargs)))
loop = asyncio.get_event_loop()
f = functools.partial(func, **kwargs)
return await loop.run_in_executor(None, f, *args)
13 changes: 12 additions & 1 deletion controller/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,22 @@ def update_item(item_id: int, item: Item):
return success(item)


@router.get('/v1/items/print')
@router.get('/v1/items/print/instant')
def print_items(background_tasks: BackgroundTasks):
"""
print item info in background
an example with
"""
background_tasks.add_task(item_service.print_items_one_by_one, 1)
return success(msg='Printing items now~')


@router.get('/v1/items/print/delay')
def delay_print_items(background_tasks: BackgroundTasks):
"""
print item info in delayed mode
:param background_tasks:
:return:
"""
background_tasks.add_task(item_service.print_items_one_by_one_in_another_process)
return success(msg='launched print items delayed!')
28 changes: 15 additions & 13 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
atomicwrites==1.4.0
attrs==19.3.0
certifi==2020.6.20
attrs==20.3.0
certifi==2020.12.5
chardet==3.0.4
click==7.1.2
colorama==0.4.3
fastapi==0.61.1
h11==0.9.0
colorama==0.4.4
fastapi==0.62.0
h11==0.11.0
idna==2.10
more-itertools==8.4.0
packaging==20.4
iniconfig==1.0.1
more-itertools==8.6.0
packaging==20.7
pluggy==0.13.1
py==1.9.0
pydantic==1.6.1
pydantic==1.7.3
pyparsing==2.4.7
pytest==6.1.1
python-dotenv==0.14.0
pytest==6.1.2
python-dotenv==0.15.0
python-multipart==0.0.5
requests==2.24.0
requests==2.25.0
six==1.15.0
starlette==0.13.6
urllib3==1.25.10
uvicorn==0.11.8
toml==0.10.1
urllib3==1.26.2
uvicorn==0.12.3
wcwidth==0.2.5
websockets==8.1
21 changes: 20 additions & 1 deletion service/item.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from typing import Dict, List
from model.item import Item
from application.logger import get_service_logger
import time, pprint
import time
import pprint
from application import concurrency

LOGGER = get_service_logger('ITEM')

Expand Down Expand Up @@ -42,7 +44,24 @@ def update_item(item_id: int, item: Item) -> bool:
def print_items_one_by_one(interval: int = 3):
if interval <= 0:
interval = 3
LOGGER.info('Start printing items!')
for item_id, item_info in _ITEMS.items():
LOGGER.info('Item %d: %s' % (item_id, pprint.pformat(item_info)))
time.sleep(interval)
LOGGER.info('Finish printing items!')


def __print_single_item(tag: str = 'UNKNOWN', item: Item = Item(name='', price=0.0)):
LOGGER.info('[%s] %s' % (tag, item.json()))


async def print_items_one_by_one_in_another_process(interval: int = 3):
if interval <= 0:
interval = 3
LOGGER.info('Start printing items!')
for item_id, item_info in _ITEMS.items():
await concurrency.run_in_processpool(__print_single_item,
str(item_id),
item=item_info)
time.sleep(interval)
LOGGER.info('Finish printing items!')

0 comments on commit b5403c1

Please sign in to comment.