-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathBLESerialUI.py
2189 lines (1865 loc) · 102 KB
/
BLESerialUI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
#############################################################################################
# BLE Serial Communication GUI
# ============================
#
# A simple BLE terminal application using Qt and Bleak library to communicate
# with a BLE device through Nordic UART Service.
#
#
# This program will give you option to scan for BLE device, connect to a device, and pair
# with the device. It will allow you to send text to the BLE device and receive text.
#############################################################################################
# Need to rewrite code. I can not run bluetoothctl wrapper with Bleak simultaneously.
# I need to start it up each time I use trust/distrust, pair/remove, status.
# Basic libraries
import sys
import os
import re
import logging
import time
import warnings
import platform
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Other standard libraries
from datetime import datetime
from types import SimpleNamespace
from markdown import markdown
# Qt library
try:
from PyQt6 import QtCore, QtWidgets, QtGui, uic
from PyQt6.QtCore import (
QObject, QProcess, pyqtSignal, pyqtSlot,
QTimer, QMutex, QMutexLocker, QThread,
QEventLoop, Qt, QStandardPaths
)
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QPushButton, QLabel,
QTextEdit, QVBoxLayout, QWidget, QComboBox, QHBoxLayout, QSizePolicy,
QFileDialog, QShortcut, QLineEdit, QSlider, QMessageBox, QDialog, QTabWidget
)
from PyQt6.QtGui import QIcon, QKeySequence, QTextCursor, QPalette, QColor
PYQT6 = True
except ImportError:
from PyQt5 import QtCore, QtWidgets, QtGui, uic
from PyQt5.QtCore import (
QObject, QProcess, pyqtSignal, pyqtSlot,
QTimer, QMutex, QMutexLocker, QThread,
QEventLoop, Qt, QStandardPaths
)
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QPushButton, QLabel,
QTextEdit, QVBoxLayout, QWidget, QComboBox, QHBoxLayout, QSizePolicy,
QFileDialog, QShortcut, QLineEdit, QSlider, QMessageBox, QDialog, QTabWidget
)
from PyQt5.QtGui import QIcon, QKeySequence, QTextCursor, QPalette, QColor
PYQT6 = False
# IO event loop
import asyncio
import threading # for asyncio in a separate thread
# Bluetooth library
from bleak import BleakClient, BleakScanner, BleakError
from bleak.backends.device import BLEDevice
# Custom Helper Classes
# ---------------------
# bluetoothctl program wrapper
from helpers.Qbluetoothctl_helper import BluetoothctlWrapper
# Bluetooth Helper
# Once this program works I will put the QBLESerialUI and the QBLESerial class into a helper file:
# from helpers.QBLE_helper import QBLESerialUI, QBLESerial,
# Codec Helper
from helpers.Codec_helper import BinaryStreamProcessor, ArduinoTextStreamProcessor
#
# BinaryStreamProcessor(eop=b'\x00', logger = None)
# process(new_data: bytes) -> List[Dict]
#
# ArduinoTextStreamProcessor(eol=b'\n', encoding='utf-8', logger=None)
# process(new_data: bytes, labels: bool = True) -> List[Dict]:
#
# results.append({
# "datatype": data_type,
# "name": self.name.get(data_type, f"Unknown_{data_type}"),
# "data": numbers,
# "timestamp": time.time(), # Add a timestamp
# })
#
# numbers can be list of floats for ArduinoTextStreamProcessor
# numbers can be byte, int8, uint8, int16, uint16, int32, uint32, float, double, list of strings, numpy arrays, for BinaryStreamProcessor
# Deal with high resolution displays
# PyQt5 only: switch on high-DPI scaling and high-DPI pixmaps.
# Each attribute is probed with hasattr() so an attribute that is missing is simply skipped.
if not PYQT6:
    _app_attributes = QtCore.Qt.ApplicationAttribute
    for _attr_name in ("AA_EnableHighDpiScaling", "AA_UseHighDpiPixmaps"):
        if hasattr(_app_attributes, _attr_name):
            QtWidgets.QApplication.setAttribute(getattr(_app_attributes, _attr_name), True)
