Skip to content

Commit

Permalink
Merge pull request #35 from EdgePi-Cloud/add-pwm
Browse files Browse the repository at this point in the history
Add a service for the PWM module
  • Loading branch information
josiah-tesfu authored Nov 14, 2023
2 parents 85a48b9 + 9b70df0 commit aa20428
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 0 deletions.
Empty file.
99 changes: 99 additions & 0 deletions edgepi_rpc_client/services/pwm/client_pwm_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Client for PWM service"""
import logging
from edgepirpc.protos import pwm_pb2 as pwm_pb
from edgepi_rpc_client.client_rpc_channel import ClientRpcChannel
from edgepi_rpc_client.util.helpers import (
filter_arg_values,
create_config_request_from_args,
get_server_response,
)
from edgepi_rpc_client.services.pwm.pwm_pb_enums import PWMPins, Polarity

_logger = logging.getLogger(__name__)


# pylint: disable=no-member
class ClientPWMService:
"""Client Methods for PWM Service"""

def __init__(self, transport):
self.client_rpc_channel = ClientRpcChannel(transport)
self.service_stub = pwm_pb.PWMService_Stub(self.client_rpc_channel)
self.rpc_controller = None

# pylint: disable=unused-argument
def set_config(
self,
pwm_num: PWMPins,
frequency: float = None,
duty_cycle: float = None,
polarity: Polarity = None,
):
"""set_config method for SDK PWM module"""
config_args_dict = filter_arg_values(locals(), "self", None)
config_msg = pwm_pb.Config()
arg_msg = pwm_pb.Config().ConfArg()
request = create_config_request_from_args(config_msg, arg_msg, config_args_dict)
return self.perform_call_from_request(
request, self.service_stub.set_config, pwm_pb.SuccessMsg
).content

def enable(self, pwm_num: PWMPins):
"""enable method for SDK PWM module"""
return self.perform_rpc_call(
pwm_num, self.service_stub.enable, pwm_pb.SuccessMsg
).content

def disable(self, pwm_num: PWMPins):
"""disable method for SDK PWM module"""
return self.perform_rpc_call(
pwm_num, self.service_stub.disable, pwm_pb.SuccessMsg
).content

def close(self, pwm_num: PWMPins):
"""close method for SDK PWM module"""
return self.perform_rpc_call(
pwm_num, self.service_stub.close, pwm_pb.SuccessMsg
).content

def init_pwm(self, pwm_num: PWMPins):
"""init_pwm method for SDK PWM module"""
return self.perform_rpc_call(
pwm_num, self.service_stub.init_pwm, pwm_pb.SuccessMsg
).content

def get_frequency(self, pwm_num: PWMPins):
"""get_frequency method for SDK PWM module"""
return self.perform_rpc_call(
pwm_num, self.service_stub.get_frequency, pwm_pb.GetFrequency
).frequency

def get_duty_cycle(self, pwm_num: PWMPins):
"""get_duty_cycle method for SDK PWM module"""
return self.perform_rpc_call(
pwm_num, self.service_stub.get_duty_cycle, pwm_pb.GetDutyCycle
).duty_cycle

def get_polarity(self, pwm_num: PWMPins):
"""get_polarity method for SDK PWM module"""
polarity = self.perform_rpc_call(
pwm_num, self.service_stub.get_polarity, pwm_pb.GetPolarity
).polarity
return Polarity(polarity)

def get_enabled(self, pwm_num: PWMPins):
"""get_enabled method for SDK PWM module"""
return self.perform_rpc_call(
pwm_num, self.service_stub.get_enabled, pwm_pb.GetEnabled
).enabled

# TODO: Potentially these helpers in other services and put them in a separate file
def perform_rpc_call(self, pwm_num, method, response_type):
"""Performs RPC call with PWM number and specified method"""
request = pwm_pb.PWM(pwm_num=pwm_num.value)
return self.perform_call_from_request(request, method, response_type)

def perform_call_from_request(self, request, method, response_type):
"""Executes RPC call using provided request and method"""
rpc_response = method(self.rpc_controller, request)
return get_server_response(rpc_response, response_type)
13 changes: 13 additions & 0 deletions edgepi_rpc_client/services/pwm/pwm_pb_enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""Client enums to protobuf PWM enums"""
from enum import Enum
from edgepirpc.protos import pwm_pb2 as pwm_pb

class PWMPins(Enum):
"""PWMPins Enum"""
PWM1 = pwm_pb.PWMPins.PWM1
PWM2 = pwm_pb.PWMPins.PWM2

