Skip to content

Commit

Permalink
Implement separate login for web mode #96
Browse files Browse the repository at this point in the history
  • Loading branch information
erikkastelec committed Jan 12, 2025
1 parent da39428 commit 3824eb4
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 33 deletions.
11 changes: 6 additions & 5 deletions custom_components/wemportal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import homeassistant.helpers.entity_registry as entity_registry
from homeassistant.helpers import device_registry as device_registry


def get_wemportal_unique_id(config_entry_id: str, device_id: str, name: str):
"""Return unique ID for WEM Portal."""
return f"{config_entry_id}:{device_id}:{name}"
Expand Down Expand Up @@ -87,16 +86,15 @@ async def get_integration_device_ids(hass, domain):

return device_ids


async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up the wemportal component."""
# Set proper update_interval, based on selected mode
if entry.options.get(CONF_MODE) == "web":
if entry.data.get(CONF_MODE) == "web":
update_interval = entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_CONF_SCAN_INTERVAL_VALUE
)

elif entry.options.get(CONF_MODE) == "api":
elif entry.data.get(CONF_MODE) == "api":
update_interval = entry.options.get(
CONF_SCAN_INTERVAL_API, DEFAULT_CONF_SCAN_INTERVAL_API_VALUE
)
Expand All @@ -119,7 +117,10 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:

# Creating API object
api = WemPortalApi(
entry.data.get(CONF_USERNAME), entry.data.get(CONF_PASSWORD), device_id, entry.options
entry.data.get(CONF_USERNAME),
entry.data.get(CONF_PASSWORD),
device_id,
config=entry.data
)
# Create custom coordinator
coordinator = WemPortalDataUpdateCoordinator(
Expand Down
55 changes: 36 additions & 19 deletions custom_components/wemportal/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
CONF_LANGUAGE,
CONF_MODE,
CONF_SCAN_INTERVAL_API,
DEFAULT_MODE,
AVAILABLE_MODES
)
from .exceptions import AuthError, UnknownAuthError

Expand All @@ -24,27 +26,41 @@
{
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): str,
vol.Required(CONF_MODE, default=DEFAULT_MODE): vol.In(AVAILABLE_MODES),
}
)


async def validate_input(hass: core.HomeAssistant, data):
"""Validate the user input allows us to connect."""

# Create API object
api = WemPortalApi(data[CONF_USERNAME], data[CONF_PASSWORD], "0000")

# Try to login
try:
await hass.async_add_executor_job(api.api_login)
except AuthError:
raise InvalidAuth from AuthError
except UnknownAuthError:
raise CannotConnect from UnknownAuthError
api = WemPortalApi(data[CONF_USERNAME], data[CONF_PASSWORD], "placeholder")

# Try mobile API login
if data[CONF_MODE] == "api" or data[CONF_MODE] == "both":
try:
await hass.async_add_executor_job(api.api_login)
except AuthError:
# If API login fails, try WEB API login
_LOGGER.warning("Mobile API login failed, trying web login...")
try:
await hass.async_add_executor_job(api.web_login)
except AuthError as exc:
raise InvalidAuth from exc
except Exception as exc:
raise CannotConnect from exc
except UnknownAuthError as exc:
raise CannotConnect from exc
elif data[CONF_MODE] == "web":
try:
await hass.async_add_executor_job(api.web_login)
except AuthError as exc:
raise InvalidAuth from exc
except Exception as exc:
raise CannotConnect from exc

return data


class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for wemportal."""