##########################################################################################################################################
##########################################################################################################################################
#
# QBLESerial interaction with Graphical User Interface
#
# This section contains routines that can not be moved to a separate thread
# because it interacts with the QT User Interface.
# The BLE Worker is in a separate thread and receives data through signals from this class
#
# Receiving from BLE device is bytes or a list of bytes
# Sending to BLE device is bytes or list of bytes
# We need to encode/decode received/sent text in QBLESerialUI
#
# This is the Controller (Presenter) of the Model - View - Controller (MVC) architecture.
#
##########################################################################################################################################
##########################################################################################################################################
class QBLESerialUI(QObject):
"""
Object providing functionality between User Interface and BLE Serial Worker.
This interface must run in the main thread and interacts with user.
Signals (to be emitted by UI and picked up by BLE Worker)
scanDevicesRequest request that QBLESerial is scanning for devices
connectDeviceRequest request that QBLESerial is connecting to device
disconnectDeviceRequest request that QBLESerial is disconnecting from device
pairDeviceRequest request that QBLESerial is pairing bluetooth device
removeDeviceRequest request that QBLESerial is removing bluetooth device
changeLineTerminationRequest request that QBLESerial is using a different line termination
sendFileRequest request that file is sent over BLE
sendTextRequest request that provided text is transmitted over BLE
sendLineRequest request that provided line of text is transmitted over BLE
sendLinesRequest request that provided lines of text are transmitted over BLE
statusRequest request that QBLESerial reports current status
setupTransceiverRequest request that bluetoothctl interface and throughput timer is created
setupBLEWorkerRequest request that asyncio event loop is created and bluetoothctl wrapper is started
stopTransceiverRequest (not used) request that bluetoothctl and throughput timer are stopped
finishWorkerRequest request that QBLESerial worker is finished
Slots (functions available to respond to external signals or events from buttons, input fields, etc.)
on_pushButton_Send send file over BLE
on_pushButton_Clear clear the BLE text display window
on_pushButton_Start start/stop BLE transceiver
on_pushButton_Save save text from display window into text file
on_pushButton_Scan update BLE device list
on_pushButton_Connect open/close BLE device
on_pushButton_Pair pair or remove BLE device
on_pushButton_Trust trust or distrust BLE device
on_pushButton_Status request BLE device status
on_comboBoxDropDown_BLEDevices user selected a new BLE device from the drop down list
on_comboBoxDropDown_LineTermination user selected a different line termination from drop down menu
on_upArrowPressed recall previous line of text from BLE console line buffer
on_downArrowPressed recall next line of text from BLE console line buffer
on_carriageReturnPressed transmit text from UI to BLE transceiver
on_statusReady pickup BLE device status
on_deviceListReady pickup new list of devices
on_receivedData pickup text from BLE transceiver
on_receivedLines pickup lines of text from BLE transceiver
on_throughputReady pickup throughput data from BLE transceiver
on_pairingSuccess pickup whether device pairing was successful
on_removalSuccess pickup whether device removal was successful
on_logSignal pickup log messages
"""
# Constants
########################################################################################
MAX_TEXTBROWSER_LENGTH = 1024 * 1024 # display window character length is trimmed to this length
# lesser value results in better performance
MAX_LINE_LENGTH = 1024 # number of characters after which an end of line characters is expected
NUM_LINES_COLLATE = 10 # [lines] estimated number of lines to collate before emitting signal
# this results in collating about NUM_LINES_COLLATE * 48 bytes in a list of lines
# plotting and processing large amounts of data is more efficient for display and plotting
TARGET_DEVICE_NAME = "MediBrick_BLE" # The name of the BLE device to search for
BLEPIN = 123456 # Known pairing pin for Medibrick_BLE
# Remove ANSI escape sequences
ANSI_ESCAPE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
# Signals
########################################################################################
scanDevicesRequest = pyqtSignal() # scan for BLE devices
connectDeviceRequest = pyqtSignal(BLEDevice, int, bool) # connect to BLE device, mac, timeout,
disconnectDeviceRequest = pyqtSignal() # disconnect from BLE device
pairDeviceRequest = pyqtSignal(str,str) # pair with BLE device mac and pin
removeDeviceRequest = pyqtSignal(str) # remove BLE device from systems paired list
trustDeviceRequest = pyqtSignal(str) # trust a device
distrustDeviceRequest = pyqtSignal(str) # distrust a device
changeLineTerminationRequest = pyqtSignal(bytes) # request line termination to change
sendTextRequest = pyqtSignal(bytes) # request to transmit text
sendLineRequest = pyqtSignal(bytes) # request to transmit one line of text to TX
sendLinesRequest = pyqtSignal(list) # request to transmit lines of text to TX
sendFileRequest = pyqtSignal(str) # request to open file and send with transceiver
statusRequest = pyqtSignal(str) # request BLE device status
setupTransceiverRequest = pyqtSignal() # start transceiver
setupBLEWorkerRequest = pyqtSignal() # request that QBLESerial worker is setup
stopTransceiverRequest = pyqtSignal() # stop transceiver (display of incoming text, connection remains)
finishWorkerRequest = pyqtSignal() # request worker to finish
setupBLEWorkerFinished = pyqtSignal() # QBLESerial worker setup is finished
setupTransceiverFinished = pyqtSignal() # transceiver setup is finished
workerFinished = pyqtSignal() # QBLESerialUI is finished
def __init__(self, parent=None, ui=None, worker=None, logger=None):
    """
    Create the controller that sits between the user interface and the BLE worker.

    Arguments:
        parent: optional Qt parent, handed to the QObject constructor
        ui:     user interface object holding the widgets used by this class (required)
        worker: BLE worker whose signals and slots this class uses (required)
        logger: optional logger; the "QBLE_UI" logger is used when omitted

    Raises:
        ValueError if ui or worker is not provided.

    Also starts the two timers that periodically trim the text and the log display.
    """
    super(QBLESerialUI, self).__init__(parent)

    # state variables, populated by service routines
    self.device                = ""      # BLE device
    self.device_info           = {}      # BLE device status
    self.bleSendHistory        = []      # previously sent text (e.g. commands)
    self.bleSendHistoryIndx    = -1      # init history
    self.rx                    = 0       # init throughput
    self.tx                    = 0       # init throughput
    self.textLineTerminator    = b""     # default line termination: none
    self.encoding              = "utf-8" # default encoding
    self.isLogScrolling        = False   # keep track of log display scrolling
    self.isTextScrolling       = False   # keep track of text display scrolling
    self.device_backup         = ""      # keep track of previously connected device
    self.transceiverIsRunning  = False   # BLE transceiver is not running

    self.instance_name = self.objectName() or self.__class__.__name__

    self.logger = logging.getLogger("QBLE_UI") if logger is None else logger

    if ui is None:
        # Log through the logger directly: handle_log() also writes into the log widget,
        # and there is no widget to write to while the user interface is missing.
        self.logger.error(f"[{self.instance_name}] This applications needs to have access to User Interface")
        raise ValueError("User Interface (ui) is required but was not provided.")
    self.ui = ui

    if worker is None:
        self.handle_log(logging.ERROR, f"[{self.instance_name}] This applications needs to have access to BLE Worker")
        raise ValueError("BLE Worker (worker) is required but was not provided.")
    self.worker = worker

    # Limit the amount of text retained in the text display window.
    # Trimming halts the display for a fraction of a second, so it is not done often.
    self.textTrimTimer = QTimer(self)
    self.textTrimTimer.timeout.connect(self.on_bleTextDisplay_trim)
    self.textTrimTimer.start(10000)   # [ms] trigger every 10 seconds

    # Limit the amount of text retained in the log display window.
    self.logTrimTimer = QTimer(self)
    self.logTrimTimer.timeout.connect(self.on_bleLogDisplay_trim)
    self.logTrimTimer.start(100000)   # [ms] trigger every 100 seconds

    self.handle_log(logging.INFO,"QSerialUI initialized.")
