forked from adafruit/Adafruit_CircuitPython_BLE
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnordic.py
executable file
·118 lines (94 loc) · 3.61 KB
/
nordic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# SPDX-FileCopyrightText: 2019 Dan Halbert for Adafruit Industries
# SPDX-FileCopyrightText: 2019 Scott Shawcroft for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`nordic`
====================================================
This module provides Services used by Nordic Semiconductors.
"""
from __future__ import annotations
from . import Service
from ..uuid import VendorUUID
from ..characteristics.stream import StreamOut, StreamIn
try:
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from circuitpython_typing import WriteableBuffer, ReadableBuffer
import _bleio
except ImportError:
pass
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git"
class UARTService(Service):
"""
Provide UART-like functionality via the Nordic NUS service.
See ``examples/ble_uart_echo_test.py`` for a usage example.
"""
# pylint: disable=no-member
uuid = VendorUUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_server_tx = StreamOut(
uuid=VendorUUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E"),
timeout=1.0,
# 512 is the largest negotiated MTU-3 value for all CircuitPython ports.
buffer_size=512,
)
_server_rx = StreamIn(
uuid=VendorUUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E"),
timeout=1.0,
# 512 is the largest negotiated MTU-3 value for all CircuitPython ports.
buffer_size=512,
)
def __init__(self, service: Optional[_bleio.Service] = None) -> None:
super().__init__(service=service)
self.connectable = True
if not service:
self._rx = self._server_rx
self._tx = self._server_tx
else:
# If we're a client then swap the characteristics we use.
self._tx = self._server_rx
self._rx = self._server_tx
def deinit(self):
"""The characteristic buffers must be deinitialized when no longer needed.
Otherwise they will leak storage.
"""
for obj in (self._tx, self._rx):
if hasattr(obj, "deinit"):
obj.deinit()
def read(self, nbytes: Optional[int] = None) -> Optional[bytes]:
"""
Read characters. If ``nbytes`` is specified then read at most that many bytes.
Otherwise, read everything that arrives until the connection times out.
Providing the number of bytes expected is highly recommended because it will be faster.
:return: Data read
:rtype: bytes or None
"""
return self._rx.read(nbytes)
def readinto(
self, buf: WriteableBuffer, nbytes: Optional[int] = None
) -> Optional[int]:
"""
Read bytes into the ``buf``. If ``nbytes`` is specified then read at most
that many bytes. Otherwise, read at most ``len(buf)`` bytes.
:return: number of bytes read and stored into ``buf``
:rtype: int or None (on a non-blocking error)
"""
return self._rx.readinto(buf, nbytes)
def readline(self) -> Optional[bytes]:
"""
Read a line, ending in a newline character.
:return: the line read
:rtype: bytes or None
"""
return self._rx.readline()
@property
def in_waiting(self) -> int:
"""The number of bytes in the input buffer, available to be read."""
return self._rx.in_waiting
def reset_input_buffer(self) -> None:
"""Discard any unread characters in the input buffer."""
self._rx.reset_input_buffer()
def write(self, buf: ReadableBuffer) -> None:
"""Write a buffer of bytes."""
self._tx.write(buf)