-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_with_thread_input.py
95 lines (79 loc) · 2.88 KB
/
main_with_thread_input.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import logging
import queue
import sys
import time
import click
from LedLine import LedLine
from algo import algos
from control.message import ControlMessage
from control.web import WebControlThread
# Emit every record from DEBUG upwards and prefix each message with the name
# of the emitting thread (left-aligned, padded to 10 characters), so output
# from the main loop and the control thread can be told apart.
logging.basicConfig(
    format='(%(threadName)-10s) %(message)s',
    level=logging.DEBUG,
)
def sleep(mode: str, delay: int):
    """
    Pause the calling thread between two animation frames.

    :param mode: rendering mode in use: "pygame", "console" or "led"
    :param delay: length of the pause, in milliseconds
    :raises ValueError: if mode is not one of the supported modes
    """
    if mode == "pygame":
        # Imported lazily so pygame is only required when this mode is used.
        import pygame
        pygame.time.delay(delay)  # pygame expects milliseconds
    elif mode in ("console", "led"):
        # time.sleep() expects seconds. Dividing by 1000 makes the pause match
        # the pygame branch for the same delay value (it used to be 10x longer).
        time.sleep(delay / 1000.0)
    else:
        raise ValueError(f"Mode {mode} is not supported")
def log_delay(speed: int) -> int:
    """
    Compute a logarithmic delay from a speed setting.

    The higher the speed, the larger each speed increment should be so that the
    change stays noticeable; the lower the speed, the smaller the increment
    should be to allow for fine control.

    With speed in the range [1, 100] (higher is faster) the delay is inversely
    proportional to the speed.

    NOTE(review): the original description ends with "can be calculated as:"
    but no formula follows, and the function has no body beyond this
    docstring, so it currently returns None despite the ``-> int`` annotation.
    TODO: supply the intended formula and implement it.

    :param speed: speed setting, expected to be within [1, 100]
    :return: the delay (see the note above: nothing is returned yet)
    """
@click.command()
@click.option("--mode", default="led", help="Mode to launch in (console/pygame/led)")
@click.option("--num", default=96, help="Number of leds")
@click.option("--algo", default="StarryNight", help="Name of the initial algorithm")
@click.option("--delay", default=100, help="Delay, ms")
@click.option("--apikey", default="", help="Telegram API key")
def run(mode: str, num: int, algo: str, delay: int, apikey: str):
    """
    Run the LED animation loop on the main thread.

    Builds the LED line for the requested mode, creates the initial algorithm,
    starts a WebControlThread and then renders frames in an endless loop. Before
    each frame, at most one control message is taken from the queue that was
    handed to the control thread and applied (stop, change algorithm, change
    delay, change brightness).

    :param mode: which LED line implementation to use: "pygame", "led" or "console"
    :param num: number of LEDs on the line
    :param algo: name of the algorithm to start with
    :param delay: pause between frames; also passed to WebControlThread
    :param apikey: accepted from the command line but not used by this function
    :raises click.BadParameter: if mode is not one of the supported modes
    """
    logging.info("Starting main thread with mode %s", mode)

    line: LedLine = None
    # Each implementation is imported lazily so that only the dependencies of
    # the selected mode have to be installed.
    if mode == "pygame":
        from PyGameLedLine import PyGameLedLine
        import pygame
        pygame.init()
        # Initializing surface
        surface = pygame.display.set_mode((1500, 50))
        surface.fill((0, 0, 0))
        line = PyGameLedLine(num, surface)
    elif mode == "led":
        from PhysicalLedLine import Ws2801LedLine
        line = Ws2801LedLine(num)
    elif mode == "console":
        from ConsoleLedLine import ConsoleLedLine
        line = ConsoleLedLine(num)
    else:
        # Fail before the control thread is started; otherwise line stays None
        # and the error only shows up later, far from its cause.
        raise click.BadParameter(f"Mode {mode} is not supported", param_hint="--mode")

    # Kept under its own name so the `algo` string parameter is not rebound
    # to an algorithm object.
    current_algo = algos.CreateAlgo(algo, line)

    q = queue.Queue(1)
    control_thread = WebControlThread(q, delay)
    control_thread.start()

    while True:
        # Non-blocking poll: rendering must go on when there is no message.
        try:
            event: ControlMessage = q.get_nowait()
        except queue.Empty:
            event = None

        if event is not None:
            kind = event.type()
            if kind == ControlMessage.MessageType.STOP:
                logging.info("Received STOP message, exiting ... ")
                sys.exit(0)
            elif kind == ControlMessage.MessageType.CHANGE_ALGO:
                new_algo_name = event.data()
                logging.info("Changing algo to %s", new_algo_name)
                line.ClearLine()
                current_algo = algos.CreateAlgo(new_algo_name, line)
            elif kind == ControlMessage.MessageType.CHANGE_DELAY:
                new_delay = event.data()
                logging.info("Changing delay to %d", new_delay)
                delay = new_delay
            elif kind == ControlMessage.MessageType.CHANGE_BRIGHTNESS:
                brightness = event.data()
                logging.info("Changing brightness to %f", brightness)
                line.SetBrightness(brightness)

        # One frame: let the algorithm update the line, show it, then wait.
        line.PreUpdate()
        current_algo.update()
        line.DisplayLine()
        line.PostUpdate()
        sleep(mode, delay)
# Script entry point: run() is a click command, so calling it without
# arguments lets click read the options from the command line.
if __name__ == "__main__":
    run()