########################################################################################
# Helper functions
########################################################################################
def handle_log(self, level, message):
    """
    Send a message to the logger and mirror it in the log display.

    level:   numeric logging level, e.g. logging.INFO
    message: text to log
    """
    # Logger.log() accepts any numeric level, so custom levels need no branch of their own.
    self.logger.log(level, message)
    self.append_log(message, add_newline=True)
def append_log(self, text, add_newline=False):
    """
    Append text to the log display.

    ANSI escape sequences are removed from the text first. The view follows the new text
    only when the scrollbar was within 20 units of its maximum before the insert.

    text:        string to append
    add_newline: when True, a newline is appended after the text
    """
    text = self.ANSI_ESCAPE.sub('', text)
    try:
        scrollbar = self.ui.logScrollbar
        self.isLogScrolling = scrollbar.value() >= scrollbar.maximum() - 20
        end_of_document = QTextCursor.MoveOperation.End if PYQT6 else QTextCursor.End
        self.ui.logCursor.movePosition(end_of_document)
        self.ui.logCursor.insertText(text + "\n" if add_newline else text)
        if self.isLogScrolling:
            self.ui.plainTextEdit_Log.ensureCursorVisible()
    except Exception as e:
        # Report through the logger only. handle_log() calls append_log(), so using it here
        # would re-enter this method and recurse for as long as the display keeps failing.
        self.logger.error(f"[{self.instance_name}] could not display text in {repr(text)}. Error {str(e)}")
def append_text(self, text, add_newline=False):
    """
    Append text to the BLE text display.

    The view follows the new text only when the scrollbar was within 20 units of its
    maximum before the insert.

    text:        string to append
    add_newline: when True, a newline is appended after the text
    """
    # f-string: log the received text itself, not the placeholder
    self.handle_log(logging.DEBUG, f"text received: {text}")
    try:
        scrollbar = self.ui.textScrollbar
        self.isTextScrolling = scrollbar.value() >= scrollbar.maximum() - 20
        end_of_document = QTextCursor.MoveOperation.End if PYQT6 else QTextCursor.End
        self.ui.textCursor.movePosition(end_of_document)
        self.ui.textCursor.insertText(text + "\n" if add_newline else text)
        if self.isTextScrolling:
            self.ui.plainTextEdit_Text.ensureCursorVisible()
    except Exception as e:
        self.handle_log(logging.ERROR,f"could not display text in {repr(text)}. Error {str(e)}")
def _safely_cleanconnect(signal, slot, previous_slot: None):
try:
if previous_slot is None:
signal.disconnect()
else:
signal.disconnect(previous_slot)
except TypeError:
pass
try:
signal.connect(slot)
except TypeError:
pass
def _safely_connect(signal, slot):
try:
signal.connect(slot)
except TypeError:
pass
def _safely_disconnect(signal, slot):
try:
signal.disconnect(slot)
except TypeError:
pass
########################################################################################
# Slots
########################################################################################
@pyqtSlot()
def on_bleTextDisplay_trim(self):
    """
    Keep the text display at or below MAX_TEXTBROWSER_LENGTH characters.

    Surplus characters are removed from the start of the document and the scrollbar
    is put back at the relative position it had before trimming.
    """
    ui = self.ui
    bar = ui.textScrollbar

    # Relative scrollbar position before trimming; an empty range counts as "at the end"
    old_max = bar.maximum()
    relative_pos = bar.value() / old_max if old_max != 0 else 1.0

    # Number of characters above the limit
    surplus = ui.plainTextEdit_Text.document().characterCount() - self.MAX_TEXTBROWSER_LENGTH

    if surplus > 0:
        if PYQT6:
            step_right = QTextCursor.MoveOperation.Right
            keep_anchor = QTextCursor.MoveMode.KeepAnchor
            to_end = QTextCursor.MoveOperation.End
        else:
            step_right = QTextCursor.Right
            keep_anchor = QTextCursor.KeepAnchor
            to_end = QTextCursor.End

        # Select the surplus from the top of the document and delete it
        ui.textCursor.setPosition(0)
        ui.textCursor.movePosition(step_right, keep_anchor, surplus)
        ui.textCursor.removeSelectedText()
        ui.textCursor.movePosition(to_end)

        # Put the scrollbar back at the same relative position
        trimmed_max = bar.maximum()
        target = round(relative_pos * trimmed_max)
        bar.setValue(target)

        # Keep following the text when the view ends up close to the end
        if target >= trimmed_max - 20:
            ui.plainTextEdit_Text.ensureCursorVisible()

    self.handle_log(logging.INFO, f"[{self.instance_name}] Text Display Trimmed.")
