Skip to content

Commit

Permalink
work on bt
Browse files Browse the repository at this point in the history
  • Loading branch information
Rocky14683 committed May 22, 2024
1 parent 10b38af commit aec3435
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 38 deletions.
78 changes: 41 additions & 37 deletions pros/cli/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .common import *
from pros.ga.analytics import analytics


@pros_root
def upload_cli():
pass
Expand All @@ -20,28 +21,30 @@ def upload_cli():
@project_option(required=False, allow_none=True)
@click.option('--run-after/--no-run-after', 'run_after', default=None, help='Immediately run the uploaded program.',
cls=PROSDeprecated, replacement='after')
@click.option('--run-screen/--execute', 'run_screen', default=None, help='Display run program screen on the brain after upload.',
@click.option('--run-screen/--execute', 'run_screen', default=None,
help='Display run program screen on the brain after upload.',
cls=PROSDeprecated, replacement='after')
@click.option('-af', '--after', type=click.Choice(['run','screen','none']), default=None, help='Action to perform on the brain after upload.',
@click.option('-af', '--after', type=click.Choice(['run', 'screen', 'none']), default=None,
help='Action to perform on the brain after upload.',
cls=PROSOption, group='V5 Options')
@click.option('--quirk', type=int, default=0)
@click.option('--name', 'remote_name', type=str, default=None, required=False, help='Remote program name.',
cls=PROSOption, group='V5 Options')
@click.option('--slot', default=None, type=click.IntRange(min=1, max=8), help='Program slot on the GUI.',
cls=PROSOption, group='V5 Options')
@click.option('--icon', type=click.Choice(['pros','pizza','planet','alien','ufo','robot','clawbot','question','X','power']), default='pros',
@click.option('--icon', type=click.Choice(
['pros', 'pizza', 'planet', 'alien', 'ufo', 'robot', 'clawbot', 'question', 'X', 'power']), default='pros',
help="Change Program's icon on the V5 Brain", cls=PROSOption, group='V5 Options')
@click.option('--program-version', default=None, type=str, help='Specify version metadata for program.',
cls=PROSOption, group='V5 Options', hidden=True)
@click.option('--ini-config', type=click.Path(exists=True), default=None, help='Specify a program configuration file.',
cls=PROSOption, group='V5 Options', hidden=True)
@click.option('--compress-bin/--no-compress-bin', 'compress_bin', cls=PROSOption, group='V5 Options', default=True,
help='Compress the program binary before uploading.')
@click.option('--description', default="Made with PROS", type=str, cls=PROSOption, group='V5 Options',
@click.option('--description', default="Made with PROS", type=str, cls=PROSOption, group='V5 Options',
help='Change the description displayed for the program.')
@click.option('--name', default=None, type=str, cls=PROSOption, group='V5 Options',
@click.option('--name', default=None, type=str, cls=PROSOption, group='V5 Options',
help='Change the name of the program.')

@default_options
def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwargs):
"""
Expand All @@ -56,7 +59,8 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg
analytics.send("upload")
import pros.serial.devices.vex as vex
from pros.serial.ports import DirectPort
kwargs['ide_version'] = project.kernel if not project==None else "None"
from pros.serial.ports import BluetoothPort
kwargs['ide_version'] = project.kernel if not project == None else "None"
kwargs['ide'] = 'PROS'
if path is None or os.path.isdir(path):
if project is None:
Expand All @@ -70,15 +74,13 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg

# apply upload_options as a template
options = dict(**project.upload_options)
if 'port' in options and port is None:
port = options.get('port', None)
if 'slot' in options and kwargs.get('slot', None) is None:
kwargs.pop('slot')
elif kwargs.get('slot', None) is None:
kwargs['slot'] = 1
if 'icon' in options and kwargs.get('icon','pros') == 'pros':
if 'icon' in options and kwargs.get('icon', 'pros') == 'pros':
kwargs.pop('icon')
if 'after' in options and kwargs.get('after','screen') is None:
if 'after' in options and kwargs.get('after', 'screen') is None:
kwargs.pop('after')

options.update(kwargs)
Expand All @@ -89,16 +91,16 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg
if 'remote_name' not in kwargs:
kwargs['remote_name'] = project.name
name_to_file = {
'pros' : 'USER902x.bmp',
'pizza' : 'USER003x.bmp',
'planet' : 'USER013x.bmp',
'alien' : 'USER027x.bmp',
'ufo' : 'USER029x.bmp',
'clawbot' : 'USER010x.bmp',
'robot' : 'USER011x.bmp',
'question' : 'USER002x.bmp',
'power' : 'USER012x.bmp',
'X' : 'USER001x.bmp'
'pros': 'USER902x.bmp',
'pizza': 'USER003x.bmp',
'planet': 'USER013x.bmp',
'alien': 'USER027x.bmp',
'ufo': 'USER029x.bmp',
'clawbot': 'USER010x.bmp',
'robot': 'USER011x.bmp',
'question': 'USER002x.bmp',
'power': 'USER012x.bmp',
'X': 'USER001x.bmp'
}
kwargs['icon'] = name_to_file[kwargs['icon']]
if 'target' not in kwargs or kwargs['target'] is None:
Expand All @@ -111,28 +113,28 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg
else:
logger(__name__).debug(f"Invalid target provided: {kwargs['target']}")
logger(__name__).debug('Target should be one of ("v5" or "cortex").')
if not port:
raise dont_send(click.UsageError('No port provided or located. Make sure to specify --target if needed.'))
# if not port:
# raise dont_send(click.UsageError('No port provided or located. Make sure to specify --target if needed.'))
if kwargs['target'] == 'v5':
kwargs['remote_name'] = kwargs['name'] if kwargs.get("name",None) else kwargs['remote_name']
kwargs['remote_name'] = kwargs['name'] if kwargs.get("name", None) else kwargs['remote_name']
if kwargs['remote_name'] is None:
kwargs['remote_name'] = os.path.splitext(os.path.basename(path))[0]
kwargs['remote_name'] = kwargs['remote_name'].replace('@', '_')
kwargs['slot'] -= 1

action_to_kwarg = {
'run' : vex.V5Device.FTCompleteOptions.RUN_IMMEDIATELY,
'screen' : vex.V5Device.FTCompleteOptions.RUN_SCREEN,
'none' : vex.V5Device.FTCompleteOptions.DONT_RUN
}
'run': vex.V5Device.FTCompleteOptions.RUN_IMMEDIATELY,
'screen': vex.V5Device.FTCompleteOptions.RUN_SCREEN,
'none': vex.V5Device.FTCompleteOptions.DONT_RUN
}
after_upload_default = 'screen'
#Determine which FTCompleteOption to assign to run_after
if kwargs['after']==None:
kwargs['after']=after_upload_default
# Determine which FTCompleteOption to assign to run_after
if kwargs['after'] == None:
kwargs['after'] = after_upload_default
if kwargs['run_after']:
kwargs['after']='run'
elif kwargs['run_screen']==False and not kwargs['run_after']:
kwargs['after']='none'
kwargs['after'] = 'run'
elif kwargs['run_screen'] == False and not kwargs['run_after']:
kwargs['after'] = 'none'
kwargs['run_after'] = action_to_kwarg[kwargs['after']]
kwargs.pop('run_screen')
kwargs.pop('after')
Expand All @@ -142,7 +144,8 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg
logger(__name__).debug('Arguments: {}'.format(str(kwargs)))
# Do the actual uploading!
try:
ser = DirectPort(port)
ser = BluetoothPort(port)
# ser = DirectPort(port)
device = None
if kwargs['target'] == 'v5':
device = vex.V5Device(ser)
Expand All @@ -157,6 +160,7 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg
logger(__name__).exception(e, exc_info=True)
exit(1)


@upload_cli.command('lsusb', aliases=['ls-usb', 'ls-devices', 'lsdev', 'list-usb', 'list-devices'])
@click.option('--target', type=click.Choice(['v5', 'cortex']), default=None, required=False)
@default_options
Expand Down Expand Up @@ -207,4 +211,4 @@ def make_upload_terminal(ctx, **upload_kwargs):
analytics.send("upload-terminal")
from .terminal import terminal
ctx.invoke(upload, **upload_kwargs)
ctx.invoke(terminal, request_banner=False)
ctx.invoke(terminal, request_banner=False)
3 changes: 2 additions & 1 deletion pros/serial/ports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

from .base_port import BasePort, PortConnectionException, PortException
from .direct_port import DirectPort
from .ble_port import BluetoothPort
# from .v5_wireless_port import V5WirelessPort


@lru_cache()
def list_all_comports():
ports = list_ports.comports()
logger(__name__).debug('Connected: {}'.format(';'.join([str(p.__dict__) for p in ports])))
return ports
return ports
134 changes: 134 additions & 0 deletions pros/serial/ports/ble_port.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import sys
import time
from typing import *
import os

import simplepyble

from pros.common import dont_send, logger
from pros.serial.ports.exceptions import (ConnectionRefusedException,
PortNotFoundException)

from .base_port import BasePort, PortConnectionException

MAX_PACKET_SIZE = 244


class SuppressStdout:
def __enter__(self):
with open(os.devnull, 'w') as devnull:
self.orig_stdout_fno = os.dup(sys.stdout.fileno())
os.dup2(devnull.fileno(), 1)

def __exit__(self, *args):
os.dup2(self.orig_stdout_fno, 1)


class BluetoothPort(BasePort):

def __init__(self, port_name: str, **kwargs):

self.UUIDs = {
"SERVICE": "08590f7e-db05-467e-8757-72f6faeb13d5",
"DATA_TX": "08590f7e-db05-467e-8757-72f6faeb1306",
"DATA_RX": "08590f7e-db05-467e-8757-72f6faeb13f5",
"USER_TX": "08590f7e-db05-467e-8757-72f6faeb1316",
"USER_RX": "08590f7e-db05-467e-8757-72f6faeb1326",
"PAIRING": "08590f7e-db05-467e-8757-72f6faeb13e5",
}
self.devices = []

print("scanning for 5 seconds, please wait...")
adapters = simplepyble.Adapter.get_adapters()

if len(adapters) == 0:
print("No adapters found")
exit()
adapter = adapters[0]

adapter.set_callback_on_scan_found(self.scan_found_callback)

# Scan for 5 seconds
# with SuppressStdout():
adapter.scan_start()
print("Scanning...", end='')
while len(self.devices) == 0:
time.sleep(0.5)
print(".", end='', flush=True)
print("")
adapter.scan_stop()
peripherals = adapter.scan_get_results()

peripherals = [peripheral for peripheral in peripherals if "VEX_V5" in peripheral.identifier()]
peripherals = sorted(peripherals, key=lambda peripheral: peripheral.rssi())

self.peripheral = peripherals[0]

self.peripheral.connect()

magic = self.peripheral.read(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"])
if int.from_bytes(magic, "big") != 0xdeadface:
print("No V5 Devices Found")
exit()

self.peripheral.write_request(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"], bytes([0xff, 0xff, 0xff, 0xff]))

# Send pairing code
pairing_bytes = bytes(int(c) for c in "4600")

self.peripheral.write_request(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"], pairing_bytes)
print("Sent pairing code")

cresp = bytes([])
while cresp != pairing_bytes:
cresp = self.peripheral.read(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"])

self.peripheral.notify(self.UUIDs["SERVICE"], self.UUIDs["DATA_TX"], self.handle_notification)

self.buffer: bytearray = bytearray()

def scan_found_callback(self, peripheral):
if "VEX_V5" in peripheral.identifier():
self.devices.append(peripheral.identifier())

def handle_notification(self, data):
# print("Notification received: ", data)
self.buffer.extend(data)

def read(self, n_bytes: int = 0) -> bytes:
if n_bytes <= 0:
msg = bytes(self.buffer)
self.buffer = bytearray()
return msg
else:
if len(self.buffer) < n_bytes:
msg = bytes(self.buffer)
self.buffer = bytearray()
else:
msg, self.buffer = bytes(self.buffer[:n_bytes]), self.buffer[n_bytes:]
return msg

def write(self, data: Union[str, bytes]):
# for line in traceback.format_stack():
# print(line.strip())
if isinstance(data, str):
data = data.encode(encoding='ascii')
else:
data = bytes(data)
for i in range(0, len(data), MAX_PACKET_SIZE):
# print(len(data[i:min(len(data), i+MAX_PACKET_SIZE)]))
self.peripheral.write_command(self.UUIDs["SERVICE"], self.UUIDs["DATA_RX"],
data[i:min(len(data), i + MAX_PACKET_SIZE)])
# time.sleep(0.3)
# self.peripheral.write_command(self.UUIDs["SERVICE"], self.UUIDs["DATA_RX"], bytes([0x00]))

def destroy(self):
logger(__name__).debug(f'Destroying {self.__class__.__name__} to {self.serial.name}')
self.peripheral.disconnect()

@property
def name(self) -> str:
return self.serial.portstr

def __str__(self):
return str("Bluetooth Port")

0 comments on commit aec3435

Please sign in to comment.