Expand Down Expand Up @@ -109,24 +125,25 @@ async def async_step_init(self, user_input=None):
step_id="init",
data_schema=vol.Schema(
{
vol.Optional(
vol.Required(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(CONF_SCAN_INTERVAL, 1800),
): config_validation.positive_int,
vol.Optional(
vol.Required(
CONF_SCAN_INTERVAL_API,
default=self.config_entry.options.get(
CONF_SCAN_INTERVAL_API, 300
),
): config_validation.positive_int,
vol.Optional(
vol.Required(
CONF_LANGUAGE,
default=self.config_entry.options.get(CONF_LANGUAGE, "en"),
): config_validation.string,
vol.Optional(
CONF_MODE,
default=self.config_entry.options.get(CONF_MODE, "api"),
): config_validation.string,

vol.Required(
CONF_MODE, default=self.config_entry.data.get(CONF_MODE, DEFAULT_MODE)
): vol.In(AVAILABLE_MODES),

}
),
)
2 changes: 2 additions & 0 deletions custom_components/wemportal/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
CONF_SCAN_INTERVAL_API: Final = "api_scan_interval"
CONF_LANGUAGE: Final = "language"
CONF_MODE: Final = "mode"
DEFAULT_MODE: Final = "api"
AVAILABLE_MODES: Final = ["api", "web", "both"]
PLATFORMS = ["number", "select", "sensor", "switch"]
REFRESH_WAIT_TIME: Final = 360
DATA_GATHERING_ERROR: Final = "An error occurred while gathering data.This issue should resolve by itself. If this problem persists,open an issue at https://github.com/erikkastelec/hass-WEM-Portal/issues"
Expand Down
2 changes: 1 addition & 1 deletion custom_components/wemportal/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def _async_update_data(self):
self.config_entry.data.get(CONF_USERNAME),
self.config_entry.data.get(CONF_PASSWORD),
"0000", # Doesn't matter as we will take it from existing data
self.config_entry.options,
config=self.config_entry.data,
existing_data=copy.deepcopy(self.api.data)
)
del self.api
Expand Down
104 changes: 96 additions & 8 deletions custom_components/wemportal/wemportalapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections import defaultdict
from datetime import datetime, timedelta

from bs4 import BeautifulSoup
import requests as reqs
import scrapyscript
from fuzzywuzzy import fuzz
Expand Down Expand Up @@ -80,6 +81,7 @@ def __init__(self, username, password, device_id, config={}, existing_data=None)
"User-Agent": "WeishauptWEMApp",
"X-Api-Version": "3.1.3.0",
"Accept": "*/*",
"Host": "www.wemportal.com"
}
self.scraping_mapper = {}

Expand Down Expand Up @@ -210,7 +212,6 @@ def fetch_webscraping_data(self):
# Return the scraped data
return data


def api_login(self):
payload = {
"Name": self.username,
Expand Down Expand Up @@ -256,6 +257,70 @@ def api_login(self):
# Everything went fine, set valid_login to True
self.valid_login = True


def web_login(self):
"""
Logs into the WEM Portal web interface by mimicking browser behavior.
Args:
username (str): The user's username (email).
password (str): The user's password.
Returns:
dict: Session cookies for the authenticated session.
Raises:
AuthError: If the login credentials are invalid.
ForbiddenError: If access is forbidden.
UnknownAuthError: For other unknown login errors.
"""
session = reqs.Session()
login_url = "https://www.wemportal.com/Web/Login.aspx"

headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "de,en;q=0.9",
}

# Step 1: Fetch the login page
try:
initial_response = session.get(login_url, headers=headers)
initial_response.raise_for_status()
except reqs.exceptions.HTTPError as exc:
raise UnknownAuthError("Failed to load the login page.") from exc

# Step 2: Parse the login page and extract hidden form fields
soup = BeautifulSoup(initial_response.text, "html.parser")
form_data = {}
for input_tag in soup.find_all("input"):
if input_tag.get("type") == "hidden" and input_tag.get("name"):
form_data[input_tag["name"]] = input_tag.get("value", "")

# Add username and password to the form data
form_data["ctl00$content$tbxUserName"] = self.username
form_data["ctl00$content$tbxPassword"] = self.password
form_data["ctl00$content$btnLogin"] = "Anmelden" # Login button value