class Polarity(Enum):
"""Polarity Enum"""
NORMAL = pwm_pb.Polarity.NORMAL
INVERSED = pwm_pb.Polarity.INVERSED
130 changes: 130 additions & 0 deletions tests/integration/test_pwm_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""PWMService integration test"""
import pytest
import time
from edgepi_rpc_client.services.pwm.client_pwm_service import ClientPWMService
from edgepi_rpc_client.services.pwm.pwm_pb_enums import PWMPins, Polarity


@pytest.fixture(name="pwm_service")
def fixture_test_pwm_service():
"""Inits new PWM service client for testing"""
return ClientPWMService('tcp://localhost:4444')

@pytest.mark.parametrize(
"args",
[
{"pwm_num": PWMPins.PWM1, "polarity": Polarity.NORMAL},
{"pwm_num": PWMPins.PWM2, "polarity": Polarity.INVERSED},
{"pwm_num": PWMPins.PWM1, "frequency": 1000},
{"pwm_num": PWMPins.PWM1, "duty_cycle": 0},
]
)
def test_set_config(pwm_service, args):
"""Test for set_config"""
response_init = pwm_service.init_pwm(pwm_num=args['pwm_num'])
assert response_init == f"Successfully initialized {args['pwm_num']}."
response_config = pwm_service.set_config(**args)
assert response_config == "Successfully applied pwm configurations."

@pytest.mark.parametrize(
'pwm_num',
[
(PWMPins.PWM1),
(PWMPins.PWM2),
]
)
def test_enable(pwm_service, pwm_num):
"""Test for set_enable"""
pwm_service.init_pwm(pwm_num)
response = pwm_service.enable(pwm_num)
assert response == f"Successfully enabled {pwm_num}."

@pytest.mark.parametrize(
'pwm_num',
[
(PWMPins.PWM1),
(PWMPins.PWM2),
]
)
def test_disable(pwm_service, pwm_num):
"""Test for set_disable"""
pwm_service.init_pwm(pwm_num)
response = pwm_service.disable(pwm_num)
assert response == f"Successfully disabled {pwm_num}."

@pytest.mark.parametrize(
'pwm_num',
[
(PWMPins.PWM1),
(PWMPins.PWM2),
]
)
def test_close(pwm_service, pwm_num):
"""Test for close"""
pwm_service.init_pwm(pwm_num)
response = pwm_service.close(pwm_num)
assert response == f"Successfully closed {pwm_num}."


@pytest.mark.parametrize(
'pwm_num',
[
(PWMPins.PWM1),
(PWMPins.PWM2),
]
)
def test_get_frequency(pwm_service, pwm_num):
"""Test for get_frequency"""
pwm_service.init_pwm(pwm_num)
frequency = pwm_service.get_frequency(pwm_num)
assert isinstance(frequency, float)

@pytest.mark.parametrize(
'pwm_num',
[
(PWMPins.PWM1),
(PWMPins.PWM2),
]
)
def test_get_duty_cycle(pwm_service, pwm_num):
"""Test for get_duty_cycle"""
pwm_service.init_pwm(pwm_num)
duty_cycle = pwm_service.get_duty_cycle(pwm_num)
assert isinstance(duty_cycle, float)

@pytest.mark.parametrize(
'pwm_num',
[
(PWMPins.PWM1),
(PWMPins.PWM2),
]
)
def test_get_polarity(pwm_service, pwm_num):
"""Test for get_polarity"""
pwm_service.init_pwm(pwm_num)
polarity = pwm_service.get_polarity(pwm_num)
assert polarity in Polarity

@pytest.mark.parametrize(
'pwm_num',
[
(PWMPins.PWM1),
(PWMPins.PWM2),
]
)
def test_get_enabled(pwm_service, pwm_num):
"""Test for get_enabled"""
pwm_service.init_pwm(pwm_num)
enabled = pwm_service.get_enabled(pwm_num)
assert isinstance(enabled, bool)

def test_with_edgepi(pwm_service):
"""Manual testing of PWM functionality."""
pwm_service.init_pwm(pwm_num=PWMPins.PWM1)
pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=0)
pwm_service.enable(PWMPins.PWM1)
time.sleep(3)
pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=0.5)
time.sleep(3)
pwm_service.set_config(pwm_num=PWMPins.PWM1, duty_cycle=1)

0 comments on commit aa20428

Please sign in to comment.