@pyqtSlot()
def on_bleLogDisplay_trim(self):
    """
    Reduce the amount of text kept in the log display window.

    Characters above MAX_TEXTBROWSER_LENGTH are removed from the start of the log
    document and the scrollbar is put back at the relative position it had before.
    """
    scrollbar = self.ui.logScrollbar

    # Where is the scrollbar? An empty range counts as "at the end".
    scrollbarMax = scrollbar.maximum()
    proportion = scrollbar.value() / scrollbarMax if scrollbarMax != 0 else 1.0

    # How much do we need to trim?
    len_current_text = self.ui.plainTextEdit_Log.document().characterCount()
    numCharstoTrim = len_current_text - self.MAX_TEXTBROWSER_LENGTH

    if numCharstoTrim > 0:
        cursor = self.ui.logCursor
        if PYQT6:
            move_right = QTextCursor.MoveOperation.Right
            keep_anchor = QTextCursor.MoveMode.KeepAnchor
            move_end = QTextCursor.MoveOperation.End
        else:
            move_right = QTextCursor.Right
            keep_anchor = QTextCursor.KeepAnchor
            move_end = QTextCursor.End

        # Select from the top of the log document; the selection must start on the
        # log cursor itself, otherwise nothing at the top of the log gets selected.
        cursor.setPosition(0)
        cursor.movePosition(move_right, keep_anchor, numCharstoTrim)
        # Remove the selected text
        cursor.removeSelectedText()
        cursor.movePosition(move_end)

        # update scrollbar position
        new_max = scrollbar.maximum()
        new_value = round(proportion * new_max)
        scrollbar.setValue(new_value)

        # ensure that text is scrolling when we set cursor towards the end
        if new_value >= new_max - 20:
            self.ui.plainTextEdit_Log.ensureCursorVisible()

    self.handle_log(logging.INFO, f"[{self.instance_name}] Log Display Trimmed.")
@pyqtSlot()
def on_carriageReturnPressed(self):
    """
    Send the content of the input line to the BLE transceiver.

    The entry is added to the send history. When the text display is not running,
    the worker signals are rewired to the display slots and the Start button is
    switched to "Stop" before the text is sent.
    """
    input_line = self.ui.lineEdit_Text
    entered = input_line.text()

    # Remember what was sent and rewind the history pointer
    self.bleSendHistory.append(entered)
    self.bleSendHistoryIndx = -1

    if not self.transceiverIsRunning:
        # Remove connections and cleanly connect the display slots
        self._safely_cleanconnect(self.worker.receivedLines, self.on_ReceivedLines)
        self._safely_cleanconnect(self.worker.receivedData, self.on_ReceivedText)
        # Reflect the running state in the UI
        self.transceiverIsRunning = True
        self.ui.pushButton_Start.setText("Stop")

    # Encode and add the line termination, then hand over to the BLE TX line
    payload = entered.encode(self.encoding) + self.textLineTerminator
    self.sendTextRequest.emit(payload)
    input_line.clear()
    self.handle_log(logging.INFO,"Text sent.")
@pyqtSlot()
def on_pushButton_Send(self):
    """
    Let the user pick a text file and request that it is sent over BLE.

    The file dialog opens on "upload.txt" in the documents location. The request is
    only emitted when a file name was chosen.
    """
    documents_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.DocumentsLocation)
    suggested_path = documents_dir + "/upload.txt"
    chosen_file, _selected_filter = QFileDialog.getOpenFileName(
        self.ui, "Open", suggested_path, "Text files (*.txt)"
    )
    if chosen_file:
        self.sendFileRequest.emit(chosen_file)
    self.handle_log(logging.INFO, 'Text file send request completed.')
@pyqtSlot()
def on_upArrowPressed(self):
    """
    Up arrow on the input line: step the history pointer forward and show that entry.

    Stepping past the last entry wraps to -1, which shows an empty input line.
    """
    history = self.bleSendHistory
    position = self.bleSendHistoryIndx + 1
    if position == len(history):
        position = -1                     # ran off the end of the buffer
    self.bleSendHistoryIndx = position

    recalled = "" if position == -1 else history[position]
    self.ui.lineEdit_Text.setText(recalled)
    self.handle_log(logging.INFO,"Previously sent text retrieved.")
@pyqtSlot()
def on_downArrowPressed(self):
    """
    Down arrow on the input line: step the history pointer back and show that entry.

    Stepping back from -1 wraps to the last entry; position -1 shows an empty input line.
    """
    history = self.bleSendHistory
    position = self.bleSendHistoryIndx - 1
    if position == -2:
        position = len(history) - 1       # wrapped around to the end of the buffer
    self.bleSendHistoryIndx = position

    recalled = "" if position == -1 else history[position]
    self.ui.lineEdit_Text.setText(recalled)
    self.handle_log(logging.INFO, f"[{self.instance_name}] Previously sent text retrieved.")
def on_ReceivedLines(self, lines):
    """Show every received line in the text display, each followed by a newline."""
    show = self.append_text
    for entry in lines:
        show(entry, add_newline=True)
def on_ReceivedText(self, text):
    """Show received text in the text display as is, without adding a newline."""
    self.append_text(text, add_newline=False)
