diff --git a/README.md b/README.md index d791eda..392ff50 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,47 @@ -# edgepi-rpc-client-py -EdgePi RPC Client Python +![Image](https://user-images.githubusercontent.com/3793563/207438826-bb656ca5-f19d-4699-8cb4-35acccb2ce58.svg) + +# EdgePi RPC Client + +The RPC client exposes functionality of the EdgePi Python SDK to be used over either a network connection or on local sockets, allowing for potential remote control of the EdgePi. + +Since the client directly mirrors the SDK, you can take advantage of the user-friendly SDK functionality with the flexibility of different connection protocols depending on your specific use case. + +You can learn more about the Python SDK [here](https://github.com/EdgePi-Cloud/edgepi-python-sdk). + +# Using the RPC Client + +Install the RPC client through the terminal: + +``` +$ python3 -m pip install edgepi-rpc-client +``` + +Once installed, you can control the modules of the EdgePi directly through the SDK's wide functionality. + +For example, from a remote connection with address `localhost` and port `5555`, initialize a client module as: + +```python +from edgepi_rpc_client.services.adc.client_adc_service import ClientAdcService +from edgepi_rpc_client.services.adc.adc_pb_enums import ADCChannel, ConvMode + +# initialize ADC Client +adc_client = ClientAdcService('tcp://localhost:5555') + +# configure ADC to sample input pin 4 (the input pins are 0-indexed) +adc_client.set_config(adc_1_analog_in=ADCChannel.AIN3, conversion_mode=ConvMode.CONTINUOUS) + +# send command to start automatic conversions +adc_client.start_conversions() + +# perform 10 voltage reads +for _ in range(10): + out = adc_client.read_voltage() + print(out) + +# stop automatic conversions +adc_client.stop_conversions() +``` + +Once the client is initialized you can control the EdgePi exactly as you would through the SDK. + +For details about available modules visit the SDK GitHub repository linked above. diff --git a/edgepi_rpc_client/services/pwm/__init__.py b/edgepi_rpc_client/services/pwm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/edgepi_rpc_client/services/pwm/client_pwm_service.py b/edgepi_rpc_client/services/pwm/client_pwm_service.py new file mode 100644 index 0000000..c205545 --- /dev/null +++ b/edgepi_rpc_client/services/pwm/client_pwm_service.py @@ -0,0 +1,99 @@ +"""Client for PWM service""" +import logging +from edgepirpc.protos import pwm_pb2 as pwm_pb +from edgepi_rpc_client.client_rpc_channel import ClientRpcChannel +from edgepi_rpc_client.util.helpers import ( + filter_arg_values, + create_config_request_from_args, + get_server_response, +) +from edgepi_rpc_client.services.pwm.pwm_pb_enums import PWMPins, Polarity + +_logger = logging.getLogger(__name__) + + +# pylint: disable=no-member +class ClientPWMService: + """Client Methods for PWM Service""" + + def __init__(self, transport): + self.client_rpc_channel = ClientRpcChannel(transport) + self.service_stub = pwm_pb.PWMService_Stub(self.client_rpc_channel) + self.rpc_controller = None + + # pylint: disable=unused-argument + def set_config( + self, + pwm_num: PWMPins, + frequency: float = None, + duty_cycle: float = None, + polarity: Polarity = None, + ): + """set_config method for SDK PWM module""" + config_args_dict = filter_arg_values(locals(), "self", None) + config_msg = pwm_pb.Config() + arg_msg = pwm_pb.Config().ConfArg() + request = create_config_request_from_args(config_msg, arg_msg, config_args_dict) + return self.perform_call_from_request( + request, self.service_stub.set_config, pwm_pb.SuccessMsg + ).content + + def enable(self, pwm_num: PWMPins): + """enable method for SDK PWM module""" + return self.perform_rpc_call( + pwm_num, self.service_stub.enable, pwm_pb.SuccessMsg + ).content + + def disable(self, pwm_num: PWMPins): + """disable method for SDK PWM module""" + return self.perform_rpc_call( + pwm_num, self.service_stub.disable, pwm_pb.SuccessMsg + ).content + + def close(self, pwm_num: PWMPins): + """close method for SDK PWM module""" + return self.perform_rpc_call( + pwm_num, self.service_stub.close, pwm_pb.SuccessMsg + ).content + + def init_pwm(self, pwm_num: PWMPins): + """init_pwm method for SDK PWM module""" + return self.perform_rpc_call( + pwm_num, self.service_stub.init_pwm, pwm_pb.SuccessMsg + ).content + + def get_frequency(self, pwm_num: PWMPins): + """get_frequency method for SDK PWM module""" + return self.perform_rpc_call( + pwm_num, self.service_stub.get_frequency, pwm_pb.GetFrequency + ).frequency + + def get_duty_cycle(self, pwm_num: PWMPins): + """get_duty_cycle method for SDK PWM module""" + return self.perform_rpc_call( + pwm_num, self.service_stub.get_duty_cycle, pwm_pb.GetDutyCycle + ).duty_cycle + + def get_polarity(self, pwm_num: PWMPins): + """get_polarity method for SDK PWM module""" + polarity = self.perform_rpc_call( + pwm_num, self.service_stub.get_polarity, pwm_pb.GetPolarity + ).polarity + return Polarity(polarity) + + def get_enabled(self, pwm_num: PWMPins): + """get_enabled method for SDK PWM module""" + return self.perform_rpc_call( + pwm_num, self.service_stub.get_enabled, pwm_pb.GetEnabled + ).enabled + + # TODO: Potentially these helpers in other services and put them in a separate file + def perform_rpc_call(self, pwm_num, method, response_type): + """Performs RPC call with PWM number and specified method""" + request = pwm_pb.PWM(pwm_num=pwm_num.value) + return self.perform_call_from_request(request, method, response_type) + + def perform_call_from_request(self, request, method, response_type): + """Executes RPC call using provided request and method""" + rpc_response = method(self.rpc_controller, request) + return get_server_response(rpc_response, response_type) diff --git a/edgepi_rpc_client/services/pwm/pwm_pb_enums.py b/edgepi_rpc_client/services/pwm/pwm_pb_enums.py new file mode 100644 index 0000000..4b442e0 --- /dev/null +++ b/edgepi_rpc_client/services/pwm/pwm_pb_enums.py @@ -0,0 +1,13 @@ +"""Client enums to protobuf PWM enums""" +from enum import Enum +from edgepirpc.protos import pwm_pb2 as pwm_pb + +class PWMPins(Enum): + """PWMPins Enum""" + PWM1 = pwm_pb.PWMPins.PWM1 + PWM2 = pwm_pb.PWMPins.PWM2 + +class Polarity(Enum): + """Polarity Enum""" + NORMAL = pwm_pb.Polarity.NORMAL + INVERSED = pwm_pb.Polarity.INVERSED diff --git a/tests/integration/test_module.py b/tests/integration/test_module.py index d6f64e0..f6f2e6e 100644 --- a/tests/integration/test_module.py +++ b/tests/integration/test_module.py @@ -1,11 +1,11 @@ +"""Integration test for RPC Client module installation""" import subprocess import os -import pytest from tempfile import TemporaryDirectory def test_rpc_client_install(): """Test client import""" - + # Build the wheel subprocess.check_call(["pip3", "install", "build"]) subprocess.check_call(["python", "-m", "build"]) @@ -16,9 +16,12 @@ def test_rpc_client_install(): with TemporaryDirectory() as tempdir: # Install the wheel inside temp directory - subprocess.check_call(["pip3", "install", os.path.join("dist", wheel_file), "--target", tempdir]) + subprocess.check_call( + ["pip3", "install", os.path.join("dist", wheel_file), "--target", tempdir] + ) # Add the temporary directory to sys.path so that we can import from there + # pylint: disable=import-outside-toplevel import sys sys.path.insert(0, tempdir) diff --git a/tests/integration/test_pwm_service.py b/tests/integration/test_pwm_service.py new file mode 100644 index 0000000..e37a927 --- /dev/null +++ b/tests/integration/test_pwm_service.py @@ -0,0 +1,130 @@ +"""PWMService integration test""" +import pytest +import time +from edgepi_rpc_client.services.pwm.client_pwm_service import ClientPWMService +from edgepi_rpc_client.services.pwm.pwm_pb_enums import PWMPins, Polarity + + +@pytest.fixture(name="pwm_service") +def fixture_test_pwm_service(): + """Inits new PWM service client for testing""" + return ClientPWMService('tcp://localhost:4444') + +@pytest.mark.parametrize( + "args", + [ + {"pwm_num": PWMPins.PWM1, "polarity": Polarity.NORMAL}, + {"pwm_num": PWMPins.PWM2, "polarity": Polarity.INVERSED}, + {"pwm_num": PWMPins.PWM1, "frequency": 1000}, + {"pwm_num": PWMPins.PWM1, "duty_cycle": 0}, + ] +) +def test_set_config(pwm_service, args): + """Test for set_config""" + response_init = pwm_service.init_pwm(pwm_num=args['pwm_num']) + assert response_init == f"Successfully initialized {args['pwm_num']}." + response_config = pwm_service.set_config(**args) + assert response_config == "Successfully applied pwm configurations." + +@pytest.mark.parametrize( + 'pwm_num', + [ + (PWMPins.PWM1), + (PWMPins.PWM2), + ] +) +def test_enable(pwm_service, pwm_num): + """Test for set_enable""" + pwm_service.init_pwm(pwm_num) + response = pwm_service.enable(pwm_num) + assert response == f"Successfully enabled {pwm_num}." + +@pytest.mark.parametrize( + 'pwm_num', + [ + (PWMPins.PWM1), + (PWMPins.PWM2), + ] +) +def test_disable(pwm_service, pwm_num): + """Test for set_disable""" + pwm_service.init_pwm(pwm_num) + response = pwm_service.disable(pwm_num) + assert response == f"Successfully disabled {pwm_num}." + +@pytest.mark.parametrize( + 'pwm_num', + [ + (PWMPins.PWM1), + (PWMPins.PWM2), + ] +) +def test_close(pwm_service, pwm_num): + """Test for close""" + pwm_service.init_pwm(pwm_num) + response = pwm_service.close(pwm_num) + assert response == f"Successfully closed {pwm_num}." + + +@pytest.mark.parametrize( + 'pwm_num', + [ + (PWMPins.PWM1), + (PWMPins.PWM2), + ] +) +def test_get_frequency(pwm_service, pwm_num): + """Test for get_frequency""" + pwm_service.init_pwm(pwm_num) + frequency = pwm_service.get_frequency(pwm_num) + assert isinstance(frequency, float) + +@pytest.mark.parametrize( + 'pwm_num', + [ + (PWMPins.PWM1), + (PWMPins.PWM2), + ] +) +def test_get_duty_cycle(pwm_service, pwm_num): + """Test for get_duty_cycle""" + pwm_service.init_pwm(pwm_num) + duty_cycle = pwm_service.get_duty_cycle(pwm_num) + assert isinstance(duty_cycle, float) + +@pytest.mark.parametrize( + 'pwm_num', + [ + (PWMPins.PWM1), + (PWMPins.PWM2), + ] +) +def test_get_polarity(pwm_service, pwm_num): + """Test for get_polarity""" + pwm_service.init_pwm(pwm_num) + polarity = pwm_service.get_polarity(pwm_num) + assert polarity in Polarity + +@pytest.mark.parametrize( + 'pwm_num', + [ + (PWMPins.PWM1), + (PWMPins.PWM2), + ] +) +def test_get_enabled(pwm_service, pwm_num): + """Test for get_enabled""" + pwm_service.init_pwm(pwm_num) + enabled = pwm_service.get_enabled(pwm_num) + assert isinstance(enabled, bool) + +def test_with_edgepi(pwm_service): + """Manual testing of PWM functionality.""" + pwm_service.init_pwm(pwm_num=PWMPins.PWM1) + pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=0) + pwm_service.enable(PWMPins.PWM1) + time.sleep(3) + pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=0.5) + time.sleep(3) + pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=1) + \ No newline at end of file