Skip to content

Commit cab78b4

Browse files
committed
Add other starmap_indexed tests
1 parent 953ce32 commit cab78b4

File tree

1 file changed

+172
-0
lines changed

1 file changed

+172
-0
lines changed

tests/test_observable/test_starmap.py

+172
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,54 @@ def mapper(x, y):
405405
assert xs.subscriptions == [subscribe(200, 290)]
406406
assert invoked[0] == 3
407407

408+
def test_starmap_with_index_throws(self):
409+
with self.assertRaises(RxException):
410+
mapper = ops.starmap_indexed(lambda x, y, index: x)
411+
412+
return return_value((1, 10)).pipe(mapper).subscribe(lambda x: _raise("ex"))
413+
414+
with self.assertRaises(RxException):
415+
return (
416+
throw("ex").pipe(mapper).subscribe(lambda x: x, lambda ex: _raise(ex))
417+
)
418+
419+
with self.assertRaises(RxException):
420+
return (
421+
empty()
422+
.pipe(mapper)
423+
.subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex"))
424+
)
425+
426+
with self.assertRaises(RxException):
427+
return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe()
428+
429+
def test_starmap_with_index_dispose_inside_mapper(self):
430+
scheduler = TestScheduler()
431+
xs = scheduler.create_hot_observable(
432+
on_next(100, (4, 40)), on_next(200, (3, 30)), on_next(500, (2, 20)), on_next(600, (1, 10))
433+
)
434+
invoked = [0]
435+
results = scheduler.create_observer()
436+
d = SerialDisposable()
437+
438+
def projection(x, y, index):
439+
invoked[0] += 1
440+
if scheduler.clock > 400:
441+
d.dispose()
442+
443+
return x + y + index * 100
444+
445+
d.disposable = xs.pipe(ops.starmap_indexed(projection)).subscribe(results)
446+
447+
def action(scheduler, state):
448+
return d.dispose()
449+
450+
scheduler.schedule_absolute(disposed, action)
451+
scheduler.start()
452+
assert results.messages == [on_next(100, 44), on_next(200, 133)]
453+
assert xs.subscriptions == [subscribe(0, 500)]
454+
assert invoked[0] == 3
455+
408456
def test_starmap_with_index_completed(self):
409457
scheduler = TestScheduler()
410458
invoked = [0]
@@ -438,6 +486,130 @@ def projection(x, y, index):
438486
assert xs.subscriptions == [subscribe(200, 400)]
439487
assert invoked[0] == 4
440488

489+
def test_starmap_with_index_default_mapper(self):
490+
scheduler = TestScheduler()
491+
xs = scheduler.create_hot_observable(
492+
on_next(180, (5, 50)),
493+
on_next(210, (4, 40)),
494+
on_next(240, (3, 30)),
495+
on_next(290, (2, 20)),
496+
on_next(350, (1, 10)),
497+
on_completed(400),
498+
on_next(410, (-1, -10)),
499+
on_completed(420),
500+
on_error(430, "ex"),
501+
)
502+
503+
def factory():
504+
return xs.pipe(ops.starmap_indexed())
505+
506+
results = scheduler.start(factory)
507+
assert results.messages == [
508+
on_next(210, (4, 40)),
509+
on_next(240, (3, 30)),
510+
on_next(290, (2, 20)),
511+
on_next(350, (1, 10)),
512+
on_completed(400),
513+
]
514+
515+
assert xs.subscriptions == [subscribe(200, 400)]
516+
517+
def test_starmap_with_index_not_completed(self):
518+
scheduler = TestScheduler()
519+
invoked = [0]
520+
xs = scheduler.create_hot_observable(
521+
on_next(180, (5, 50)),
522+
on_next(210, (4, 40)),
523+
on_next(240, (3, 30)),
524+
on_next(290, (2, 20)),
525+
on_next(350, (1, 10)),
526+
)
527+
528+
def factory():
529+
def projection(x, y, index):
530+
invoked[0] += 1
531+
return (x + 1) + (y + 10) + (index * 100)
532+
533+
return xs.pipe(ops.starmap_indexed(projection))
534+
535+
results = scheduler.start(factory)
536+
assert results.messages == [
537+
on_next(210, 55),
538+
on_next(240, 144),
539+
on_next(290, 233),
540+
on_next(350, 322),
541+
]
542+
assert xs.subscriptions == [subscribe(200, 1000)]
543+
assert invoked[0] == 4
544+
545+
def test_starmap_with_index_error(self):
546+
scheduler = TestScheduler()
547+
ex = "ex"
548+
invoked = [0]
549+
xs = scheduler.create_hot_observable(
550+
on_next(180, (5, 50)),
551+
on_next(210, (4, 40)),
552+
on_next(240, (3, 30)),
553+
on_next(290, (2, 20)),
554+
on_next(350, (1, 10)),
555+
on_error(400, ex),
556+
on_next(410, (-1, -10)),
557+
on_completed(420),
558+
on_error(430, "ex"),
559+
)
560+
561+
def factory():
562+
def projection(x, y, index):
563+
invoked[0] += 1
564+
return (x + 1) + (y + 10) + (index * 100)
565+
566+
return xs.pipe(ops.starmap_indexed(projection))
567+
568+
results = scheduler.start(factory)
569+
570+
assert results.messages == [
571+
on_next(210, 55),
572+
on_next(240, 144),
573+
on_next(290, 233),
574+
on_next(350, 322),
575+
on_error(400, ex),
576+
]
577+
assert xs.subscriptions == [subscribe(200, 400)]
578+
579+
def test_starmap_with_index_mapper_throws(self):
580+
scheduler = TestScheduler()
581+
invoked = [0]
582+
ex = "ex"
583+
xs = scheduler.create_hot_observable(
584+
on_next(180, (5, 50)),
585+
on_next(210, (4, 40)),
586+
on_next(240, (3, 30)),
587+
on_next(290, (2, 20)),
588+
on_next(350, (1, 10)),
589+
on_completed(400),
590+
on_next(410, (-1, -10)),
591+
on_completed(420),
592+
on_error(430, "ex"),
593+
)
594+
595+
def factory():
596+
def projection(x, y, index):
597+
invoked[0] += 1
598+
if invoked[0] == 3:
599+
raise Exception(ex)
600+
return (x + 1) + (y + 10) + (index * 100)
601+
602+
return xs.pipe(ops.starmap_indexed(projection))
603+
604+
results = scheduler.start(factory)
605+
assert results.messages == [
606+
on_next(210, 55),
607+
on_next(240, 144),
608+
on_error(290, ex),
609+
]
610+
assert xs.subscriptions == [subscribe(200, 290)]
611+
assert invoked[0] == 3
612+
441613

442614
if __name__ == "__main__":
443615
unittest.main()

0 commit comments

Comments
 (0)