-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug]: Issues with new boolean cover implementation #489
Comments
Just deleted and re-added the device to try again and I couldn't reproduce issues 1 and 3 above. Issue 2 (entity goes immediately into "Closed" state) still happens though. |
The second time I went through manual configuration, but didn't change any of the default settings. Here is the diagnostics for the second time: localtuya-01JHPZCGJGTFRBJY1GS78BRZ12-Garage door -b0cd8b7e5ad5a3ce7ba20da46c5ad83e (1).json |
Enable the debug for integration and device and do the same test you did above close/open and post the logs here. |
Logs from adding the device:
|
Logs from using the Cloud entity's card to open and close the garage door:
|
Logs from using the local entity's card to open and close the garage:
|
❤️ |
This issue was closed because it was resolved on the release: 2025.1.1 |
Try with latest update, reconfigure is required since current position now support boolean state "DPID 3 for your garage" edit: I think the issue that it's instantly return "closed" or "open" is still present can you confirm it? |
Thanks again for tackling these issues so quickly. This update seems to be a bit of a regression, though. I tried both DPID 1 and DPID 3 and neither are working correctly. With DPID 1 (the default), the state never transitions from "Closed". Clicking on the open button will open the garage door, but the close button is never enabled as a result. With DPID 3, clicking on the open button doesn't do anything and the state is also never updated. |
Here are logs for DPID 3 (for adding the device and clicking on the open button, which doesn't seem trigger the door to open):
|
Here are logs for DPID 1 (for adding the device and clicking on the open button, which does open the garage door, but the state is still "Closed", I had to use the Cloud version of the entity to close the door):
|
Try reconfigure the device and check the "position inverted" option. |
Same behavior (the button doesn't do anything). Here's the diagnostic for this device. localtuya-01JHPZCGJGTFRBJY1GS78BRZ12-Garage door -b0cd8b7e5ad5a3ce7ba20da46c5ad83e (2).json |
DPID 3 is the correct DP for the current position field, but it seems I missed something since it doesn't update instantly and only update if garage fully closed. |
Let me know if there is more data or testing I can help provide. |
I downgraded back to |
If there is other information that I can log to help with this, please let me know. I'm pretty comfortable with Python. I'm just not at all familiar with the HA platform or set of APIs, though. |
Speaking of which, should I be using a specific positioning mode and set anything for the |
The current position should be To clarify this can be fixed by assigning is_closed to true or false for "current position" but this way "open/closing" state won't works. and the state of garage would be only close/open, I can see where is the issue and it's fixable. |
Is DP 3 above the same as
If so, then yes, it becomes |
I think I completely misunderstood this. I thought you meant that I should change the main
Nope, with this change, the state doesn't change to "Closed" until the door is fully closed! |
Hello can you test this changes. copy the code block below and replace it with everything in HA config -> cover.py
"""Platform to locally control Tuya-based cover devices."""
import asyncio
import logging
import time
from functools import partial
import voluptuous as vol
from homeassistant.components.cover import (
ATTR_POSITION,
DOMAIN,
CoverEntityFeature,
CoverEntity,
DEVICE_CLASSES_SCHEMA,
)
from homeassistant.const import CONF_DEVICE_CLASS
from .config_flow import col_to_select
from .entity import LocalTuyaEntity, async_setup_entry
from .const import (
CONF_COMMANDS_SET,
CONF_CURRENT_POSITION_DP,
CONF_POSITION_INVERTED,
CONF_POSITIONING_MODE,
CONF_SET_POSITION_DP,
CONF_SPAN_TIME,
CONF_STOP_SWITCH_DP,
)
# cover states.
STATE_OPENING = "opening"
STATE_CLOSING = "closing"
STATE_STOPPED = "stopped"
STATE_SET_CMD = "moving"
STATE_SET_OPENING = "set_opeing"
STATE_SET_CLOSING = "set_closing"
_LOGGER = logging.getLogger(__name__)
COVER_COMMANDS = {
"Open, Close and Stop": "open_close_stop",
"Open, Close and Continue": "open_close_continue",
"ON, OFF and Stop": "on_off_stop",
"fz, zz and Stop": "fz_zz_stop",
"zz, fz and Stop": "zz_fz_stop",
"1, 2 and 3": "1_2_3",
"0, 1 and 2": "0_1_2",
}
MODE_NONE = "none"
MODE_SET_POSITION = "position"
MODE_TIME_BASED = "timed"
COVER_MODES = {
"Neither": MODE_NONE,
"Set Position": MODE_SET_POSITION,
"Time Based": MODE_TIME_BASED,
}
COVER_TIMEOUT_TOLERANCE = 3.0
DEF_CMD_SET = list(COVER_COMMANDS.values())[0]
DEF_POS_MODE = list(COVER_MODES.values())[0]
DEFAULT_SPAN_TIME = 25.0
def flow_schema(dps):
"""Return schema used in config flow."""
return {
vol.Optional(CONF_COMMANDS_SET, default=DEF_CMD_SET): col_to_select(
COVER_COMMANDS
),
vol.Optional(CONF_POSITIONING_MODE, default=DEF_POS_MODE): col_to_select(
COVER_MODES
),
vol.Optional(CONF_CURRENT_POSITION_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_SET_POSITION_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_POSITION_INVERTED, default=False): bool,
vol.Optional(CONF_SPAN_TIME, default=DEFAULT_SPAN_TIME): vol.All(
vol.Coerce(float), vol.Range(min=1.0, max=300.0)
),
vol.Optional(CONF_STOP_SWITCH_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
}
class LocalTuyaCover(LocalTuyaEntity, CoverEntity):
"""Tuya cover device."""
def __init__(self, device, config_entry, switchid, **kwargs):
"""Initialize a new LocalTuyaCover."""
super().__init__(device, config_entry, switchid, _LOGGER, **kwargs)
commands_set = DEF_CMD_SET
if self.has_config(CONF_COMMANDS_SET):
commands_set = self._config[CONF_COMMANDS_SET]
self._open_cmd = commands_set.split("_")[0]
self._close_cmd = commands_set.split("_")[1]
self._stop_cmd = commands_set.split("_")[2]
self._timer_start = time.time()
self._state = self._stop_cmd
self._previous_state = self._state
self._current_cover_position = 0
self._current_state_action = STATE_STOPPED # Default.
self._set_new_position = int | None
self._stop_switch = self._config.get(CONF_STOP_SWITCH_DP, None)
self._position_inverted = self._config.get(CONF_POSITION_INVERTED)
self._current_task = None
@property
def supported_features(self):
"""Flag supported features."""
supported_features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
if not isinstance(self._open_cmd, bool):
supported_features |= CoverEntityFeature.STOP
if self._config[CONF_POSITIONING_MODE] != MODE_NONE:
supported_features |= CoverEntityFeature.SET_POSITION
return supported_features
@property
def _current_state(self) -> str:
"""Return the current state of the cover."""
state = self._current_state_action
curr_pos = self._current_cover_position
# Reset STATE when cover is fully closed or fully opened.
if state in (STATE_CLOSING, STATE_OPENING) and curr_pos in (0, 100):
return STATE_STOPPED
if state in (STATE_SET_CLOSING, STATE_SET_OPENING):
set_pos = self._set_new_position
# Reset state whenn cover reached the position.
if curr_pos - set_pos < 5 and curr_pos - set_pos >= -5:
return STATE_STOPPED
return self._current_state_action
@property
def current_cover_position(self):
"""Return current cover position in percent."""
if self._config[CONF_POSITIONING_MODE] == MODE_NONE:
return None
return self._current_cover_position
@property
def is_opening(self):
"""Return if cover is opening."""
return self._current_state in (STATE_OPENING, STATE_SET_OPENING)
@property
def is_closing(self):
"""Return if cover is closing."""
return self._current_state in (STATE_CLOSING, STATE_SET_CLOSING)
@property
def is_closed(self):
"""Return if the cover is closed or not."""
if isinstance(self._open_cmd, (bool, str)):
return self._current_cover_position == 0
if self._config[CONF_POSITIONING_MODE] == MODE_NONE:
return None
return self.current_cover_position == 0 and self._current_state == STATE_STOPPED
async def async_set_cover_position(self, **kwargs):
"""Move the cover to a specific position."""
# Update device values IF the device is moving at the moment.
if self._current_state != STATE_STOPPED:
await self.async_stop_cover()
self.debug("Setting cover position: %r", kwargs[ATTR_POSITION])
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
newpos = float(kwargs[ATTR_POSITION])
currpos = self.current_cover_position
posdiff = abs(newpos - currpos)
mydelay = posdiff / 100.0 * self._config[CONF_SPAN_TIME]
if newpos > currpos:
self.debug("Opening to %f: delay %f", newpos, mydelay)
await self.async_open_cover(delay=mydelay)
self.update_state(STATE_OPENING)
else:
self.debug("Closing to %f: delay %f", newpos, mydelay)
await self.async_close_cover(delay=mydelay)
self.update_state(STATE_CLOSING)
self.debug("Done")
elif self._config[CONF_POSITIONING_MODE] == MODE_SET_POSITION:
converted_position = int(kwargs[ATTR_POSITION])
if self._position_inverted:
converted_position = 100 - converted_position
if 0 <= converted_position <= 100 and self.has_config(CONF_SET_POSITION_DP):
await self._device.set_dp(
converted_position, self._config[CONF_SET_POSITION_DP]
)
# Give it a moment, to make sure hass updated current pos.
await asyncio.sleep(0.1)
self.update_state(STATE_SET_CMD, int(kwargs[ATTR_POSITION]))
async def async_stop_after_timeout(self, delay_sec):
"""Stop the cover if timeout (max movement span) occurred."""
try:
await asyncio.sleep(delay_sec)
self._current_task = None
await self.async_stop_cover()
except asyncio.CancelledError:
self._current_task = None
async def async_open_cover(self, **kwargs):
"""Open the cover."""
self.debug("Launching command %s to cover ", self._open_cmd)
await self._device.set_dp(self._open_cmd, self._dp_id)
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
if self._current_task is not None:
self._current_task.cancel()
# for timed positioning, stop the cover after a full opening timespan
# instead of waiting the internal timeout
self._current_task = self.hass.async_create_task(
self.async_stop_after_timeout(
kwargs.get(
"delay", self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
)
)
)
self.update_state(STATE_OPENING)
async def async_close_cover(self, **kwargs):
"""Close cover."""
self.debug("Launching command %s to cover ", self._close_cmd)
await self._device.set_dp(self._close_cmd, self._dp_id)
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
if self._current_task is not None:
self._current_task.cancel()
# for timed positioning, stop the cover after a full opening timespan
# instead of waiting the internal timeout
self._current_task = self.hass.async_create_task(
self.async_stop_after_timeout(
kwargs.get(
"delay", self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
)
)
)
self.update_state(STATE_CLOSING)
async def async_stop_cover(self, **kwargs):
"""Stop the cover."""
if self._current_task is not None:
self._current_task.cancel()
self.debug("Launching command %s to cover ", self._stop_cmd)
command = {self._dp_id: self._stop_cmd}
if self._stop_switch is not None:
command[self._stop_switch] = True
await self._device.set_dps(command)
self.update_state(STATE_STOPPED)
def status_restored(self, stored_state):
"""Restore the last stored cover status."""
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
stored_pos = stored_state.attributes.get("current_position")
if stored_pos is not None:
self._current_cover_position = stored_pos
self.debug("Restored cover position %s", self._current_cover_position)
def connection_made(self):
super().connection_made()
match self.dp_value(self._dp_id):
case str() as i if i.isupper():
self._open_cmd = self._open_cmd.upper()
self._close_cmd = self._close_cmd.upper()
self._stop_cmd = self._stop_cmd.upper()
case bool():
self._open_cmd = True
self._close_cmd = False
def status_updated(self):
"""Device status was updated."""
self._previous_state = self._state
self._state = self.dp_value(self._dp_id)
if self.has_config(CONF_CURRENT_POSITION_DP):
curr_pos = self.dp_value(CONF_CURRENT_POSITION_DP)
if isinstance(curr_pos, (bool, str)):
closed = curr_pos in (True, "fully_close")
stopped = self._previous_state == self._state
curr_pos = 0 if stopped and closed else (100 if stopped else 50)
if self._position_inverted:
curr_pos = 100 - curr_pos
self._current_cover_position = curr_pos
if (
self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED
and self._state != self._previous_state
):
if self._previous_state != self._stop_cmd:
# the state has changed, and the cover was moving
time_diff = time.time() - self._timer_start
pos_diff = round(time_diff / self._config[CONF_SPAN_TIME] * 100.0)
if self._previous_state == self._close_cmd:
pos_diff = -pos_diff
self._current_cover_position = min(
100, max(0, self._current_cover_position + pos_diff)
)
change = "stopped" if self._state == self._stop_cmd else "inverted"
self.debug(
"Movement %s after %s sec., position difference %s",
change,
time_diff,
pos_diff,
)
# store the time of the last movement change
self._timer_start = time.time()
# Keep record in last_state as long as not during connection/re-connection,
# as last state will be used to restore the previous state
if (self._state is not None) and (not self._device.is_connecting):
self._last_state = self._state
def update_state(self, action, position=None):
"""Update cover current states."""
state = self._current_state_action
# using Commands.
if position is None:
self._current_state_action = action
# Set position cmd, check if target position weither close or open
if action == STATE_SET_CMD and position is not None:
curr_pos = self.current_cover_position
self._set_new_position = position
pos_diff = position - curr_pos
# Prevent stuck state when interrupted on middle of cmd
if state == STATE_STOPPED:
if pos_diff > 0:
self._current_state_action = STATE_SET_OPENING
elif pos_diff < 0:
self._current_state_action = STATE_SET_CLOSING
else:
self._current_state_action = STATE_STOPPED
# Write state data.
self.async_write_ha_state()
async_setup_entry = partial(async_setup_entry, DOMAIN, LocalTuyaCover, flow_schema) |
Yeah, but I won't be able to test until tomorrow. |
Just tested this out and it's much improved! I re-added the device using automatic entity creation. The buttons to open and close the door work correctly and the states are showing up correctly. Even the Screen.Recording.2025-01-21.at.11.23.03.AM.movThere's just one minor bug. When the device was first created, the entity was in an |
Oh, also looks like every time I restart HA, the entity goes back into an |
Have you check invert the set position from localtuya entity configuration, if checking it correct the issue I'll merge the changes. |
I just re-created the device and manually created the entity with the same config as above, but the entity still defaults to an |
I went back to reconfigure it and tried both inverted on and off, but the state remains |
Here are some logs for when the device is added:
|
I know what happen here Thanks for feedback. |
Can you test this. cover.py
"""Platform to locally control Tuya-based cover devices."""
import asyncio
import logging
import time
from functools import partial
import voluptuous as vol
from homeassistant.components.cover import (
ATTR_POSITION,
DOMAIN,
CoverEntityFeature,
CoverEntity,
DEVICE_CLASSES_SCHEMA,
)
from homeassistant.const import CONF_DEVICE_CLASS
from .config_flow import col_to_select
from .entity import LocalTuyaEntity, async_setup_entry
from .const import (
CONF_COMMANDS_SET,
CONF_CURRENT_POSITION_DP,
CONF_POSITION_INVERTED,
CONF_POSITIONING_MODE,
CONF_SET_POSITION_DP,
CONF_SPAN_TIME,
CONF_STOP_SWITCH_DP,
)
# cover states.
STATE_OPENING = "opening"
STATE_CLOSING = "closing"
STATE_STOPPED = "stopped"
STATE_SET_CMD = "moving"
STATE_SET_OPENING = "set_opeing"
STATE_SET_CLOSING = "set_closing"
_LOGGER = logging.getLogger(__name__)
COVER_COMMANDS = {
"Open, Close and Stop": "open_close_stop",
"Open, Close and Continue": "open_close_continue",
"ON, OFF and Stop": "on_off_stop",
"fz, zz and Stop": "fz_zz_stop",
"zz, fz and Stop": "zz_fz_stop",
"1, 2 and 3": "1_2_3",
"0, 1 and 2": "0_1_2",
}
MODE_NONE = "none"
MODE_SET_POSITION = "position"
MODE_TIME_BASED = "timed"
COVER_MODES = {
"Neither": MODE_NONE,
"Set Position": MODE_SET_POSITION,
"Time Based": MODE_TIME_BASED,
}
COVER_TIMEOUT_TOLERANCE = 3.0
DEF_CMD_SET = list(COVER_COMMANDS.values())[0]
DEF_POS_MODE = list(COVER_MODES.values())[0]
DEFAULT_SPAN_TIME = 25.0
def flow_schema(dps):
"""Return schema used in config flow."""
return {
vol.Optional(CONF_COMMANDS_SET, default=DEF_CMD_SET): col_to_select(
COVER_COMMANDS
),
vol.Optional(CONF_POSITIONING_MODE, default=DEF_POS_MODE): col_to_select(
COVER_MODES
),
vol.Optional(CONF_CURRENT_POSITION_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_SET_POSITION_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_POSITION_INVERTED, default=False): bool,
vol.Optional(CONF_SPAN_TIME, default=DEFAULT_SPAN_TIME): vol.All(
vol.Coerce(float), vol.Range(min=1.0, max=300.0)
),
vol.Optional(CONF_STOP_SWITCH_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
}
class LocalTuyaCover(LocalTuyaEntity, CoverEntity):
"""Tuya cover device."""
def __init__(self, device, config_entry, switchid, **kwargs):
"""Initialize a new LocalTuyaCover."""
super().__init__(device, config_entry, switchid, _LOGGER, **kwargs)
commands_set = DEF_CMD_SET
if self.has_config(CONF_COMMANDS_SET):
commands_set = self._config[CONF_COMMANDS_SET]
self._open_cmd = commands_set.split("_")[0]
self._close_cmd = commands_set.split("_")[1]
self._stop_cmd = commands_set.split("_")[2]
self._timer_start = time.time()
self._state = None
self._previous_state = None
self._current_cover_position = 0
self._current_state_action = STATE_STOPPED # Default.
self._set_new_position = int | None
self._stop_switch = self._config.get(CONF_STOP_SWITCH_DP)
self._position_inverted = self._config.get(CONF_POSITION_INVERTED)
self._current_task = None
@property
def supported_features(self):
"""Flag supported features."""
supported_features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
if not isinstance(self._open_cmd, bool):
supported_features |= CoverEntityFeature.STOP
if self._config[CONF_POSITIONING_MODE] != MODE_NONE:
supported_features |= CoverEntityFeature.SET_POSITION
return supported_features
@property
def _current_state(self) -> str:
"""Return the current state of the cover."""
state = self._current_state_action
curr_pos = self._current_cover_position
# Reset STATE when cover is fully closed or fully opened.
if (state == STATE_CLOSING and curr_pos == 0) or (
state == STATE_OPENING and curr_pos == 100
):
return STATE_STOPPED
if state in (STATE_SET_CLOSING, STATE_SET_OPENING):
set_pos = self._set_new_position
# Reset state whenn cover reached the position.
if curr_pos - set_pos < 5 and curr_pos - set_pos >= -5:
return STATE_STOPPED
return self._current_state_action
@property
def current_cover_position(self):
"""Return current cover position in percent."""
if self._config[CONF_POSITIONING_MODE] == MODE_NONE:
return None
return self._current_cover_position
@property
def is_opening(self):
"""Return if cover is opening."""
return self._current_state in (STATE_OPENING, STATE_SET_OPENING)
@property
def is_closing(self):
"""Return if cover is closing."""
return self._current_state in (STATE_CLOSING, STATE_SET_CLOSING)
@property
def is_closed(self):
"""Return if the cover is closed or not."""
if isinstance(self._open_cmd, (bool, str)):
return self._current_cover_position == 0
if self._config[CONF_POSITIONING_MODE] == MODE_NONE:
return None
return self.current_cover_position == 0 and self._current_state == STATE_STOPPED
async def async_set_cover_position(self, **kwargs):
"""Move the cover to a specific position."""
# Update device values IF the device is moving at the moment.
if self._current_state != STATE_STOPPED:
await self.async_stop_cover()
self.debug("Setting cover position: %r", kwargs[ATTR_POSITION])
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
newpos = float(kwargs[ATTR_POSITION])
currpos = self.current_cover_position
posdiff = abs(newpos - currpos)
mydelay = posdiff / 100.0 * self._config[CONF_SPAN_TIME]
if newpos > currpos:
self.debug("Opening to %f: delay %f", newpos, mydelay)
await self.async_open_cover(delay=mydelay)
self.update_state(STATE_OPENING)
else:
self.debug("Closing to %f: delay %f", newpos, mydelay)
await self.async_close_cover(delay=mydelay)
self.update_state(STATE_CLOSING)
self.debug("Done")
elif self._config[CONF_POSITIONING_MODE] == MODE_SET_POSITION:
converted_position = int(kwargs[ATTR_POSITION])
if self._position_inverted:
converted_position = 100 - converted_position
if 0 <= converted_position <= 100 and self.has_config(CONF_SET_POSITION_DP):
await self._device.set_dp(
converted_position, self._config[CONF_SET_POSITION_DP]
)
# Give it a moment, to make sure hass updated current pos.
await asyncio.sleep(0.1)
self.update_state(STATE_SET_CMD, int(kwargs[ATTR_POSITION]))
async def async_stop_after_timeout(self, delay_sec):
"""Stop the cover if timeout (max movement span) occurred."""
try:
await asyncio.sleep(delay_sec)
self._current_task = None
await self.async_stop_cover()
except asyncio.CancelledError:
self._current_task = None
async def async_open_cover(self, **kwargs):
"""Open the cover."""
self.debug("Launching command %s to cover ", self._open_cmd)
await self._device.set_dp(self._open_cmd, self._dp_id)
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
if self._current_task is not None:
self._current_task.cancel()
# for timed positioning, stop the cover after a full opening timespan
# instead of waiting the internal timeout
self._current_task = self.hass.async_create_task(
self.async_stop_after_timeout(
kwargs.get(
"delay", self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
)
)
)
self.update_state(STATE_OPENING)
async def async_close_cover(self, **kwargs):
"""Close cover."""
self.debug("Launching command %s to cover ", self._close_cmd)
await self._device.set_dp(self._close_cmd, self._dp_id)
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
if self._current_task is not None:
self._current_task.cancel()
# for timed positioning, stop the cover after a full opening timespan
# instead of waiting the internal timeout
self._current_task = self.hass.async_create_task(
self.async_stop_after_timeout(
kwargs.get(
"delay", self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
)
)
)
self.update_state(STATE_CLOSING)
async def async_stop_cover(self, **kwargs):
"""Stop the cover."""
if self._current_task is not None:
self._current_task.cancel()
self.debug("Launching command %s to cover ", self._stop_cmd)
command = {self._dp_id: self._stop_cmd}
if self._stop_switch is not None:
command[self._stop_switch] = True
await self._device.set_dps(command)
self.update_state(STATE_STOPPED)
def status_restored(self, stored_state):
"""Restore the last stored cover status."""
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
stored_pos = stored_state.attributes.get("current_position")
if stored_pos is not None:
self._current_cover_position = stored_pos
self.debug("Restored cover position %s", self._current_cover_position)
def connection_made(self):
super().connection_made()
match self.dp_value(self._dp_id):
case str() as i if i.isupper():
self._open_cmd = self._open_cmd.upper()
self._close_cmd = self._close_cmd.upper()
self._stop_cmd = self._stop_cmd.upper()
case bool():
self._open_cmd = True
self._close_cmd = False
def status_updated(self):
"""Device status was updated."""
self._previous_state = self._state
self._state = self.dp_value(self._dp_id)
if self.has_config(CONF_CURRENT_POSITION_DP):
curr_pos = self.dp_value(CONF_CURRENT_POSITION_DP)
if isinstance(curr_pos, (bool, str)):
closed = curr_pos in (True, "fully_close")
stopped = (
self._previous_state is None or self._previous_state == self._state
)
curr_pos = 0 if stopped and closed else (100 if stopped else 50)
if self._position_inverted:
curr_pos = 100 - curr_pos
self._current_cover_position = curr_pos
if (
self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED
and self._state != self._previous_state
):
if self._previous_state != self._stop_cmd:
# the state has changed, and the cover was moving
time_diff = time.time() - self._timer_start
pos_diff = round(time_diff / self._config[CONF_SPAN_TIME] * 100.0)
if self._previous_state == self._close_cmd:
pos_diff = -pos_diff
self._current_cover_position = min(
100, max(0, self._current_cover_position + pos_diff)
)
change = "stopped" if self._state == self._stop_cmd else "inverted"
self.debug(
"Movement %s after %s sec., position difference %s",
change,
time_diff,
pos_diff,
)
# store the time of the last movement change
self._timer_start = time.time()
# Keep record in last_state as long as not during connection/re-connection,
# as last state will be used to restore the previous state
if (self._state is not None) and (not self._device.is_connecting):
self._last_state = self._state
def update_state(self, action, position=None):
"""Update cover current states."""
state = self._current_state_action
# using Commands.
if position is None:
self._current_state_action = action
# Set position cmd, check if target position weither close or open
if action == STATE_SET_CMD and position is not None:
curr_pos = self.current_cover_position
self._set_new_position = position
pos_diff = position - curr_pos
# Prevent stuck state when interrupted on middle of cmd
if state == STATE_STOPPED:
if pos_diff > 0:
self._current_state_action = STATE_SET_OPENING
elif pos_diff < 0:
self._current_state_action = STATE_SET_CLOSING
else:
self._current_state_action = STATE_STOPPED
# Write state data.
self.async_write_ha_state()
async_setup_entry = partial(async_setup_entry, DOMAIN, LocalTuyaCover, flow_schema) |
Re-created the device using automatic entity creation and it works perfectly! The initial state is correctly reflected. I even restarted HA while the door was open and when HA came back up, the door was correctly in the |
Just noticed a weird issue this morning. Opening and closing the door with the HA entity's buttons works fine. However, opening and closing the door with the physical door controls is off. When I close the garage door with the physical switch, the entity goes into |
Yeah, definitely some weird behavior. Operating the door with the entity buttons work perfectly fine. However, operating the door with the physical garage door button seems inverted. Here's a video showing the entity state through that process: Screen.Recording.2025-01-22.at.10.59.41.AM.movAt 1s and 11s, I'm opening and closing the door with the entity button and that works fine. At 27s, I'm opening the door with the physical button and the entity enters a This is with the inverted option checked. I tried again with the option unchecked and it's even worse (the states are always wrong). |
Here are the logs for that entire test (opening and closing with the entity buttons and then opening and closing with the physical button):
|
Tested |
The issue is that the garage doesn't support opening/closing it's only supports open/closed, try this. cover.py
"""Platform to locally control Tuya-based cover devices."""
import asyncio
import logging
import time
from functools import partial
import voluptuous as vol
from homeassistant.components.cover import (
ATTR_POSITION,
DOMAIN,
CoverEntityFeature,
CoverEntity,
DEVICE_CLASSES_SCHEMA,
)
from homeassistant.const import CONF_DEVICE_CLASS
from .config_flow import col_to_select
from .entity import LocalTuyaEntity, async_setup_entry
from .const import (
CONF_COMMANDS_SET,
CONF_CURRENT_POSITION_DP,
CONF_POSITION_INVERTED,
CONF_POSITIONING_MODE,
CONF_SET_POSITION_DP,
CONF_SPAN_TIME,
CONF_STOP_SWITCH_DP,
)
# cover states.
STATE_OPENING = "opening"
STATE_CLOSING = "closing"
STATE_STOPPED = "stopped"
STATE_SET_CMD = "moving"
STATE_SET_OPENING = "set_opeing"
STATE_SET_CLOSING = "set_closing"
_LOGGER = logging.getLogger(__name__)
COVER_COMMANDS = {
"Open, Close and Stop": "open_close_stop",
"Open, Close and Continue": "open_close_continue",
"ON, OFF and Stop": "on_off_stop",
"fz, zz and Stop": "fz_zz_stop",
"zz, fz and Stop": "zz_fz_stop",
"1, 2 and 3": "1_2_3",
"0, 1 and 2": "0_1_2",
}
MODE_NONE = "none"
MODE_SET_POSITION = "position"
MODE_TIME_BASED = "timed"
COVER_MODES = {
"Neither": MODE_NONE,
"Set Position": MODE_SET_POSITION,
"Time Based": MODE_TIME_BASED,
}
COVER_TIMEOUT_TOLERANCE = 3.0
DEF_CMD_SET = list(COVER_COMMANDS.values())[0]
DEF_POS_MODE = list(COVER_MODES.values())[0]
DEFAULT_SPAN_TIME = 25.0
def flow_schema(dps):
"""Return schema used in config flow."""
return {
vol.Optional(CONF_COMMANDS_SET, default=DEF_CMD_SET): col_to_select(
COVER_COMMANDS
),
vol.Optional(CONF_POSITIONING_MODE, default=DEF_POS_MODE): col_to_select(
COVER_MODES
),
vol.Optional(CONF_CURRENT_POSITION_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_SET_POSITION_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_POSITION_INVERTED, default=False): bool,
vol.Optional(CONF_SPAN_TIME, default=DEFAULT_SPAN_TIME): vol.All(
vol.Coerce(float), vol.Range(min=1.0, max=300.0)
),
vol.Optional(CONF_STOP_SWITCH_DP): col_to_select(dps, is_dps=True),
vol.Optional(CONF_DEVICE_CLASS): DEVICE_CLASSES_SCHEMA,
}
class LocalTuyaCover(LocalTuyaEntity, CoverEntity):
"""Tuya cover device."""
def __init__(self, device, config_entry, switchid, **kwargs):
"""Initialize a new LocalTuyaCover."""
super().__init__(device, config_entry, switchid, _LOGGER, **kwargs)
commands_set = DEF_CMD_SET
if self.has_config(CONF_COMMANDS_SET):
commands_set = self._config[CONF_COMMANDS_SET]
self._open_cmd = commands_set.split("_")[0]
self._close_cmd = commands_set.split("_")[1]
self._stop_cmd = commands_set.split("_")[2]
self._timer_start = time.time()
self._state = None
self._previous_state = None
self._current_cover_position = 0
self._current_state_action = STATE_STOPPED # Default.
self._set_new_position = int | None
self._stop_switch = self._config.get(CONF_STOP_SWITCH_DP)
self._position_inverted = self._config.get(CONF_POSITION_INVERTED)
self._current_task = None
@property
def supported_features(self):
"""Flag supported features."""
supported_features = CoverEntityFeature.OPEN | CoverEntityFeature.CLOSE
if not isinstance(self._open_cmd, bool):
supported_features |= CoverEntityFeature.STOP
if self._config[CONF_POSITIONING_MODE] != MODE_NONE:
supported_features |= CoverEntityFeature.SET_POSITION
return supported_features
@property
def _current_state(self) -> str:
"""Return the current state of the cover."""
state = self._current_state_action
curr_pos = self._current_cover_position
# Reset STATE when cover is fully closed or fully opened.
if (state == STATE_CLOSING and curr_pos == 0) or (
state == STATE_OPENING and curr_pos == 100
):
self._current_state_action = STATE_STOPPED
if state in (STATE_SET_CLOSING, STATE_SET_OPENING):
set_pos = self._set_new_position
# Reset state whenn cover reached the position.
if curr_pos - set_pos < 5 and curr_pos - set_pos >= -5:
self._current_state_action = STATE_STOPPED
return self._current_state_action
@property
def current_cover_position(self):
"""Return current cover position in percent."""
if self._config[CONF_POSITIONING_MODE] == MODE_NONE:
return None
return self._current_cover_position
@property
def is_opening(self):
"""Return if cover is opening."""
return self._current_state in (STATE_OPENING, STATE_SET_OPENING)
@property
def is_closing(self):
"""Return if cover is closing."""
return self._current_state in (STATE_CLOSING, STATE_SET_CLOSING)
@property
def is_closed(self):
"""Return if the cover is closed or not."""
if isinstance(self._open_cmd, (bool, str)):
return self._current_cover_position == 0
if self._config[CONF_POSITIONING_MODE] == MODE_NONE:
return None
return self.current_cover_position == 0 and self._current_state == STATE_STOPPED
async def async_set_cover_position(self, **kwargs):
"""Move the cover to a specific position."""
# Update device values IF the device is moving at the moment.
if self._current_state != STATE_STOPPED:
await self.async_stop_cover()
self.debug("Setting cover position: %r", kwargs[ATTR_POSITION])
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
newpos = float(kwargs[ATTR_POSITION])
currpos = self.current_cover_position
posdiff = abs(newpos - currpos)
mydelay = posdiff / 100.0 * self._config[CONF_SPAN_TIME]
if newpos > currpos:
self.debug("Opening to %f: delay %f", newpos, mydelay)
await self.async_open_cover(delay=mydelay)
self.update_state(STATE_OPENING)
else:
self.debug("Closing to %f: delay %f", newpos, mydelay)
await self.async_close_cover(delay=mydelay)
self.update_state(STATE_CLOSING)
self.debug("Done")
elif self._config[CONF_POSITIONING_MODE] == MODE_SET_POSITION:
converted_position = int(kwargs[ATTR_POSITION])
if self._position_inverted:
converted_position = 100 - converted_position
if 0 <= converted_position <= 100 and self.has_config(CONF_SET_POSITION_DP):
await self._device.set_dp(
converted_position, self._config[CONF_SET_POSITION_DP]
)
# Give it a moment, to make sure hass updated current pos.
await asyncio.sleep(0.1)
self.update_state(STATE_SET_CMD, int(kwargs[ATTR_POSITION]))
async def async_stop_after_timeout(self, delay_sec):
"""Stop the cover if timeout (max movement span) occurred."""
try:
await asyncio.sleep(delay_sec)
self._current_task = None
await self.async_stop_cover()
except asyncio.CancelledError:
self._current_task = None
async def async_open_cover(self, **kwargs):
"""Open the cover."""
self.debug("Launching command %s to cover ", self._open_cmd)
await self._device.set_dp(self._open_cmd, self._dp_id)
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
if self._current_task is not None:
self._current_task.cancel()
# for timed positioning, stop the cover after a full opening timespan
# instead of waiting the internal timeout
self._current_task = self.hass.async_create_task(
self.async_stop_after_timeout(
kwargs.get(
"delay", self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
)
)
)
self.update_state(STATE_OPENING)
async def async_close_cover(self, **kwargs):
"""Close cover."""
self.debug("Launching command %s to cover ", self._close_cmd)
await self._device.set_dp(self._close_cmd, self._dp_id)
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
if self._current_task is not None:
self._current_task.cancel()
# for timed positioning, stop the cover after a full opening timespan
# instead of waiting the internal timeout
self._current_task = self.hass.async_create_task(
self.async_stop_after_timeout(
kwargs.get(
"delay", self._config[CONF_SPAN_TIME] + COVER_TIMEOUT_TOLERANCE
)
)
)
self.update_state(STATE_CLOSING)
async def async_stop_cover(self, **kwargs):
"""Stop the cover."""
if self._current_task is not None:
self._current_task.cancel()
self.debug("Launching command %s to cover ", self._stop_cmd)
command = {self._dp_id: self._stop_cmd}
if self._stop_switch is not None:
command[self._stop_switch] = True
await self._device.set_dps(command)
self.update_state(STATE_STOPPED)
def status_restored(self, stored_state):
"""Restore the last stored cover status."""
if self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED:
stored_pos = stored_state.attributes.get("current_position")
if stored_pos is not None:
self._current_cover_position = stored_pos
self.debug("Restored cover position %s", self._current_cover_position)
def connection_made(self):
super().connection_made()
match self.dp_value(self._dp_id):
case str() as i if i.isupper():
self._open_cmd = self._open_cmd.upper()
self._close_cmd = self._close_cmd.upper()
self._stop_cmd = self._stop_cmd.upper()
case bool():
self._open_cmd = True
self._close_cmd = False
def status_updated(self):
"""Device status was updated."""
self._previous_state = self._state
self._state = self.dp_value(self._dp_id)
if self.has_config(CONF_CURRENT_POSITION_DP):
curr_pos = self.dp_value(CONF_CURRENT_POSITION_DP)
if isinstance(curr_pos, (bool, str)):
closed = curr_pos in (True, "fully_close")
stopped = (
self._previous_state is None or self._previous_state == self._state
)
curr_pos = 0 if stopped and closed else (100 if stopped else 50)
if self._position_inverted:
curr_pos = 100 - curr_pos
self._current_cover_position = curr_pos
if (
self._config[CONF_POSITIONING_MODE] == MODE_TIME_BASED
and self._state != self._previous_state
):
if self._previous_state != self._stop_cmd:
# the state has changed, and the cover was moving
time_diff = time.time() - self._timer_start
pos_diff = round(time_diff / self._config[CONF_SPAN_TIME] * 100.0)
if self._previous_state == self._close_cmd:
pos_diff = -pos_diff
self._current_cover_position = min(
100, max(0, self._current_cover_position + pos_diff)
)
change = "stopped" if self._state == self._stop_cmd else "inverted"
self.debug(
"Movement %s after %s sec., position difference %s",
change,
time_diff,
pos_diff,
)
# store the time of the last movement change
self._timer_start = time.time()
# Keep record in last_state as long as not during connection/re-connection,
# as last state will be used to restore the previous state
if (self._state is not None) and (not self._device.is_connecting):
self._last_state = self._state
def update_state(self, action, position=None):
"""Update cover current states."""
if (state := self._current_state_action) == action:
return
# using Commands.
if position is None:
self._current_state_action = action
# Set position cmd, check if target position weither close or open
if action == STATE_SET_CMD and position is not None:
curr_pos = self.current_cover_position
self._set_new_position = position
pos_diff = position - curr_pos
# Prevent stuck state when interrupted on middle of cmd
if state == STATE_STOPPED:
if pos_diff > 0:
self._current_state_action = STATE_SET_OPENING
elif pos_diff < 0:
self._current_state_action = STATE_SET_CLOSING
else:
self._current_state_action = STATE_STOPPED
# Write state data.
self.schedule_update_ha_state()
async_setup_entry = partial(async_setup_entry, DOMAIN, LocalTuyaCover, flow_schema) |
That last change worked! The door states are now in sync when using the physical button and the HA entity buttons still work correctly. Thanks for the quick fix! |
Been using this the past week and it's been really solid. I think these changes can be merged into main for the next release. Thanks! |
Well, this shouldn't affect localtuya , but try using cover services or ha native entity card instead of custom cards. |
LocalTuya Version
2025.1.1
Home Assistant Version
2025.1.2
Environment
What happened?
Thanks for addressing #482 so quickly! Local Tuya is working better with my garage opener, but there are still a couple of issues:
See attached video showing how the cloud and local entities are used and how they differ:
Screen.Recording.2025-01-17.at.10.23.39.AM.mov
Steps to reproduce.
Relevant log output
Diagnostics information.
localtuya-01JHPZCGJGTFRBJY1GS78BRZ12-Garage door -b0cd8b7e5ad5a3ce7ba20da46c5ad83e.json
The text was updated successfully, but these errors were encountered: