|
1 | 1 | from contextlib import contextmanager |
2 | 2 | import sys |
3 | | -from typing import Dict |
4 | | -from threading import Thread |
5 | | -from time import sleep |
| 3 | +from typing import List |
| 4 | +from threading import Thread, Event |
| 5 | +from pipx.util import emoji_support |
6 | 6 |
|
7 | 7 |
|
8 | 8 | @contextmanager |
9 | 9 | def animate(message: str, do_animation: bool): |
10 | | - animate = {"do_animation": do_animation, "message": message} |
11 | | - t = Thread(target=print_animation, args=(animate,)) |
12 | | - t.start() |
| 10 | + event = Event() |
| 11 | + |
| 12 | + if emoji_support: |
| 13 | + symbols = ["⣷", "⣯", "⣟", "⡿", "⢿", "⣻", "⣽", "⣾"] |
| 14 | + message = message + " " |
| 15 | + incremental_wait_time = 0.1 |
| 16 | + else: |
| 17 | + symbols = ["", ".", "..", "..."] |
| 18 | + incremental_wait_time = 1 |
| 19 | + max_symbol_len = max(len(s) for s in symbols) |
| 20 | + spaces = " " * (len(message) + max_symbol_len) |
| 21 | + clear_line = f"\r{spaces}\r" |
| 22 | + |
| 23 | + thread_kwargs = { |
| 24 | + "message": message, |
| 25 | + "clear_line": clear_line, |
| 26 | + "event": event, |
| 27 | + "symbols": symbols, |
| 28 | + "delay": 0, |
| 29 | + "incremental_wait_time": incremental_wait_time, |
| 30 | + } |
| 31 | + |
| 32 | + if do_animation and sys.stderr.isatty(): |
| 33 | + t = Thread(target=print_animation, kwargs=thread_kwargs) |
| 34 | + t.start() |
| 35 | + |
13 | 36 | try: |
14 | 37 | yield |
15 | 38 | finally: |
16 | | - animate["do_animation"] = False |
17 | | - t.join(0) |
| 39 | + event.set() |
| 40 | + sys.stderr.write(clear_line) |
18 | 41 |
|
19 | 42 |
|
20 | | -def print_animation(meta: Dict[str, bool]): |
21 | | - if not sys.stdout.isatty(): |
| 43 | +def print_animation( |
| 44 | + *, |
| 45 | + message: str, |
| 46 | + clear_line: str, |
| 47 | + event: Event, |
| 48 | + symbols: List[str], |
| 49 | + delay: float, |
| 50 | + incremental_wait_time: float, |
| 51 | +): |
| 52 | + if event.wait(delay): |
22 | 53 | return |
23 | | - |
24 | | - cur = "." |
25 | | - longest_len = 0 |
26 | | - sleep(1) |
27 | | - while meta["do_animation"]: |
28 | | - if cur == "": |
29 | | - cur = "." |
30 | | - elif cur == ".": |
31 | | - cur = ".." |
32 | | - elif cur == "..": |
33 | | - cur = "..." |
34 | | - else: |
35 | | - cur = "" |
36 | | - message = f"{meta['message']}{cur}" |
37 | | - longest_len = max(len(message), longest_len) |
38 | | - sys.stdout.write(" " * longest_len) |
39 | | - sys.stdout.write("\r") |
40 | | - sys.stdout.write(message) |
41 | | - sys.stdout.write("\r") |
42 | | - sleep(0.5) |
43 | | - sys.stdout.write(" " * longest_len) |
44 | | - sys.stdout.write("\r") |
| 54 | + while True: |
| 55 | + for s in symbols: |
| 56 | + cur_line = f"{message}{s}" |
| 57 | + sys.stderr.write(clear_line + cur_line + "\r") |
| 58 | + if event.wait(incremental_wait_time): |
| 59 | + return |
0 commit comments