-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
295 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,47 @@ | ||
# edgepi-rpc-client-py | ||
EdgePi RPC Client Python | ||
![Image](https://user-images.githubusercontent.com/3793563/207438826-bb656ca5-f19d-4699-8cb4-35acccb2ce58.svg) | ||
|
||
# EdgePi RPC Client | ||
|
||
The RPC client exposes functionality of the EdgePi Python SDK to be used over either a network connection or on local sockets, allowing for potential remote control of the EdgePi. | ||
|
||
Since the client directly mirrors the SDK, you can take advantage of the user-friendly SDK functionality with the flexibility of different connection protocols depending on your specific use case. | ||
|
||
You can learn more about the Python SDK [here](https://github.com/EdgePi-Cloud/edgepi-python-sdk). | ||
|
||
# Using the RPC Client | ||
|
||
Install the RPC client through the terminal: | ||
|
||
``` | ||
$ python3 -m pip install edgepi-rpc-client | ||
``` | ||
|
||
Once installed, you can control the modules of the EdgePi directly through the SDK's wide functionality. | ||
|
||
For example, from a remote connection with address `localhost` and port `5555`, initialize a client module as: | ||
|
||
```python | ||
from edgepi_rpc_client.services.adc.client_adc_service import ClientAdcService | ||
from edgepi_rpc_client.services.adc.adc_pb_enums import ADCChannel, ConvMode | ||
|
||
# initialize ADC Client | ||
adc_client = ClientAdcService('tcp://localhost:5555') | ||
|
||
# configure ADC to sample input pin 4 (the input pins are 0-indexed) | ||
adc_client.set_config(adc_1_analog_in=ADCChannel.AIN3, conversion_mode=ConvMode.CONTINUOUS) | ||
|
||
# send command to start automatic conversions | ||
adc_client.start_conversions() | ||
|
||
# perform 10 voltage reads | ||
for _ in range(10): | ||
out = adc_client.read_voltage() | ||
print(out) | ||
|
||
# stop automatic conversions | ||
adc_client.stop_conversions() | ||
``` | ||
|
||
Once the client is initialized you can control the EdgePi exactly as you would through the SDK. | ||
|
||
For details about available modules visit the SDK GitHub repository linked above. |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
"""Client for PWM service""" | ||
import logging | ||
from edgepirpc.protos import pwm_pb2 as pwm_pb | ||
from edgepi_rpc_client.client_rpc_channel import ClientRpcChannel | ||
from edgepi_rpc_client.util.helpers import ( | ||
filter_arg_values, | ||
create_config_request_from_args, | ||
get_server_response, | ||
) | ||
from edgepi_rpc_client.services.pwm.pwm_pb_enums import PWMPins, Polarity | ||
|
||
_logger = logging.getLogger(__name__) | ||
|
||
|
||
# pylint: disable=no-member | ||
class ClientPWMService: | ||
"""Client Methods for PWM Service""" | ||
|
||
def __init__(self, transport): | ||
self.client_rpc_channel = ClientRpcChannel(transport) | ||
self.service_stub = pwm_pb.PWMService_Stub(self.client_rpc_channel) | ||
self.rpc_controller = None | ||
|
||
# pylint: disable=unused-argument | ||
def set_config( | ||
self, | ||
pwm_num: PWMPins, | ||
frequency: float = None, | ||
duty_cycle: float = None, | ||
polarity: Polarity = None, | ||
): | ||
"""set_config method for SDK PWM module""" | ||
config_args_dict = filter_arg_values(locals(), "self", None) | ||
config_msg = pwm_pb.Config() | ||
arg_msg = pwm_pb.Config().ConfArg() | ||
request = create_config_request_from_args(config_msg, arg_msg, config_args_dict) | ||
return self.perform_call_from_request( | ||
request, self.service_stub.set_config, pwm_pb.SuccessMsg | ||
).content | ||
|
||
def enable(self, pwm_num: PWMPins): | ||
"""enable method for SDK PWM module""" | ||
return self.perform_rpc_call( | ||
pwm_num, self.service_stub.enable, pwm_pb.SuccessMsg | ||
).content | ||
|
||
def disable(self, pwm_num: PWMPins): | ||
"""disable method for SDK PWM module""" | ||
return self.perform_rpc_call( | ||
pwm_num, self.service_stub.disable, pwm_pb.SuccessMsg | ||
).content | ||
|
||
def close(self, pwm_num: PWMPins): | ||
"""close method for SDK PWM module""" | ||
return self.perform_rpc_call( | ||
pwm_num, self.service_stub.close, pwm_pb.SuccessMsg | ||
).content | ||
|
||
def init_pwm(self, pwm_num: PWMPins): | ||
"""init_pwm method for SDK PWM module""" | ||
return self.perform_rpc_call( | ||
pwm_num, self.service_stub.init_pwm, pwm_pb.SuccessMsg | ||
).content | ||
|
||
def get_frequency(self, pwm_num: PWMPins): | ||
"""get_frequency method for SDK PWM module""" | ||
return self.perform_rpc_call( | ||
pwm_num, self.service_stub.get_frequency, pwm_pb.GetFrequency | ||
).frequency | ||
|
||
def get_duty_cycle(self, pwm_num: PWMPins): | ||
"""get_duty_cycle method for SDK PWM module""" | ||
return self.perform_rpc_call( | ||
pwm_num, self.service_stub.get_duty_cycle, pwm_pb.GetDutyCycle | ||
).duty_cycle | ||
|
||
def get_polarity(self, pwm_num: PWMPins): | ||
"""get_polarity method for SDK PWM module""" | ||
polarity = self.perform_rpc_call( | ||
pwm_num, self.service_stub.get_polarity, pwm_pb.GetPolarity | ||
).polarity | ||
return Polarity(polarity) | ||
|
||
def get_enabled(self, pwm_num: PWMPins): | ||
"""get_enabled method for SDK PWM module""" | ||
return self.perform_rpc_call( | ||
pwm_num, self.service_stub.get_enabled, pwm_pb.GetEnabled | ||
).enabled | ||
|
||
# TODO: Potentially these helpers in other services and put them in a separate file | ||
def perform_rpc_call(self, pwm_num, method, response_type): | ||
"""Performs RPC call with PWM number and specified method""" | ||
request = pwm_pb.PWM(pwm_num=pwm_num.value) | ||
return self.perform_call_from_request(request, method, response_type) | ||
|
||
def perform_call_from_request(self, request, method, response_type): | ||
"""Executes RPC call using provided request and method""" | ||
rpc_response = method(self.rpc_controller, request) | ||
return get_server_response(rpc_response, response_type) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
"""Client enums to protobuf PWM enums""" | ||
from enum import Enum | ||
from edgepirpc.protos import pwm_pb2 as pwm_pb | ||
|
||
class PWMPins(Enum): | ||
"""PWMPins Enum""" | ||
PWM1 = pwm_pb.PWMPins.PWM1 | ||
PWM2 = pwm_pb.PWMPins.PWM2 | ||
|
||
class Polarity(Enum): | ||
"""Polarity Enum""" | ||
NORMAL = pwm_pb.Polarity.NORMAL | ||
INVERSED = pwm_pb.Polarity.INVERSED |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
"""PWMService integration test""" | ||
import pytest | ||
import time | ||
from edgepi_rpc_client.services.pwm.client_pwm_service import ClientPWMService | ||
from edgepi_rpc_client.services.pwm.pwm_pb_enums import PWMPins, Polarity | ||
|
||
|
||
@pytest.fixture(name="pwm_service") | ||
def fixture_test_pwm_service(): | ||
"""Inits new PWM service client for testing""" | ||
return ClientPWMService('tcp://localhost:4444') | ||
|
||
@pytest.mark.parametrize( | ||
"args", | ||
[ | ||
{"pwm_num": PWMPins.PWM1, "polarity": Polarity.NORMAL}, | ||
{"pwm_num": PWMPins.PWM2, "polarity": Polarity.INVERSED}, | ||
{"pwm_num": PWMPins.PWM1, "frequency": 1000}, | ||
{"pwm_num": PWMPins.PWM1, "duty_cycle": 0}, | ||
] | ||
) | ||
def test_set_config(pwm_service, args): | ||
"""Test for set_config""" | ||
response_init = pwm_service.init_pwm(pwm_num=args['pwm_num']) | ||
assert response_init == f"Successfully initialized {args['pwm_num']}." | ||
response_config = pwm_service.set_config(**args) | ||
assert response_config == "Successfully applied pwm configurations." | ||
|
||
@pytest.mark.parametrize( | ||
'pwm_num', | ||
[ | ||
(PWMPins.PWM1), | ||
(PWMPins.PWM2), | ||
] | ||
) | ||
def test_enable(pwm_service, pwm_num): | ||
"""Test for set_enable""" | ||
pwm_service.init_pwm(pwm_num) | ||
response = pwm_service.enable(pwm_num) | ||
assert response == f"Successfully enabled {pwm_num}." | ||
|
||
@pytest.mark.parametrize( | ||
'pwm_num', | ||
[ | ||
(PWMPins.PWM1), | ||
(PWMPins.PWM2), | ||
] | ||
) | ||
def test_disable(pwm_service, pwm_num): | ||
"""Test for set_disable""" | ||
pwm_service.init_pwm(pwm_num) | ||
response = pwm_service.disable(pwm_num) | ||
assert response == f"Successfully disabled {pwm_num}." | ||
|
||
@pytest.mark.parametrize( | ||
'pwm_num', | ||
[ | ||
(PWMPins.PWM1), | ||
(PWMPins.PWM2), | ||
] | ||
) | ||
def test_close(pwm_service, pwm_num): | ||
"""Test for close""" | ||
pwm_service.init_pwm(pwm_num) | ||
response = pwm_service.close(pwm_num) | ||
assert response == f"Successfully closed {pwm_num}." | ||
|
||
|
||
@pytest.mark.parametrize( | ||
'pwm_num', | ||
[ | ||
(PWMPins.PWM1), | ||
(PWMPins.PWM2), | ||
] | ||
) | ||
def test_get_frequency(pwm_service, pwm_num): | ||
"""Test for get_frequency""" | ||
pwm_service.init_pwm(pwm_num) | ||
frequency = pwm_service.get_frequency(pwm_num) | ||
assert isinstance(frequency, float) | ||
|
||
@pytest.mark.parametrize( | ||
'pwm_num', | ||
[ | ||
(PWMPins.PWM1), | ||
(PWMPins.PWM2), | ||
] | ||
) | ||
def test_get_duty_cycle(pwm_service, pwm_num): | ||
"""Test for get_duty_cycle""" | ||
pwm_service.init_pwm(pwm_num) | ||
duty_cycle = pwm_service.get_duty_cycle(pwm_num) | ||
assert isinstance(duty_cycle, float) | ||
|
||
@pytest.mark.parametrize( | ||
'pwm_num', | ||
[ | ||
(PWMPins.PWM1), | ||
(PWMPins.PWM2), | ||
] | ||
) | ||
def test_get_polarity(pwm_service, pwm_num): | ||
"""Test for get_polarity""" | ||
pwm_service.init_pwm(pwm_num) | ||
polarity = pwm_service.get_polarity(pwm_num) | ||
assert polarity in Polarity | ||
|
||
@pytest.mark.parametrize( | ||
'pwm_num', | ||
[ | ||
(PWMPins.PWM1), | ||
(PWMPins.PWM2), | ||
] | ||
) | ||
def test_get_enabled(pwm_service, pwm_num): | ||
"""Test for get_enabled""" | ||
pwm_service.init_pwm(pwm_num) | ||
enabled = pwm_service.get_enabled(pwm_num) | ||
assert isinstance(enabled, bool) | ||
|
||
def test_with_edgepi(pwm_service): | ||
"""Manual testing of PWM functionality.""" | ||
pwm_service.init_pwm(pwm_num=PWMPins.PWM1) | ||
pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=0) | ||
pwm_service.enable(PWMPins.PWM1) | ||
time.sleep(3) | ||
pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=0.5) | ||
time.sleep(3) | ||
pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=1) | ||
|