feat: add Session binding capability via session_id in Request #1086
@@ -0,0 +1,86 @@
import asyncio
from datetime import timedelta
from typing import Callable

from crawlee import ConcurrencySettings, Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import RequestCollisionError
from crawlee.sessions import Session, SessionPool


# Define a function for creating sessions with simple logic for unique `id` generation.
# This is necessary if you need to specify a particular session for the first request,
# for example during authentication.
def create_session_function() -> Callable[[], Session]:
    counter = 0

    def create_session() -> Session:
        nonlocal counter
        counter += 1
        return Session(
            id=str(counter),
            max_usage_count=999_999,
            max_age=timedelta(hours=999_999),
            max_error_score=100,
            blocked_status_codes=[403],
        )

    return create_session


async def main() -> None:
    crawler = HttpCrawler(
        # Adjust request limits according to your pool size.
        concurrency_settings=ConcurrencySettings(max_tasks_per_minute=500),
        # Requests are bound to specific sessions, so no rotation is needed.
        max_session_rotations=0,
        session_pool=SessionPool(
            max_pool_size=10, create_session_function=create_session_function()
        ),
    )

    @crawler.router.default_handler
    async def basic_handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    # Initialize the session and bind the next request to this session if needed.
    @crawler.router.handler(label='session_init')
    async def session_init(context: HttpCrawlingContext) -> None:
        next_requests = []
        if context.session:
            context.log.info(f'Init session {context.session.id}')
            next_request = Request.from_url(
                'https://placeholder.dev', session_id=context.session.id
            )
            next_requests.append(next_request)

        await context.add_requests(next_requests)

    # Handle errors when a session is blocked and no longer available in the pool
    # when attempting to execute requests bound to it.
    @crawler.failed_request_handler
    async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
        if isinstance(error, RequestCollisionError) and context.session:
            context.log.error(
                f'Request {context.request.url} failed, because the bound '
                'session is unavailable'
            )

    # Create a pool of requests bound to their respective sessions.
    # Use `always_enqueue=True` if session initialization happens on a non-unique
    # address, such as the site's main page.
    init_requests = [
        Request.from_url(
            'https://example.org/',
            label='session_init',
            session_id=str(session_id),
            always_enqueue=True,
        )
        for session_id in range(1, 11)
    ]

    await crawler.run(init_requests)


if __name__ == '__main__':
    asyncio.run(main())
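
A note on the `always_enqueue=True` flag used above (an illustrative sketch, assuming the default `unique_key` is derived from the normalized URL and does not include `session_id`, which the review thread further down also touches on): without the flag, the ten init requests would be deduplicated into one by the request queue, because they all target the same URL.

from crawlee import Request

# Ten requests to the same URL, each bound to a different session.
requests = [
    Request.from_url('https://example.org/', session_id=str(i))
    for i in range(1, 11)
]
# Assumption: they all share one URL-derived unique_key, so without
# always_enqueue=True the request queue would treat them as one request.
assert len({request.unique_key for request in requests}) == 1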

@@ -0,0 +1,56 @@
import asyncio
from datetime import timedelta

from crawlee import ConcurrencySettings, Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import SessionError
from crawlee.sessions import SessionPool


async def main() -> None:
    crawler = HttpCrawler(
        # Limit requests per minute to reduce the chance of being blocked.
        concurrency_settings=ConcurrencySettings(max_tasks_per_minute=50),
        # Disable session rotation.
        max_session_rotations=0,
        session_pool=SessionPool(
            # Only one session in the pool.
            max_pool_size=1,
            create_session_settings={
                # High value for the session usage limit.
                'max_usage_count': 999_999,
                # High value for the session lifetime.
                'max_age': timedelta(hours=999_999),
                # A high score allows the session to encounter more errors
                # before Crawlee decides the session is blocked.
                # Make sure you know how to handle these errors.
                'max_error_score': 100,
                # A 403 status usually indicates you're already blocked.
                'blocked_status_codes': [403],
            },
        ),
    )

    # Basic request handling logic.
    @crawler.router.default_handler
    async def basic_handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    # Handler for session initialization (authentication, initial cookies, etc.).
    @crawler.router.handler(label='session_init')
    async def session_init(context: HttpCrawlingContext) -> None:
        if context.session:
            context.log.info(f'Init session {context.session.id}')

    # Monitor if our session gets blocked and explicitly stop the crawler.
    @crawler.error_handler
    async def error_processing(context: BasicCrawlingContext, error: Exception) -> None:
        if isinstance(error, SessionError) and context.session:
            context.log.info(f'Session {context.session.id} blocked')
            crawler.stop()

    await crawler.run([Request.from_url('https://example.org/', label='session_init')])


if __name__ == '__main__':
    asyncio.run(main())
@@ -58,6 +58,9 @@ class CrawleeRequestData(BaseModel):
    crawl_depth: Annotated[int, Field(alias='crawlDepth')] = 0
    """The depth of the request in the crawl tree."""

    session_id: Annotated[str | None, Field()] = None
    """ID of a session to which the request is bound."""


class UserData(BaseModel, MutableMapping[str, JsonSerializable]):
    """Represents the `user_data` part of a Request.
@@ -84,6 +87,7 @@ def __setitem__(self, key: str, value: JsonSerializable) -> None:
                raise ValueError('`label` must be str or None')

            self.label = value

        self.__pydantic_extra__[key] = value

    def __delitem__(self, key: str) -> None:
@@ -119,6 +123,7 @@ class RequestOptions(TypedDict):
    headers: NotRequired[HttpHeaders | dict[str, str] | None]
    payload: NotRequired[HttpPayload | str | None]
    label: NotRequired[str | None]
    session_id: NotRequired[str | None]
    unique_key: NotRequired[str | None]
    id: NotRequired[str | None]
    keep_url_fragment: NotRequired[bool]

Review thread on `session_id` and request deduplication:
- Shouldn't … CC @vdusek - you wrote a big part of the unique key functionality.
- Yes, deduplication will affect this. But I expect that users will use existing mechanisms to return a …
- Good point! Currently, it infers the …

@@ -227,6 +232,7 @@ def from_url(
        headers: HttpHeaders | dict[str, str] | None = None,
        payload: HttpPayload | str | None = None,
        label: str | None = None,
        session_id: str | None = None,
        unique_key: str | None = None,
        id: str | None = None,
        keep_url_fragment: bool = False,

@@ -248,6 +254,9 @@ def from_url(
            payload: The data to be sent as the request body. Typically used with 'POST' or 'PUT' requests.
            label: A custom label to differentiate between request types. This is stored in `user_data`, and it is
                used for request routing (different requests go to different handlers).
            session_id: ID of a specific Session to which the request will be strictly bound. If the session
                becomes unavailable when the request is processed, a RequestCollisionError will be raised.
            unique_key: A unique key identifying the request. If not provided, it is automatically computed based on
                the URL and other parameters. Requests with the same `unique_key` are treated as identical.
            id: A unique identifier for the request. If not provided, it is automatically generated from the

@@ -296,6 +305,9 @@ def from_url(
        if label is not None:
            request.user_data['label'] = label

        if session_id is not None:
            request.crawlee_data.session_id = session_id

        return request

    def get_query_param_from_url(self, param: str, *, default: str | None = None) -> str | None:

@@ -308,6 +320,11 @@ def label(self) -> str | None:
        """A string used to differentiate between arbitrary request types."""
        return cast('UserData', self.user_data).label

    @property
    def session_id(self) -> str | None:
        """A string used to identify the bound session."""
        return self.crawlee_data.session_id

    @property
    def crawlee_data(self) -> CrawleeRequestData:
        """Crawlee-specific configuration stored in the `user_data`."""
@@ -40,6 +40,7 @@
    ContextPipelineInterruptedError,
    HttpClientStatusCodeError,
    HttpStatusCodeError,
    RequestCollisionError,
    RequestHandlerError,
    SessionError,
    UserDefinedErrorHandlerError,

@@ -439,6 +440,20 @@ async def _get_session(self) -> Session | None:
            logger=self._logger,
        )

    async def _get_session_by_id(self, session_id: str | None) -> Session | None:
        """If the session pool is being used, try to take a session by ID from it."""
        if not self._use_session_pool or not session_id:
            return None

        return await wait_for(
            partial(self._session_pool.get_session_by_id, session_id),
            timeout=self._internal_timeout,
            timeout_message='Fetching a session from the pool timed out after '
            f'{self._internal_timeout.total_seconds()} seconds',
            max_retries=3,
            logger=self._logger,
        )

    async def _get_proxy_info(self, request: Request, session: Session | None) -> ProxyInfo | None:
        """Retrieve a new ProxyInfo object based on crawler configuration and the current request and session."""
        if not self._proxy_configuration:
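
The pool-side `SessionPool.get_session_by_id` that this helper calls is not shown in the diff. A hypothetical sketch of its shape, assuming the pool keeps its sessions in an internal dict keyed by `id` and that a `Session.is_usable` check exists:

# Hypothetical sketch only; NOT the actual implementation from this PR.
async def get_session_by_id(self, session_id: str) -> Session | None:
    session = self._sessions.get(session_id)  # assumed internal dict[str, Session]
    if session is None or not session.is_usable:
        return None
    return session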
|
@@ -1043,7 +1058,10 @@ async def __run_task_function(self) -> None: | |
if request is None: | ||
return | ||
|
||
session = await self._get_session() | ||
if request.session_id: | ||
session = await self._get_session_by_id(request.session_id) | ||
else: | ||
session = await self._get_session() | ||
proxy_info = await self._get_proxy_info(request, session) | ||
result = RequestHandlerRunResult(key_value_store_getter=self.get_key_value_store) | ||
|
||
|

@@ -1066,6 +1084,8 @@ async def __run_task_function(self) -> None:
        try:
            request.state = RequestState.REQUEST_HANDLER

            self._raise_request_collision(context.request, context.session)

            try:
                await self._run_request_handler(context=context)
            except asyncio.TimeoutError as e:

@@ -1088,6 +1108,10 @@ async def __run_task_function(self) -> None:

            self._statistics.record_request_processing_finish(statistics_id)

        except RequestCollisionError as request_error:
            context.request.no_retry = True
            await self._handle_request_error(context, request_error)

        except RequestHandlerError as primary_error:
            primary_error = cast(
                'RequestHandlerError[TCrawlingContext]', primary_error

@@ -1204,3 +1228,18 @@ def _raise_for_session_blocked_status_code(self, session: Session | None, status_code: int) -> None:
            ignore_http_error_status_codes=self._ignore_http_error_status_codes,
        ):
            raise SessionError(f'Assuming the session is blocked based on HTTP status code {status_code}')

    def _raise_request_collision(self, request: Request, session: Session | None) -> None:
        """Raise an exception if a request cannot access required resources.

        Args:
            request: The Request that might require specific resources (like a session).
            session: The Session object that was retrieved for the request, or None if not available.

        Raises:
            RequestCollisionError: If the Session referenced by the request is not available.
        """
        if self._use_session_pool and request.session_id and not session:
            raise RequestCollisionError(
                f'The Session (id: {request.session_id}) bound to the Request is no longer available in SessionPool'
            )

Review comment on `_raise_request_collision`:
- I'd prefer to call this …
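
Taken together, the lookup, the guard, and the `no_retry` flag give bound requests fail-fast semantics: if the named session is gone, the request goes straight to the failed request handler instead of being retried. A hedged end-to-end sketch (the URL and the deliberately unknown session ID are made up):

import asyncio

from crawlee import Request
from crawlee.crawlers import BasicCrawlingContext, HttpCrawler, HttpCrawlingContext
from crawlee.errors import RequestCollisionError
from crawlee.sessions import SessionPool


async def main() -> None:
    crawler = HttpCrawler(
        max_session_rotations=0,
        session_pool=SessionPool(max_pool_size=1),
    )

    @crawler.router.default_handler
    async def default_handler(context: HttpCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url}')

    @crawler.failed_request_handler
    async def on_failed(context: BasicCrawlingContext, error: Exception) -> None:
        if isinstance(error, RequestCollisionError):
            # The bound session is not in the pool; the request was not retried.
            context.log.warning(f'Dropping {context.request.url}: {error}')

    # No session with this ID is ever created, so the collision guard raises
    # before the request handler runs.
    await crawler.run(
        [Request.from_url('https://example.org/', session_id='no-such-session')]
    )


if __name__ == '__main__':
    asyncio.run(main())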