forked from adafruit/Adafruit_CircuitPython_BLE
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.py
executable file
·125 lines (103 loc) · 3.76 KB
/
stream.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# SPDX-FileCopyrightText: 2019 Scott Shawcroft for Adafruit Industries
#
# SPDX-License-Identifier: MIT
"""
`stream`
====================================================
This module provides stream characteristics that bind readable or writable objects to the Service
object they are on.
"""
from __future__ import annotations
import _bleio
from . import Attribute
from . import Characteristic
from . import ComplexCharacteristic
try:
from typing import Optional, Union, TYPE_CHECKING
if TYPE_CHECKING:
from circuitpython_typing import ReadableBuffer
from adafruit_ble.characteristics import Characteristic
from adafruit_ble.uuid import UUID
from adafruit_ble.services import Service
except ImportError:
pass
__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_BLE.git"
class BoundWriteStream:
"""Writes data out to the peer."""
def __init__(self, bound_characteristic: Characteristic) -> None:
self.bound_characteristic = bound_characteristic
def write(self, buf: ReadableBuffer) -> None:
"""Write data from buf out to the peer."""
# We can only write 20 bytes at a time.
offset = 0
while offset < len(buf):
self.bound_characteristic.value = buf[offset : offset + 20]
offset += 20
class StreamOut(ComplexCharacteristic):
"""Output stream from the Service server."""
def __init__(
self,
*,
uuid: Optional[UUID] = None,
timeout: float = 1.0,
buffer_size: int = 64,
properties: int = Characteristic.NOTIFY,
read_perm: int = Attribute.OPEN,
write_perm: int = Attribute.OPEN,
) -> None:
self._timeout = timeout
self._buffer_size = buffer_size
super().__init__(
uuid=uuid,
properties=properties,
read_perm=read_perm,
write_perm=write_perm,
max_length=buffer_size,
)
def bind(
self, service: Service
) -> Union[_bleio.CharacteristicBuffer, BoundWriteStream]:
"""Binds the characteristic to the given Service."""
bound_characteristic = super().bind(service)
# If we're given a remote service then we're the client and need to buffer in.
if service.remote:
bound_characteristic.set_cccd(notify=True)
return _bleio.CharacteristicBuffer(
bound_characteristic,
timeout=self._timeout,
buffer_size=self._buffer_size,
)
return BoundWriteStream(bound_characteristic)
class StreamIn(ComplexCharacteristic):
"""Input stream into the Service server."""
def __init__(
self,
*,
uuid: Optional[UUID] = None,
timeout: float = 1.0,
buffer_size: int = 64,
properties: int = (Characteristic.WRITE | Characteristic.WRITE_NO_RESPONSE),
write_perm: int = Attribute.OPEN,
) -> None:
self._timeout = timeout
self._buffer_size = buffer_size
super().__init__(
uuid=uuid,
properties=properties,
read_perm=Attribute.NO_ACCESS,
write_perm=write_perm,
max_length=buffer_size,
)
def bind(
self, service: Service
) -> Union[_bleio.CharacteristicBuffer, BoundWriteStream]:
"""Binds the characteristic to the given Service."""
bound_characteristic = super().bind(service)
# If the service is remote need to write out.
if service.remote:
return BoundWriteStream(bound_characteristic)
# We're the server so buffer incoming writes.
return _bleio.CharacteristicBuffer(
bound_characteristic, timeout=self._timeout, buffer_size=self._buffer_size
)