@pyqtSlot()
def on_pushButton_Clear(self):
    """
    Empty the text display and the log display, then log that both were cleared.
    """
    for display in (self.ui.plainTextEdit_Text, self.ui.plainTextEdit_Log):
        display.clear()
    self.handle_log(logging.INFO, f"[{self.instance_name}] Text and Log display cleared.")
@pyqtSlot()
def on_pushButton_Start(self):
    """
    Toggle the display of received BLE text.

    The transceiver itself is neither started nor stopped here; only the worker's
    receive signals are connected to, or disconnected from, the display slots.
    The button label ("Start"/"Stop") decides the direction.
    """
    button = self.ui.pushButton_Start

    if button.text() != "Start":
        # Display is on: detach the display slots
        button.setText("Start")
        self._safely_disconnect(self.worker.receivedLines, self.on_ReceivedLines)
        self._safely_disconnect(self.worker.receivedData, self.on_ReceivedText)
        self.transceiverIsRunning = False
        self.handle_log(logging.DEBUG, "text display is off.")
        return

    # Display is off: attach the display slots
    button.setText("Stop")
    self._safely_connect(self.worker.receivedLines, self.on_ReceivedLines)
    self._safely_connect(self.worker.receivedData, self.on_ReceivedText)
    self.transceiverIsRunning = True
    self.handle_log(logging.DEBUG, "text display is on.")
@pyqtSlot()
def on_pushButton_Save(self):
    """
    Save the text of the text display window into a text file chosen by the user.

    The dialog opens on "QBLE.txt" in the documents location and ".txt" is appended
    when the chosen name lacks it. Nothing is written when the dialog is cancelled.
    The file is written as UTF-8; a failure to write is logged instead of raised.
    """
    # PyQt6 exposes enum members only through the enum class
    if PYQT6:
        documents = QStandardPaths.StandardLocation.DocumentsLocation
    else:
        documents = QStandardPaths.DocumentsLocation
    stdFileName = QStandardPaths.writableLocation(documents) + "/QBLE.txt"

    fname, _ = QFileDialog.getSaveFileName(
        self.ui, "Save as", stdFileName, "Text files (*.txt)"
    )
    if not fname:
        # user cancelled the dialog
        self.handle_log(logging.INFO, "Text save cancelled.")
        return

    if not fname.endswith(".txt"):
        fname += ".txt"
    try:
        # explicit encoding so the result does not depend on the platform default
        with open(fname, "w", encoding="utf-8") as f:
            f.write(self.ui.plainTextEdit_Text.toPlainText())
    except OSError as e:
        self.handle_log(logging.ERROR, f"Could not save text to {fname}. Error {str(e)}")
        return
    self.handle_log(logging.INFO,"Text saved.")
@pyqtSlot()
def on_pushButton_Scan(self):
    """
    Request a new scan for BLE devices.

    The Scan and Connect buttons are disabled after the request was emitted.
    """
    self.scanDevicesRequest.emit()
    for button in (self.ui.pushButton_Scan, self.ui.pushButton_Connect):
        button.setEnabled(False)
    self.handle_log(logging.INFO, f"[{self.instance_name}] BLE device list update requested.")
@pyqtSlot()
def on_pushButton_Connect(self):
    """
    Emit a connect or a disconnect request, depending on the Connect button label.

    "Connect" requests a connection to the selected device (timeout 10, third argument False);
    "Disconnect" requests a disconnect. Without a selected device only a warning is logged.
    Any other label is logged as an error.
    """
    tag = f"[{self.instance_name}]"
    label = self.ui.pushButton_Connect.text()

    if label not in ("Connect", "Disconnect"):
        self.handle_log(logging.ERROR, f"{tag} User interface Connect button is labeled incorrectly.")
        return

    if label == "Connect":
        if not self.device:
            self.handle_log(logging.WARNING, f"{tag} No device selected for connection.")
            return
        self.connectDeviceRequest.emit(self.device, 10, False)
        self.handle_log(logging.INFO, f"{tag} Attempting to connect to device.")
        return

    # label is "Disconnect"
    if not self.device:
        self.handle_log(logging.WARNING, f"{tag} No device selected for disconnection.")
        return
    self.disconnectDeviceRequest.emit()
    self.handle_log(logging.INFO, f"{tag} Attempting to disconnect from device.")
@pyqtSlot()
def on_pushButton_Pair(self):
    """
    Request pairing with, or removal of, the selected device.

    The Pair button label decides the action: "Pair" emits pairDeviceRequest with the
    device address and the pin, "Remove" emits removeDeviceRequest with the address.
    After the request the label is flipped. Without a selected device only a warning
    is logged; any other label is logged as an error.
    """
    tag = f"[{self.instance_name}]"
    button = self.ui.pushButton_Pair
    label = button.text()

    if label not in ("Pair", "Remove"):
        self.handle_log(logging.ERROR, f"{tag} User interface Pair button is labeled incorrectly.")
        return

    # Truthiness check: the device attribute may be an empty placeholder rather than None
    if not self.device:
        self.handle_log(logging.WARNING, f"{tag} No device set to pair")
        return

    # NOTE(review): both branches also rewire the button's clicked signal between worker
    # slots; confirm this against the place where the button is first connected.
    if label == "Pair":
        # the signal is declared with two str arguments, the pin constant is an int
        self.pairDeviceRequest.emit(self.device.address, str(self.BLEPIN))
        self.handle_log(logging.INFO, f"{tag} Paired with {self.TARGET_DEVICE_NAME}")
        button.setText("Remove")
        self._safely_disconnect(button.clicked, self.worker.on_pairDeviceRequest)
        self._safely_connect(button.clicked, self.worker.on_removeDeviceRequest)
    else:
        self.removeDeviceRequest.emit(self.device.address)
        self.handle_log(logging.INFO, f"{tag} {self.TARGET_DEVICE_NAME} removed")
        button.setText("Pair")
        self._safely_disconnect(button.clicked, self.worker.on_removeDeviceRequest)
        self._safely_connect(button.clicked, self.worker.on_pairDeviceRequest)
