Skip to content

Commit

Permalink
PEP8 fixes, configurable retry timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
myhomeiot committed Mar 27, 2021
1 parent 4f3ef7e commit d9ded19
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 66 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[![Validate](https://github.com/myhomeiot/DahuaVTO/workflows/Validate/badge.svg)](https://github.com/myhomeiot/DahuaVTO/actions)

A Home Assistant custom integration for control Dahua VTO/VTH devices.

The following models are reported as working:
Expand Down Expand Up @@ -44,10 +46,11 @@ sensor:
- platform: dahua_vto
name: NAME_HERE
host: HOST_HERE
port: PORT_HERE optional, default is 5000
timeout: TIMEOUT_HERE optional, default 10
port: PORT_HERE optional, default 5000
username: USERNAME_HERE_OR_secrets.yaml
password: PASSWORD_HERE_OR_secrets.yaml
scan_interval: INTERVAL_HERE optional, default 60
scan_interval: SCAN_INTERVAL_HERE optional, default 60
```
Example:
Expand Down
10 changes: 7 additions & 3 deletions custom_components/dahua_vto/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
{
vol.Required(CONF_ENTITY_ID): cv.string,
vol.Required(CONF_CHANNEL): int,
vol.Optional(CONF_SHORT_NUMBER, default=DEFAULT_SHORT_NUMBER): cv.string,
vol.Optional(CONF_SHORT_NUMBER,
default=DEFAULT_SHORT_NUMBER): cv.string,
}
)

Expand All @@ -32,14 +33,17 @@ async def service_open_door(event):
if entity.protocol is None:
raise HomeAssistantError("not connected")
try:
return await entity.protocol.open_door(event.data[CONF_CHANNEL] - 1, event.data[CONF_SHORT_NUMBER])
return await entity.protocol.open_door(
event.data[CONF_CHANNEL] - 1,
event.data[CONF_SHORT_NUMBER])
except asyncio.TimeoutError:
raise HomeAssistantError("timeout")
else:
raise HomeAssistantError("entity not found")

hass.data.setdefault(DOMAIN, {})
hass.helpers.service.async_register_admin_service(
DOMAIN, SERVICE_OPEN_DOOR, service_open_door, schema=SERVICE_OPEN_DOOR_SCHEMA
DOMAIN, SERVICE_OPEN_DOOR, service_open_door,
schema=SERVICE_OPEN_DOOR_SCHEMA
)
return True
149 changes: 88 additions & 61 deletions custom_components/dahua_vto/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import CONF_NAME, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD
from homeassistant.const import CONF_NAME, CONF_HOST, CONF_PORT, \
CONF_USERNAME, CONF_PASSWORD, CONF_TIMEOUT

DOMAIN = "dahua_vto"
DAHUA_PROTO_DHIP = 0x5049484400000020
DAHUA_HEADER_FORMAT = "<QLLQQ"
DAHUA_REALM_DHIP = 268632079 # DHIP REALM Login Challenge
DAHUA_LOGIN_PARAMS = {
"clientType": "", "ipAddr": "(null)", "loginType": "Direct"}

DEFAULT_NAME = "Dahua VTO"
DEFAULT_PORT = 5000
DEFAULT_TIMEOUT = 10

_LOGGER = logging.getLogger(__name__)

Expand All @@ -26,12 +30,15 @@
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Required(CONF_HOST): cv.string,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.positive_int,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
})


async def async_setup_platform(hass, config, add_entities, discovery_info=None):
async def async_setup_platform(
hass, config, add_entities, discovery_info=None
):
"""Set up the sensor platform."""
name = config[CONF_NAME]
entity = DahuaVTO(hass, name, config)
Expand Down Expand Up @@ -64,7 +71,49 @@ def __init__(self, hass, name, username, password, on_connection_lost):

def connection_made(self, transport):
self.transport = transport
self.send({"method": "global.login", "params": {"clientType": "", "ipAddr": "(null)", "loginType": "Direct"}})
self.send({"method": "global.login", "params": DAHUA_LOGIN_PARAMS})

def connection_lost(self, exc):
if self.heartbeat is not None:
self.heartbeat.cancel()
self.heartbeat = None
if not self.on_connection_lost.done():
self.on_connection_lost.set_result(True)

def hashed_password(self, random, realm):
h = hashlib.md5(f"{self.username}:{realm}:{self.password}".encode(
"utf-8")).hexdigest().upper()
return hashlib.md5(f"{self.username}:{random}:{h}".encode(
"utf-8")).hexdigest().upper()

def receive(self, message):
params = message.get("params")
error = message.get("error")

if error is not None:
if error["code"] == DAHUA_REALM_DHIP:
self.sessionId = message["session"]
login = DAHUA_LOGIN_PARAMS
login["userName"] = self.username
login["password"] = self.hashed_password(
params["random"], params["realm"])
self.send({"method": "global.login", "params": login})
else:
raise Exception("{}: {}".format(
error.get("code"), error.get("message")))
elif message["id"] == 2:
self.keepAliveInterval = params.get("keepAliveInterval")
if self.keepAliveInterval is None:
raise Exception("keepAliveInterval")
if self.heartbeat is not None:
raise Exception("Heartbeat already run")
self.heartbeat = self.loop.create_task(self.heartbeat_loop())
self.send({"method": "eventManager.attach",
"params": {"codes": ["All"]}})
elif message.get("method") == "client.notifyEventStream":
for message in params.get("eventList"):
message["name"] = self.name
self.hass.bus.fire(DOMAIN, message)

def data_received(self, data):
try:
Expand All @@ -75,13 +124,13 @@ def data_received(self, data):
if self.chunk_remaining > 0:
return
elif self.chunk_remaining < 0:
raise Exception(f"Protocol error, remaining bytes {self.chunk_remaining}")
raise Exception(f"Remaining bytes {self.chunk_remaining}")
packet = self.chunk
self.chunk = None
else:
header = struct.unpack(DAHUA_HEADER_FORMAT, data[0:32])
if header[0] != DAHUA_PROTO_DHIP:
raise Exception("Protocol error, wrong proto")
raise Exception("Wrong proto")
packet = data[32:].decode("utf-8", "ignore")
if header[4] > len(packet):
self.chunk = packet
Expand All @@ -90,53 +139,26 @@ def data_received(self, data):

_LOGGER.debug("<<< {}".format(packet.strip("\n")))
message = json.loads(packet)
message_id = message.get("id")

if self.on_response is not None and self.on_response_id == message_id:
if self.on_response is not None \
and self.on_response_id == message["id"]:
self.on_response.set_result(message)
return

params = message.get("params")
error = message.get("error")

if error is not None:
if error.get("code") == DAHUA_REALM_DHIP:
self.sessionId = message.get("session")
self.send({"method": "global.login", "params": {"clientType": "", "ipAddr": "(null)",
"loginType": "Direct", "userName": self.username,
"password": hashlib.md5("{}:{}:{}".format(self.username, params.get("random"),
hashlib.md5("{}:{}:{}".format(self.username, params.get("realm"), self.password).encode("utf-8")).hexdigest().upper()).encode("utf-8")).hexdigest().upper()}})
else:
raise Exception("{}: {}".format(error.get("code"), error.get("message")))
elif message_id == 2:
self.keepAliveInterval = params.get("keepAliveInterval")
if self.keepAliveInterval is None or self.heartbeat is not None:
raise Exception("No keepAliveInterval or heartbeat already run")
self.heartbeat = self.loop.create_task(self.heartbeat_loop())
self.send({"method": "eventManager.attach", "params": {"codes": ["All"]}})
elif message.get("method") == "client.notifyEventStream":
for message in params.get("eventList"):
message["name"] = self.name
self.hass.bus.fire(DOMAIN, message)
else:
self.receive(message)
except Exception as e:
self.on_connection_lost.set_exception(e)

def connection_lost(self, exc):
if self.heartbeat is not None:
self.heartbeat.cancel()
self.heartbeat = None
if not self.on_connection_lost.done():
self.on_connection_lost.set_result(True)

def send(self, message):
self.request_id += 1
# Removed: "magic": DAHUA_MAGIC ("0x1234")
message["id"] = self.request_id
message["session"] = self.sessionId
data = json.dumps(message, separators=(',', ':'))
_LOGGER.debug(f">>> {data}")
self.transport.write(struct.pack(DAHUA_HEADER_FORMAT, DAHUA_PROTO_DHIP, self.sessionId, self.request_id,
len(data), len(data)) + data.encode("utf-8", "ignore"))
self.transport.write(
struct.pack(DAHUA_HEADER_FORMAT, DAHUA_PROTO_DHIP,
self.sessionId, self.request_id, len(data), len(data))
+ data.encode("utf-8", "ignore"))
return self.request_id

async def command(self, message):
Expand All @@ -148,30 +170,31 @@ async def command(self, message):
self.on_response = self.on_response_id = None

async def open_door(self, channel, short_number):
object_id = (await self.command({"method": "accessControl.factory.instance",
"params": {"channel": channel}})).get("result")
if object_id:
object_id = await self.command({
"method": "accessControl.factory.instance",
"params": {"channel": channel}})
if object_id.get("result"):
try:
await self.command({"method": "accessControl.openDoor", "object": object_id,
await self.command({
"method": "accessControl.openDoor", "object": object_id,
"params": {"DoorIndex": 0, "ShortNumber": short_number}})
finally:
await self.command({"method": "accessControl.destroy", "object": object_id})
# Examples:
# {"method": "accessControl.getDoorStatus", "object": object_id, "params": {"DoorState": 1}}
# {"method": "magicBox.getSystemInfo"}
# {"method": "system.methodHelp", "params": {"methodName": methodName}}
# {"method": "system.methodSignature", "params": {"methodName": methodName}}
# {"method": "system.listService"}
await self.command({
"method": "accessControl.destroy", "object": object_id})

async def heartbeat_loop(self):
result = await self.command({"method": "magicBox.getSystemInfo"})
if result.get("result"):
params = result.get("params")
self.attrs = {"deviceType": params.get("deviceType"), "serialNumber": params.get("serialNumber")}
self.attrs = {"deviceType": params.get("deviceType"),
"serialNumber": params.get("serialNumber")}
while True:
try:
await asyncio.sleep(self.keepAliveInterval)
await self.command({"method": "global.keepAlive", "params": {"timeout": self.keepAliveInterval, "action": True}})
await self.command({
"method": "global.keepAlive",
"params": {"timeout": self.keepAliveInterval,
"action": True}})
except asyncio.CancelledError:
raise
except Exception:
Expand All @@ -196,24 +219,28 @@ def __init__(self, hass, name, config):
async def async_run(self):
while True:
try:
_LOGGER.debug(f"Connecting {self.config[CONF_HOST]}:{self.config[CONF_PORT]}, username {self.config[CONF_USERNAME]}")
_LOGGER.debug("Connecting {}:{}, username {}".format(
self.config[CONF_HOST], self.config[CONF_PORT],
self.config[CONF_USERNAME]))
on_connection_lost = self.hass.loop.create_future()
transport, self.protocol = await self.hass.loop.create_connection(lambda: DahuaVTOClient(self.hass,
self._name, self.config[CONF_USERNAME], self.config[CONF_PASSWORD], on_connection_lost),
t, self.protocol = await self.hass.loop.create_connection(
lambda: DahuaVTOClient(
self.hass, self._name, self.config[CONF_USERNAME],
self.config[CONF_PASSWORD], on_connection_lost),
self.config[CONF_HOST], self.config[CONF_PORT])
try:
await on_connection_lost
raise Exception("Connection closed")
finally:
self.protocol = None
transport.close()
t.close()
await asyncio.sleep(1)
_LOGGER.error(f"{self.name}: Reconnect")
await asyncio.sleep(5)
except asyncio.CancelledError:
raise
except Exception as e:
_LOGGER.error(f"{self.name}: {e}")
await asyncio.sleep(30)
_LOGGER.error("{}: {}, retry in {} seconds".format(
self.name, e, self.config[CONF_TIMEOUT]))
await asyncio.sleep(self.config[CONF_TIMEOUT])

@property
def should_poll(self) -> bool:
Expand Down

0 comments on commit d9ded19

Please sign in to comment.