diff --git a/pros/cli/upload.py b/pros/cli/upload.py index 545609a4..ea4ce527 100644 --- a/pros/cli/upload.py +++ b/pros/cli/upload.py @@ -7,6 +7,7 @@ from .common import * from pros.ga.analytics import analytics + @pros_root def upload_cli(): pass @@ -20,16 +21,19 @@ def upload_cli(): @project_option(required=False, allow_none=True) @click.option('--run-after/--no-run-after', 'run_after', default=None, help='Immediately run the uploaded program.', cls=PROSDeprecated, replacement='after') -@click.option('--run-screen/--execute', 'run_screen', default=None, help='Display run program screen on the brain after upload.', +@click.option('--run-screen/--execute', 'run_screen', default=None, + help='Display run program screen on the brain after upload.', cls=PROSDeprecated, replacement='after') -@click.option('-af', '--after', type=click.Choice(['run','screen','none']), default=None, help='Action to perform on the brain after upload.', +@click.option('-af', '--after', type=click.Choice(['run', 'screen', 'none']), default=None, + help='Action to perform on the brain after upload.', cls=PROSOption, group='V5 Options') @click.option('--quirk', type=int, default=0) @click.option('--name', 'remote_name', type=str, default=None, required=False, help='Remote program name.', cls=PROSOption, group='V5 Options') @click.option('--slot', default=None, type=click.IntRange(min=1, max=8), help='Program slot on the GUI.', cls=PROSOption, group='V5 Options') -@click.option('--icon', type=click.Choice(['pros','pizza','planet','alien','ufo','robot','clawbot','question','X','power']), default='pros', +@click.option('--icon', type=click.Choice( + ['pros', 'pizza', 'planet', 'alien', 'ufo', 'robot', 'clawbot', 'question', 'X', 'power']), default='pros', help="Change Program's icon on the V5 Brain", cls=PROSOption, group='V5 Options') @click.option('--program-version', default=None, type=str, help='Specify version metadata for program.', cls=PROSOption, group='V5 Options', hidden=True) @@ -37,11 +41,10 @@ def upload_cli(): cls=PROSOption, group='V5 Options', hidden=True) @click.option('--compress-bin/--no-compress-bin', 'compress_bin', cls=PROSOption, group='V5 Options', default=True, help='Compress the program binary before uploading.') -@click.option('--description', default="Made with PROS", type=str, cls=PROSOption, group='V5 Options', +@click.option('--description', default="Made with PROS", type=str, cls=PROSOption, group='V5 Options', help='Change the description displayed for the program.') -@click.option('--name', default=None, type=str, cls=PROSOption, group='V5 Options', +@click.option('--name', default=None, type=str, cls=PROSOption, group='V5 Options', help='Change the name of the program.') - @default_options def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwargs): """ @@ -56,7 +59,8 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg analytics.send("upload") import pros.serial.devices.vex as vex from pros.serial.ports import DirectPort - kwargs['ide_version'] = project.kernel if not project==None else "None" + from pros.serial.ports import BluetoothPort + kwargs['ide_version'] = project.kernel if not project == None else "None" kwargs['ide'] = 'PROS' if path is None or os.path.isdir(path): if project is None: @@ -70,15 +74,13 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg # apply upload_options as a template options = dict(**project.upload_options) - if 'port' in options and port is None: - port = options.get('port', None) if 'slot' in options and kwargs.get('slot', None) is None: kwargs.pop('slot') elif kwargs.get('slot', None) is None: kwargs['slot'] = 1 - if 'icon' in options and kwargs.get('icon','pros') == 'pros': + if 'icon' in options and kwargs.get('icon', 'pros') == 'pros': kwargs.pop('icon') - if 'after' in options and kwargs.get('after','screen') is None: + if 'after' in options and kwargs.get('after', 'screen') is None: kwargs.pop('after') options.update(kwargs) @@ -89,16 +91,16 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg if 'remote_name' not in kwargs: kwargs['remote_name'] = project.name name_to_file = { - 'pros' : 'USER902x.bmp', - 'pizza' : 'USER003x.bmp', - 'planet' : 'USER013x.bmp', - 'alien' : 'USER027x.bmp', - 'ufo' : 'USER029x.bmp', - 'clawbot' : 'USER010x.bmp', - 'robot' : 'USER011x.bmp', - 'question' : 'USER002x.bmp', - 'power' : 'USER012x.bmp', - 'X' : 'USER001x.bmp' + 'pros': 'USER902x.bmp', + 'pizza': 'USER003x.bmp', + 'planet': 'USER013x.bmp', + 'alien': 'USER027x.bmp', + 'ufo': 'USER029x.bmp', + 'clawbot': 'USER010x.bmp', + 'robot': 'USER011x.bmp', + 'question': 'USER002x.bmp', + 'power': 'USER012x.bmp', + 'X': 'USER001x.bmp' } kwargs['icon'] = name_to_file[kwargs['icon']] if 'target' not in kwargs or kwargs['target'] is None: @@ -111,28 +113,28 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg else: logger(__name__).debug(f"Invalid target provided: {kwargs['target']}") logger(__name__).debug('Target should be one of ("v5" or "cortex").') - if not port: - raise dont_send(click.UsageError('No port provided or located. Make sure to specify --target if needed.')) + # if not port: + # raise dont_send(click.UsageError('No port provided or located. Make sure to specify --target if needed.')) if kwargs['target'] == 'v5': - kwargs['remote_name'] = kwargs['name'] if kwargs.get("name",None) else kwargs['remote_name'] + kwargs['remote_name'] = kwargs['name'] if kwargs.get("name", None) else kwargs['remote_name'] if kwargs['remote_name'] is None: kwargs['remote_name'] = os.path.splitext(os.path.basename(path))[0] kwargs['remote_name'] = kwargs['remote_name'].replace('@', '_') kwargs['slot'] -= 1 - + action_to_kwarg = { - 'run' : vex.V5Device.FTCompleteOptions.RUN_IMMEDIATELY, - 'screen' : vex.V5Device.FTCompleteOptions.RUN_SCREEN, - 'none' : vex.V5Device.FTCompleteOptions.DONT_RUN - } + 'run': vex.V5Device.FTCompleteOptions.RUN_IMMEDIATELY, + 'screen': vex.V5Device.FTCompleteOptions.RUN_SCREEN, + 'none': vex.V5Device.FTCompleteOptions.DONT_RUN + } after_upload_default = 'screen' - #Determine which FTCompleteOption to assign to run_after - if kwargs['after']==None: - kwargs['after']=after_upload_default + # Determine which FTCompleteOption to assign to run_after + if kwargs['after'] == None: + kwargs['after'] = after_upload_default if kwargs['run_after']: - kwargs['after']='run' - elif kwargs['run_screen']==False and not kwargs['run_after']: - kwargs['after']='none' + kwargs['after'] = 'run' + elif kwargs['run_screen'] == False and not kwargs['run_after']: + kwargs['after'] = 'none' kwargs['run_after'] = action_to_kwarg[kwargs['after']] kwargs.pop('run_screen') kwargs.pop('after') @@ -142,7 +144,8 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg logger(__name__).debug('Arguments: {}'.format(str(kwargs))) # Do the actual uploading! try: - ser = DirectPort(port) + ser = BluetoothPort(port) + # ser = DirectPort(port) device = None if kwargs['target'] == 'v5': device = vex.V5Device(ser) @@ -157,6 +160,7 @@ def upload(path: Optional[str], project: Optional[c.Project], port: str, **kwarg logger(__name__).exception(e, exc_info=True) exit(1) + @upload_cli.command('lsusb', aliases=['ls-usb', 'ls-devices', 'lsdev', 'list-usb', 'list-devices']) @click.option('--target', type=click.Choice(['v5', 'cortex']), default=None, required=False) @default_options @@ -207,4 +211,4 @@ def make_upload_terminal(ctx, **upload_kwargs): analytics.send("upload-terminal") from .terminal import terminal ctx.invoke(upload, **upload_kwargs) - ctx.invoke(terminal, request_banner=False) + ctx.invoke(terminal, request_banner=False) \ No newline at end of file diff --git a/pros/serial/ports/__init__.py b/pros/serial/ports/__init__.py index be344a79..c4d452f7 100644 --- a/pros/serial/ports/__init__.py +++ b/pros/serial/ports/__init__.py @@ -5,6 +5,7 @@ from .base_port import BasePort, PortConnectionException, PortException from .direct_port import DirectPort +from .ble_port import BluetoothPort # from .v5_wireless_port import V5WirelessPort @@ -12,4 +13,4 @@ def list_all_comports(): ports = list_ports.comports() logger(__name__).debug('Connected: {}'.format(';'.join([str(p.__dict__) for p in ports]))) - return ports + return ports \ No newline at end of file diff --git a/pros/serial/ports/ble_port.py b/pros/serial/ports/ble_port.py new file mode 100644 index 00000000..652c459e --- /dev/null +++ b/pros/serial/ports/ble_port.py @@ -0,0 +1,134 @@ +import sys +import time +from typing import * +import os + +import simplepyble + +from pros.common import dont_send, logger +from pros.serial.ports.exceptions import (ConnectionRefusedException, + PortNotFoundException) + +from .base_port import BasePort, PortConnectionException + +MAX_PACKET_SIZE = 244 + + +class SuppressStdout: + def __enter__(self): + with open(os.devnull, 'w') as devnull: + self.orig_stdout_fno = os.dup(sys.stdout.fileno()) + os.dup2(devnull.fileno(), 1) + + def __exit__(self, *args): + os.dup2(self.orig_stdout_fno, 1) + + +class BluetoothPort(BasePort): + + def __init__(self, port_name: str, **kwargs): + + self.UUIDs = { + "SERVICE": "08590f7e-db05-467e-8757-72f6faeb13d5", + "DATA_TX": "08590f7e-db05-467e-8757-72f6faeb1306", + "DATA_RX": "08590f7e-db05-467e-8757-72f6faeb13f5", + "USER_TX": "08590f7e-db05-467e-8757-72f6faeb1316", + "USER_RX": "08590f7e-db05-467e-8757-72f6faeb1326", + "PAIRING": "08590f7e-db05-467e-8757-72f6faeb13e5", + } + self.devices = [] + + print("scanning for 5 seconds, please wait...") + adapters = simplepyble.Adapter.get_adapters() + + if len(adapters) == 0: + print("No adapters found") + exit() + adapter = adapters[0] + + adapter.set_callback_on_scan_found(self.scan_found_callback) + + # Scan for 5 seconds + # with SuppressStdout(): + adapter.scan_start() + print("Scanning...", end='') + while len(self.devices) == 0: + time.sleep(0.5) + print(".", end='', flush=True) + print("") + adapter.scan_stop() + peripherals = adapter.scan_get_results() + + peripherals = [peripheral for peripheral in peripherals if "VEX_V5" in peripheral.identifier()] + peripherals = sorted(peripherals, key=lambda peripheral: peripheral.rssi()) + + self.peripheral = peripherals[0] + + self.peripheral.connect() + + magic = self.peripheral.read(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"]) + if int.from_bytes(magic, "big") != 0xdeadface: + print("No V5 Devices Found") + exit() + + self.peripheral.write_request(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"], bytes([0xff, 0xff, 0xff, 0xff])) + + # Send pairing code + pairing_bytes = bytes(int(c) for c in "4600") + + self.peripheral.write_request(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"], pairing_bytes) + print("Sent pairing code") + + cresp = bytes([]) + while cresp != pairing_bytes: + cresp = self.peripheral.read(self.UUIDs["SERVICE"], self.UUIDs["PAIRING"]) + + self.peripheral.notify(self.UUIDs["SERVICE"], self.UUIDs["DATA_TX"], self.handle_notification) + + self.buffer: bytearray = bytearray() + + def scan_found_callback(self, peripheral): + if "VEX_V5" in peripheral.identifier(): + self.devices.append(peripheral.identifier()) + + def handle_notification(self, data): + # print("Notification received: ", data) + self.buffer.extend(data) + + def read(self, n_bytes: int = 0) -> bytes: + if n_bytes <= 0: + msg = bytes(self.buffer) + self.buffer = bytearray() + return msg + else: + if len(self.buffer) < n_bytes: + msg = bytes(self.buffer) + self.buffer = bytearray() + else: + msg, self.buffer = bytes(self.buffer[:n_bytes]), self.buffer[n_bytes:] + return msg + + def write(self, data: Union[str, bytes]): + # for line in traceback.format_stack(): + # print(line.strip()) + if isinstance(data, str): + data = data.encode(encoding='ascii') + else: + data = bytes(data) + for i in range(0, len(data), MAX_PACKET_SIZE): + # print(len(data[i:min(len(data), i+MAX_PACKET_SIZE)])) + self.peripheral.write_command(self.UUIDs["SERVICE"], self.UUIDs["DATA_RX"], + data[i:min(len(data), i + MAX_PACKET_SIZE)]) + # time.sleep(0.3) + # self.peripheral.write_command(self.UUIDs["SERVICE"], self.UUIDs["DATA_RX"], bytes([0x00])) + + def destroy(self): + logger(__name__).debug(f'Destroying {self.__class__.__name__} to {self.serial.name}') + self.peripheral.disconnect() + + @property + def name(self) -> str: + return self.serial.portstr + + def __str__(self): + return str("Bluetooth Port") \ No newline at end of file