@pyqtSlot()
def on_pushButton_Trust(self):
    """
    Request that the selected device is trusted or distrusted.

    The Trust button label decides the action: "Trust" emits trustDeviceRequest,
    "Distrust" emits distrustDeviceRequest, both with the device address. After the
    request the label is flipped. Without a selected device only a warning is logged;
    any other label is logged as an error.
    """
    tag = f"[{self.instance_name}]"
    button = self.ui.pushButton_Trust
    label = button.text()

    if label not in ("Trust", "Distrust"):
        self.handle_log(logging.ERROR, f"{tag} User interface Trust button is labeled incorrectly.")
        return

    # Truthiness check: the device attribute may be an empty placeholder rather than None
    if not self.device:
        self.handle_log(logging.WARNING, f"{tag} No device set to trust")
        return

    # NOTE(review): both branches also rewire the button's clicked signal between worker
    # slots; confirm this against the place where the button is first connected.
    if label == "Trust":
        self.trustDeviceRequest.emit(self.device.address)
        self.handle_log(logging.INFO, f"{tag} Trusted {self.TARGET_DEVICE_NAME}")
        button.setText("Distrust")
        self._safely_disconnect(button.clicked, self.worker.on_trustDeviceRequest)
        self._safely_connect(button.clicked, self.worker.on_distrustDeviceRequest)
    else:
        self.distrustDeviceRequest.emit(self.device.address)
        self.handle_log(logging.INFO, f"{tag} {self.TARGET_DEVICE_NAME} distrusted")
        button.setText("Trust")
        self._safely_disconnect(button.clicked, self.worker.on_distrustDeviceRequest)
        self._safely_connect(button.clicked, self.worker.on_trustDeviceRequest)
@pyqtSlot()
def on_pushButton_Status(self):
    """
    Request the status of the selected device by emitting statusRequest with its address.

    Nothing is emitted while no device is selected.
    """
    # Truthiness check: the device attribute may be an empty placeholder rather than None
    if self.device:
        self.statusRequest.emit(self.device.address)
@pyqtSlot()
def on_comboBoxDropDown_BLEDevices(self):
    """
    User selected a different BLE device from the drop down list.

    A disconnect request is emitted first. With a valid selection the device stored
    with the combo box item becomes the current device and the buttons are prepared
    for a new connection; without a selection the device buttons are disabled and
    scanning is enabled. Pair, Trust and Status are only touched when bluetoothctl
    is flagged as available.
    """
    ui = self.ui
    # The flag may not have been set yet when the first selection arrives; treat that as "not available"
    has_bluetoothctl = getattr(self, "hasBluetoothctl", False)
    bluetoothctl_buttons = (ui.pushButton_Pair, ui.pushButton_Trust, ui.pushButton_Status)

    # disconnect current device
    self.disconnectDeviceRequest.emit()

    # prepare UI for new selection
    index = ui.comboBoxDropDown_Device.currentIndex()
    if index >= 0:
        self.device = ui.comboBoxDropDown_Device.itemData(index)  # device object stored with the item
        self.handle_log(logging.INFO, f"[{self.instance_name}] Selected device: {self.device.name}, Address: {self.device.address}")
        ui.pushButton_Connect.setEnabled(True)       # will want to connect
        if has_bluetoothctl:
            for button in bluetoothctl_buttons:      # these use bluetoothctl
                button.setEnabled(True)
        ui.pushButton_Send.setEnabled(False)         # its not yet connected
        ui.pushButton_Pair.setText("Pair")
        ui.pushButton_Connect.setText("Connect")
        ui.pushButton_Trust.setText("Trust")
    else:
        self.handle_log(logging.WARNING, f"[{self.instance_name}] No devices found")
        ui.pushButton_Connect.setEnabled(False)
        if has_bluetoothctl:
            for button in bluetoothctl_buttons:
                button.setEnabled(False)
        ui.pushButton_Send.setEnabled(False)
        ui.pushButton_Scan.setEnabled(True)
@pyqtSlot()
def on_comboBoxDropDown_LineTermination(self):
    """
    User selected a different line termination from the drop down menu.

    The menu text is translated into the terminator bytes; any text that is not
    listed falls back to carriage return + newline. The new terminator is stored
    and announced through changeLineTerminationRequest.
    """
    terminators = {
        "newline (\\n)":           b"\n",
        "return (\\r)":            b"\r",
        "newline return (\\n\\r)": b"\n\r",
        "none":                    b"",
    }
    selected = self.ui.comboBoxDropDown_LineTermination.currentText()
    self.textLineTerminator = terminators.get(selected, b"\r\n")

    # ask line termination to be changed
    self.changeLineTerminationRequest.emit(self.textLineTerminator)
    self.handle_log(logging.INFO, f"[{self.instance_name}] line termination {repr(self.textLineTerminator)}")
