-
Notifications
You must be signed in to change notification settings - Fork 364
/
Copy path_map.py
72 lines (52 loc) · 2.05 KB
/
_map.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from typing import Callable, Optional, TypeVar, cast
from reactivex import Observable, abc, compose
from reactivex import operators as ops
from reactivex import typing
from reactivex.internal.basic import identity
from reactivex.internal.utils import infinite
from reactivex.typing import Mapper, MapperIndexed
# Type variables used by the operators in this module: _T1 is the element
# type of the source observable, _T2 the element type produced by the mapper.
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")
def map_(
    mapper: Optional[Mapper[_T1, _T2]] = None
) -> Callable[[Observable[_T1]], Observable[_T2]]:
    """Create the ``map`` operator.

    Args:
        mapper: Transform function invoked with each element of the
            source. When ``None``, the imported ``identity`` helper is
            used instead, so elements are expected to pass through
            unchanged.

    Returns:
        A function that takes a source observable and returns an
        observable emitting the mapper's result for each source element.
    """
    # Test against None explicitly rather than relying on truthiness: a
    # callable object that happens to be falsy (for instance one defining
    # ``__bool__`` or ``__len__``) is still a valid mapper and must not be
    # silently replaced by the identity fallback.
    _mapper = mapper if mapper is not None else cast(Mapper[_T1, _T2], identity)

    def map(source: Observable[_T1]) -> Observable[_T2]:
        """Partially applied map operator.

        Project each element of an observable sequence into a new form.

        Example:
            >>> map(source)

        Args:
            source: The observable source to transform.

        Returns:
            Returns an observable sequence whose elements are the
            result of invoking the transform function on each element
            of the source.
        """

        def subscribe(
            obv: abc.ObserverBase[_T2], scheduler: Optional[abc.SchedulerBase] = None
        ) -> abc.DisposableBase:
            def on_next(value: _T1) -> None:
                # Only the mapper call is guarded. The emission happens in
                # the ``else`` branch so that an exception raised by the
                # observer's own ``on_next`` is not routed to ``on_error``.
                try:
                    result = _mapper(value)
                except Exception as err:  # pylint: disable=broad-except
                    obv.on_error(err)
                else:
                    obv.on_next(result)

            # Error and completion notifications are forwarded unchanged.
            return source.subscribe(
                on_next, obv.on_error, obv.on_completed, scheduler=scheduler
            )

        return Observable(subscribe)

    return map
def map_indexed_(
    mapper_indexed: Optional[MapperIndexed[_T1, _T2]] = None
) -> Callable[[Observable[_T1]], Observable[_T2]]:
    """Create the ``map_indexed`` operator.

    Args:
        mapper_indexed: Transform function invoked with each source
            element and its zero-based position, as
            ``mapper_indexed(value, index)``. When ``None``, elements are
            passed through unchanged and the index is ignored.

    Returns:
        A function that takes a source observable and returns an
        observable emitting the mapper's result for each source element.
    """

    def _identity(value: _T1, _: int) -> _T2:
        """Fallback mapper: return the value and discard the index."""
        return cast(_T2, value)

    # Test against None explicitly rather than relying on truthiness, so a
    # valid callable that happens to be falsy is not silently discarded.
    _mapper_indexed = (
        mapper_indexed
        if mapper_indexed is not None
        else cast(MapperIndexed[_T1, _T2], _identity)
    )

    def map_indexed(source: Observable[_T1]) -> Observable[_T2]:
        """Partially applied indexed map operator.

        Project each element of an observable sequence into a new form
        by incorporating the element's index.

        Args:
            source: The observable source to transform.

        Returns:
            An observable sequence whose elements are the result of
            invoking the transform function on each element of the
            source together with its index.
        """

        def subscribe(
            obv: abc.ObserverBase[_T2], scheduler: Optional[abc.SchedulerBase] = None
        ) -> abc.DisposableBase:
            # The counter lives inside ``subscribe`` so that every
            # subscription numbers its elements starting from 0, instead
            # of sharing one index source across subscriptions or across
            # different sources the same operator is applied to.
            count = 0

            def on_next(value: _T1) -> None:
                nonlocal count
                # Claim the index before calling the mapper so the
                # position is consumed even if the mapper raises.
                index = count
                count += 1
                try:
                    result = _mapper_indexed(value, index)
                except Exception as err:  # pylint: disable=broad-except
                    obv.on_error(err)
                else:
                    obv.on_next(result)

            # Error and completion notifications are forwarded unchanged.
            return source.subscribe(
                on_next, obv.on_error, obv.on_completed, scheduler=scheduler
            )

        return Observable(subscribe)

    return map_indexed
# Names exported by this module.
__all__ = ["map_", "map_indexed_"]