# Step 3: Submit the login form
try:
response = session.post(
login_url,
data=form_data,
headers={
**headers,
"Content-Type": "application/x-www-form-urlencoded",
},
)
response.raise_for_status()

# Step 4: Check if login was successful
if "ctl00_btnLogout" in response.text:
return
else:
raise AuthError("Login failed: Invalid username or password.")
except reqs.exceptions.HTTPError as exc:
if response.status_code == 403:
raise ForbiddenError("Access forbidden during login.") from exc
raise UnknownAuthError("Failed to submit the login form.") from exc

def get_response_details(self, response: reqs.Response):
server_status = ""
server_message = ""
Expand All @@ -271,7 +336,7 @@ def get_response_details(self, response: reqs.Response):
return server_status, server_message

def make_api_call(
self, url: str, headers=None, data=None, login_retry=False, delay=1
self, url: str, headers=None, data=None, do_retry=True, delay=5
) -> reqs.Response:
response = None
try:
Expand All @@ -291,15 +356,15 @@ def make_api_call(

response.raise_for_status()
except Exception as exc:
if response and response.status_code in (401, 403) and not login_retry:
if response and response.status_code in (401, 403) and not do_retry:
self.api_login()
headers = headers or self.headers
time.sleep(delay)
response = self.make_api_call(
url,
headers=headers,
data=data,
login_retry=True,
do_retry=False,
delay=delay,
)
else:
Expand All @@ -320,7 +385,7 @@ def get_devices(self):
_LOGGER.debug("Fetching api device data")
self.modules = {}
self.data = {}
data = self.make_api_call(API_DEVICE_READ_URL).json()
data = self.make_api_call(API_DEVICE_READ_URL, do_retry=True).json()

for device in data["Devices"]:
self.data[device["ID"]] = {}
Expand Down Expand Up @@ -358,11 +423,12 @@ def get_parameters(self):
"ModuleType": values["Type"],
}
try:
time.sleep(5)
response = self.make_api_call(
API_EVENT_TYPE_READ_URL, data=data
API_EVENT_TYPE_READ_URL, data=data, do_retry=False
)
except WemPortalError as exc:
if isinstance(exc.__cause__, reqs.exceptions.HTTPError) and exc.__cause__.response.status_code == 400:
if isinstance(exc.__cause__, reqs.exceptions.HTTPError) and (exc.__cause__.response.status_code == 400 or exc.__cause__.response.status_code == 403):
_LOGGER.error("Could not fetch parameters for device %s for index %s and type %s", device_id, values["Index"], values["Type"])
delete_candidates.append((values["Index"], values["Type"]))
continue
Expand Down Expand Up @@ -471,9 +537,11 @@ def get_data(self):
"https://www.wemportal.com/app/DataAccess/Refresh",
data=data,
)
time.sleep(5)
values = self.make_api_call(
"https://www.wemportal.com/app/DataAccess/Read",
data=data,
do_retry=True
).json()
# TODO: CLEAN UP
# Map values to sensors we got during scraping.
Expand Down Expand Up @@ -908,8 +976,28 @@ def scrape_pages(self, response):
except (IndexError, ValueError):
continue


output["cookie"] = self.cookie

yield output


def extract_viewstate(response_text: str) -> str:
"""
Extracts the __VIEWSTATE value from the HTML of the login page.
"""
import re
match = re.search(r'id="__VIEWSTATE" value="(.*?)"', response_text)
if not match:
raise UnknownAuthError("Failed to extract __VIEWSTATE from login page.")
return match.group(1)


def extract_eventvalidation(response_text: str) -> str:
"""
Extracts the __EVENTVALIDATION value from the HTML of the login page.
"""
import re
match = re.search(r'id="__EVENTVALIDATION" value="(.*?)"', response_text)
if not match:
raise UnknownAuthError("Failed to extract __EVENTVALIDATION from login page.")
return match.group(1)

0 comments on commit 3824eb4

Please sign in to comment.