@pyqtSlot()
def on_comboBoxDropDown_DataSeparator(self):
    """
    User selected a different data separator from the drop down menu.

    Menu positions 0 to 3 are stored as they are; any other position is stored as 0.
    """
    known_positions = {0: 0, 1: 1, 2: 2, 3: 3}
    position = self.ui.comboBoxDropDown_DataSeparator.currentIndex()
    self.dataSeparator = known_positions.get(position, 0)
    self.handle_log(logging.INFO, f"[{self.instance_name}] data separator {repr(self.dataSeparator)}")
@pyqtSlot(dict)
def on_statusReady(self, status):
    """
    Pick up the BLE device status and update the Pair and Trust buttons.

    The status dictionary is stored in self.device_info and has the form:
        device_info = {
            "mac":       None,
            "name":      None,
            "paired":    None,
            "trusted":   None,
            "connected": None,
            "rssi":      None
        }
    The buttons are only touched when the status carries a non-empty mac.
    """
    self.device_info = status
    info = self.device_info
    has_mac = not ((info["mac"] is None) or (info["mac"] == ""))
    if has_mac:
        # a paired device can be removed, an unpaired one can be paired
        pair_label = "Remove" if info["paired"] else "Pair"
        self.ui.pushButton_Pair.setEnabled(True)
        self.ui.pushButton_Pair.setText(pair_label)
        # a trusted device can be distrusted, an untrusted one can be trusted
        trust_label = "Distrust" if info["trusted"] else "Trust"
        self.ui.pushButton_Trust.setEnabled(True)
        self.ui.pushButton_Trust.setText(trust_label)
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device status: {status}")
@pyqtSlot(list)
def on_deviceListReady(self, devices:list):
    """
    Pick up a new list of devices and rebuild the device drop down menu.

    The previously selected device is selected again if it is still part
    of the new list. The Connect button is enabled when the list is not empty.
    """
    # scanning is finished, allow the next scan
    self.ui.pushButton_Scan.setEnabled(True)
    combo = self.ui.comboBoxDropDown_Device
    # remember which device is selected right now
    previous = combo.itemData(combo.currentIndex())
    # no selection signals while the list is rebuilt
    combo.blockSignals(True)
    combo.clear()
    for device in devices:
        combo.addItem(f"{device.name} ({device.address})", device)
    # restore the previous selection, first matching entry wins
    if previous is not None:
        match = next(
            (row for row in range(combo.count()) if combo.itemData(row) == previous),
            None,
        )
        if match is not None:
            combo.setCurrentIndex(match)
    combo.blockSignals(False)
    if len(devices) > 0:
        self.ui.pushButton_Connect.setEnabled(True)
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device list updated.")
@pyqtSlot(bytes)
def on_receivedData(self, data):
    """
    Pick up raw bytes from the BLE transceiver.

    The bytes are decoded with self.encoding and appended to the text
    display. Depending on self.dataSeparator they are additionally parsed
    into records:
        0 None (no parsing)
        1 No Labels (simple)
        2 Labels [Label:]
        3 Binary
    Each parsed record is written to the log at DEBUG level.

    Args:
        data (bytes): chunk of bytes received from the device.
    """
    # Bytes that are not valid in self.encoding (e.g. a binary payload) are shown
    # as replacement characters instead of raising UnicodeDecodeError in this slot.
    # NOTE(review): on_receivedLines calls append_text with add_newline=..., here the
    # keyword is new_line=... - confirm which one append_text actually accepts.
    self.append_text(data.decode(self.encoding, errors="replace"), new_line=False)

    # Handle data decoding
    if self.dataSeparator == 0:
        # There is no data decoding wanted
        results = []
    elif self.dataSeparator == 1:
        results = self.arduinoStream.process(data, labels=False)
    elif self.dataSeparator == 2:
        results = self.arduinoStream.process(data, labels=True)
    elif self.dataSeparator == 3:
        results = self.binaryStream.process(data)
    else:
        self.handle_log(logging.ERROR, f"[{self.instance_name}] Unknown data separator: {self.dataSeparator}")
        results = []

    # Each result is expected to be a dictionary with the keys
    #   "datatype", "name", "data" and "timestamp".
    # "data" can be a list of floats for ArduinoTextStreamProcessor;
    # for BinaryStreamProcessor it can be byte, int8, uint8, int16, uint16, int32, uint32,
    # float, double, list of strings or numpy arrays.
    for result in results:
        data_type = result.get("datatype", "Unknown")
        name = result.get("name", "Unknown Name")
        # separate name so the received bytes in `data` are not overwritten
        payload = result.get("data", "No Data")
        timestamp = result.get("timestamp", "No Timestamp")
        self.handle_log(
            logging.DEBUG,
            f"Result Processed - Type: {data_type}, Name: {name}, "
            f"Data: {payload}, "
            f"Timestamp: {timestamp}"
        )
@pyqtSlot(list)
def on_receivedLines(self, lines):
    """
    Pick up lines of text from the BLE transceiver.

    Every line is appended to the text display, each followed by a newline.
    """
    show = self.append_text
    for text_line in lines:
        show(text_line, add_newline=True)
@pyqtSlot(float,float)
def on_throughputReady(self, rx:float, tx:float):
    """
    Pick up throughput data from the BLE transceiver.

    Stores the receive and transmit rates in self.rx / self.tx and shows
    them in the throughput label.
    """
    self.rx, self.tx = rx, tx
    throughput_text = f"Throughput: RX:{rx} TX:{tx} Bps"
    self.ui.label_throughput.setText(throughput_text)
@pyqtSlot(bool)
def on_connectingSuccess(self, success):
    """
    Pick up whether connecting to the device was successful.

    Records the result in self.device_info["connected"], enables Send only
    when connected and labels the Connect button with the next possible action.
    """
    self.device_info["connected"] = success
    connected = bool(success)
    # sending is only possible on an established connection
    self.ui.pushButton_Send.setEnabled(connected)
    self.ui.pushButton_Connect.setEnabled(True)
    self.ui.pushButton_Connect.setText("Disconnect" if connected else "Connect")
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device {self.device.name} connection: {'successful' if success else 'failed'}")
@pyqtSlot(bool)
def on_disconnectingSuccess(self, success):
    """
    Pick up whether disconnecting from the device was successful.

    The slot is declared with a bool argument, like the other *Success
    slots, so that the result emitted by the worker is passed on to `success`.

    Args:
        success (bool): True when the device was disconnected.
    """
    self.device_info["connected"] = not success
    if success:
        # disconnected: nothing can be sent, offer to connect again
        self.ui.pushButton_Send.setEnabled(False)
        self.ui.pushButton_Connect.setEnabled(True)
        self.ui.pushButton_Connect.setText("Connect")
    else:
        # disconnecting failed: still connected, Send button is left untouched
        self.ui.pushButton_Connect.setEnabled(True)
        self.ui.pushButton_Connect.setText("Disconnect")
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device {self.device.name} disconnection: {'successful' if success else 'failed'}")
@pyqtSlot(bool)
def on_pairingSuccess(self, success):
    """
    Pick up whether pairing the device was successful.

    Records the result in self.device_info["paired"] and labels the Pair
    button "Remove" after a successful pairing, "Pair" otherwise.
    """
    self.device_info["paired"] = success
    pair_button = self.ui.pushButton_Pair
    pair_button.setEnabled(True)
    pair_button.setText("Remove" if success else "Pair")
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device {self.device.name} pairing: {'successful' if success else 'failed'}")
@pyqtSlot(bool)
def on_removalSuccess(self, success):
    """
    Pick up whether removing the device pairing was successful.

    Records the result in self.device_info["paired"] and labels the Pair
    button "Pair" after a successful removal, "Remove" otherwise.
    """
    self.device_info["paired"] = not success
    pair_button = self.ui.pushButton_Pair
    pair_button.setEnabled(True)
    # a failed removal leaves the device paired, so removal is offered again
    pair_button.setText("Pair" if success else "Remove")
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device {self.device.name} removal: {'successful' if success else 'failed'}")
@pyqtSlot(bool)
def on_trustSuccess(self, success):
    """
    Pick up whether trusting the device was successful.

    Records the result in self.device_info["trusted"] and labels the Trust
    button "Distrust" after the device became trusted, "Trust" otherwise.
    """
    self.device_info["trusted"] = success
    trust_button = self.ui.pushButton_Trust
    trust_button.setEnabled(True)
    trust_button.setText("Distrust" if success else "Trust")
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device {self.device.name} trusting: {'successful' if success else 'failed'}")
@pyqtSlot(bool)
def on_distrustSuccess(self, success):
    """
    Pick up whether distrusting the device was successful.

    Records the result in self.device_info["trusted"] and labels the Trust
    button "Trust" after the trust was withdrawn, "Distrust" otherwise.
    """
    self.device_info["trusted"] = not success
    trust_button = self.ui.pushButton_Trust
    trust_button.setEnabled(True)
    # a failed distrust leaves the device trusted, so distrust is offered again
    trust_button.setText("Trust" if success else "Distrust")
    self.handle_log(logging.INFO, f"[{self.instance_name}] Device {self.device.name} distrusting: {'successful' if success else 'failed'}")
@pyqtSlot(int,str)
def on_logSignal(self, int, str):
    """
    Pick up a log message and forward it to handle_log.

    The parameter names shadow the builtins of the same name; they are kept
    so that the signature of the slot stays as it is. The first argument is
    passed to handle_log as the level, the second one as the message.
    """
    level, message = int, str
    self.handle_log(level, message)
def cleanup(self):
    """
    Perform cleanup tasks for QBLESerialUI: stop the text and log trim timers
    and disconnect their timeout signals.

    The method can be called more than once: a timeout signal that has no
    connection left does not abort the cleanup.
    """
    self.logger.info("Performing QBLESerialUI cleanup...")

    for timer in (self.textTrimTimer, self.logTrimTimer):
        # Stop timer
        if timer.isActive():
            timer.stop()
        # Disconnect signal to avoid lingering connections
        try:
            timer.timeout.disconnect()
        except TypeError:
            # PyQt raises TypeError when the signal has no connections;
            # there is nothing left to disconnect in that case.
            pass

    # Log cleanup completion
    self.logger.info("QBLESerialUI cleanup completed.")
##########################################################################################################################################
##########################################################################################################################################
#
# Q BLE Serial
#
# separate thread handling BLE serial input and output
# these routines have no access to the user interface,
# communication occurs through signals
#
# for BLE device write we send bytes
# for BLE device read we receive bytes
# conversion from text to bytes occurs in QBLESerialUI
#
# This is the Model of the Model - View - Controller (MVC) architecture.
#
##########################################################################################################################################
##########################################################################################################################################
class QBLESerial(QObject):
"""
BLE Serial Worker for QT
Worker Signals