diff --git a/packages/modules/vehicles/vwid/README.txt b/packages/modules/vehicles/vwid/README.txt index bcb537c95f..ff54f6c8cd 100644 --- a/packages/modules/vehicles/vwid/README.txt +++ b/packages/modules/vehicles/vwid/README.txt @@ -1,4 +1,41 @@ -Quellen: - https://github.com/TA2k/ioBroker.vw-connect - https://github.com/robinostlund/volkswagencarnet +Das ist ein Startpunkt und wird noch Verbesserungen und Korrekturen benötigen. +Dazu werden Rückmeldungen aus der Community benötigt. + +Dazu wird im SoC-Log einiges an Ausgaben auf Level Info ausgegeben. +In einigen Fällen könnte es auch nötig sein, Level Debug/Details einzustellen. + +Die Daten im eu-data-act Portal sind bei meinem Fahrzeug mittlerweile einigermaßen aktuell. +Wenn das Fahrzeug gefahren oder geladen wurde ca. 2 Stunden. + +Vor Nutzung muss die kontinuierliche Abfrage (alle Daten, 15-min) im Portal angestoßen werden. +https://eu-data-act.drivesomethinggreater.com/de/de/eu-data-act.html +Benutzerdefinierte Daten abrufen - All Data - 15 min +Es kann dann mehrere Stunden dauern, bis erste nicht-leere Datenpakete ankommen. + +Das Modul startet je konfiguriertem Fahrzeug einen parallelen Prozess um das Portal zu pollen. +Der Thread prüft im Minutentakt auf Bereitschaft des Portals und lädt dann die neueste zip-Datei herunter. +Die 30 letzten in den zip's enthaltenen json-Dateien werden in ramdisk/vweuda gespeichert zu evtl. Analyse +. +Aus der json wird versucht, folgende Daten zu extrahieren: soc, range, soc_timestamp, odometer. +soc scheint immer vorhanden zu sein. +Range habe ich noch nie bekommen. +Als soc_timestamp nehme ich das maximum aller car_captured_time Felder. +odometer kommt mal und mal nicht. + +Diese Daten werden je VIN für die Abfrage durch die openwb bereitgestellt. + +Der jeweils letzte Stand, der an die openwb gemeldet wird, wird auch persistent gehalten. +Dieser Stand wird bei jeder Abfrage der openwb mit den letzten aus den Portal gelieferten Daten ergänzt. +Damit wird ein Feld nicht ungültig, wenn es vom Portal nicht gemeldet wurde. +Wenn die Reichweite/range leer ist, wird diese aus Batterie-Kapazität, SoC und Durchschnittsverbrauch berechnet. + +Solange keine Daten vom Portal kommen werden entsprechende Meldungen im soc-log ausgegeben. + +Die Konfiguration des Moduls ist gleichgeblieben. + +Nach dem ersten Start ist es normal, daß erst mal keine Daten vorhanden sind. +Auch wenn das Portal schon nicht-leere Datenpakete liefert, kann es mehrere Abfrage-Intervalle dauern, bis erste Ergebnisse in openWB ankommen. + +Quellen: + https://github.com/mikrohard/hass-vw-eu-data-act diff --git a/packages/modules/vehicles/vwid/libeuda.py b/packages/modules/vehicles/vwid/libeuda.py new file mode 100755 index 0000000000..e27610a6bf --- /dev/null +++ b/packages/modules/vehicles/vwid/libeuda.py @@ -0,0 +1,913 @@ +#!/usr/bin/env python3 +"""Client for the VW EU Data Act portal (OIDC login + data delivery).""" + +from __future__ import annotations + +from typing import Union +from asyncio import new_event_loop, set_event_loop +import uuid +import logging +import aiohttp +import io +import json +import re +import zipfile +from html.parser import HTMLParser +from urllib.parse import urlencode, urljoin, urlparse +from datetime import datetime, timezone, timedelta +import time +import threading +import asyncio +# from helpermodules.pub import Pub +import glob +import os +import os.path +from pathlib import Path +from modules.common.store import RAMDISK_PATH +from modules.common.abstract_vehicle import VehicleUpdateData +from modules.vehicles.vwid.config import VWId + +"""Constants for the VW EU Data Act integration.""" + +# --- Portal / OIDC endpoints --------------------------------------------- +BASE_URL = "https://eu-data-act.drivesomethinggreater.com" +IDENTITY_BASE = "https://identity.vwgroup.io" + +# Brand is part of the OIDC state; VW passenger cars by default. +# BRAND = "VOLKSWAGEN_PASSENGER_CARS" +CALLBACK_LOGIN_PATH = "/services/callbacklogin" + +BRANDS: dict[str, dict[str, str]] = { + "volkswagen": { + "display_name": "Volkswagen", + "client_id": "9b58543e-1c15-4193-91d5-8a14145bebb0@apps_vw-dilab_com", + "state": "VOLKSWAGEN_PASSENGER_CARS", + }, + "audi": { + "display_name": "Audi", + "client_id": "cc29b87a-5e9a-4362-aecf-5adea6b01bbb@apps_vw-dilab_com", + "state": "AUDI", + }, + "skoda": { + "display_name": "Škoda", + "client_id": "3ea88bf9-1d4e-4a68-b3ad-4098c1f1d246@apps_vw-dilab_com", + "state": "SKODA", + }, + "seat": { + "display_name": "SEAT", + "client_id": "f85e5b69-e3b2-43aa-9c0d-1b7d0e0b576f@apps_vw-dilab_com", + "state": "SEAT", + }, + "cupra": { + "display_name": "CUPRA", + "client_id": "f85e5b69-e3b2-43aa-9c0d-1b7d0e0b576f@apps_vw-dilab_com", + "state": "CUPRA", + }, +} + + +DEFAULT_BRAND = "volkswagen" +DEFAULT_COUNTRY = "de" +DEFAULT_LANGUAGE = "en" + + +def get_oidc_client_id(brand: str = DEFAULT_BRAND) -> str: + """Return the OIDC client_id for the given brand.""" + return BRANDS.get(brand, BRANDS[DEFAULT_BRAND])["client_id"] + + +def get_oidc_state(brand: str = DEFAULT_BRAND) -> str: + """Return the OIDC state for the given brand.""" + brand_state = BRANDS.get(brand, BRANDS[DEFAULT_BRAND])["state"] + return f"{DEFAULT_COUNTRY}__{DEFAULT_LANGUAGE}__{brand_state}" + + +# OIDC: we build the authorize URL directly instead of using the portal's +# /services/redirect/authentication servlet, which returns HTTP 500 for +# non-browser clients (it depends on AEM browser session state). +OIDC_AUTHORIZE_URL = IDENTITY_BASE + "/oidc/v1/authorize" +# OIDC_CLIENT_ID = "9b58543e-1c15-4193-91d5-8a14145bebb0@apps_vw-dilab_com" +OIDC_SCOPE = "openid cars profile" +OIDC_REDIRECT_URI = BASE_URL + "/login" +# state encodes country__language__brand (echoed back to the portal callback). +# DEFAULT_COUNTRY = "si" +# DEFAULT_LANGUAGE = "sl" +CONF_BRAND = "brand" +# OIDC_STATE = f"{DEFAULT_COUNTRY}__{DEFAULT_LANGUAGE}__{BRAND}" + +# Legacy constants for backward compatibility (default to VW) +OIDC_CLIENT_ID = BRANDS[DEFAULT_BRAND]["client_id"] +OIDC_STATE = get_oidc_state(DEFAULT_BRAND) + +# proxy_api paths (relative to BASE_URL) +VEHICLES_PATH = "/proxy_api/consent/me/vehicles" +RELATION_PATH = "/proxy_api/vum/v2/users/me/relations/{vin}" +METADATA_PATH = "/proxy_api/euda-apim/datarequest/vehicles/{vin}/metadata/partial" +LIST_PATH = "/proxy_api/euda-apim/datadelivery/vehicles/{vin}/{identifier}/list" +DOWNLOAD_PATH = "/proxy_api/euda-apim/datadelivery/vehicles/{vin}/{identifier}/download" + +USER_AGENT = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36" +) + +# --- Config entry keys ---------------------------------------------------- +CONF_EMAIL = "email" +CONF_PASSWORD = "password" +CONF_VIN = "vin" +CONF_IDENTIFIER = "identifier" +CONF_NICKNAME = "nickname" + +# --- Scheduling ----------------------------------------------------------- +# Datasets land ~every 15 min; refresh shortly after the next expected drop. +DATASET_INTERVAL = timedelta(minutes=15) +POST_DATASET_BUFFER = timedelta(seconds=45) +RETRY_INTERVAL = timedelta(minutes=1) +MIN_INTERVAL = timedelta(seconds=30) + +# Files with this suffix carry no payload and are skipped. +NO_CONTENT_SUFFIX = "_no_content_found.zip" + +POLL_INTERVAL = 60 # polling interval in seconds +CYCLE_INTERVAL = 600 # cycle interval in seconds +EUDA_THREADNAME = "soc_bt_ev" +UTC = None +KEEP_JSON = 30 +DATA_PATH = Path(__file__).resolve().parents[4] / "data" / "modules" / "vwid" +JSON_PATH = Path(str(RAMDISK_PATH) + '/vweuda') +storeFileName = '/data_' + +# VIN-Brand map +VIN_BRAND_MAP = { + "WAU": "audi", # Audi car + "WA1": "audi", # Audi SUV + "WUA": "audi", # Audi Sport car + "WU1": "audi", # Audi Sport SUV + "99A": "audi", # Audi 2016- + "AAA": "audi", # Audi South-Africa- + "TRU": "audi", # Audi Hungary + "VSS": "cupra", # Seat/Cupra - assume cupra as default + "NAD": "skoda", # Skoda + "TMB": "skoda", # Skoda (Czech Republic) + "Y6U": "skoda", # Skoda Auto made by Eurocar (Ukraine) + "VWV": "volkswagen", # Volkswagen Spain + "WVG": "volkswagen", # SUV/Touran + "WVW": "volkswagen", # Passenger Cars + "WV1": "volkswagen", # Commercial Vehicles + "WV2": "volkswagen", # Commercial Vehicles + "WV3": "volkswagen", # Commercial Vehicles + "WV4": "volkswagen", # Commercial Vehicles + "WV5": "volkswagen", # Commercial Vehicles +} + + +_LOGGER = logging.getLogger(__name__) + + +class ApiError(Exception): + """Generic API failure.""" + + +class AuthError(ApiError): + """Authentication failed or session expired.""" + + +class _FormParser(HTMLParser): + """Extract the first
action and all hidden/input fields.""" + + def __init__(self) -> None: + super().__init__() + self.action: str | None = None + self.fields: dict[str, str] = {} + self._in_form = False + self._done = False # only capture the first form + + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None: + if self._done: + return + a = dict(attrs) + if tag == "form" and self.action is None: + self.action = a.get("action") + self._in_form = True + elif tag == "input" and self._in_form: + name = a.get("name") + if name: + self.fields[name] = a.get("value") or "" + + def handle_endtag(self, tag: str) -> None: + if tag == "form" and self._in_form: + self._in_form = False + self._done = True + + +def _parse_form(html: str) -> _FormParser: + p = _FormParser() + p.feed(html) + return p + + +def _extract_template_model(html: str) -> dict: + """Extract the VW identity ``templateModel`` JSON embedded in the page. + + The signin/authenticate pages carry their form state (hmac, relayState, + prefilled email, postAction, error) in a JS object rather than HTML inputs: + + window._IDK = { templateModel: { ... }, csrf_token: '...' } + """ + idx = html.find("templateModel") + if idx == -1: + return {} + brace = html.find("{", idx) + if brace == -1: + return {} + depth = 0 + for i in range(brace, len(html)): + c = html[i] + if c == "{": + depth += 1 + elif c == "}": + depth -= 1 + if depth == 0: + try: + return json.loads(html[brace: i + 1]) + except ValueError: + return {} + return {} + + +def _extract_csrf(html: str) -> str | None: + """Pull the csrf_token out of the identity page's JS.""" + m = re.search(r"csrf_token\s*[:=]\s*['\"]([^'\"]+)['\"]", html) + return m.group(1) if m else None + + +def _login_fields(html: str) -> tuple[dict[str, str], str | None]: + """Collect the fields needed to POST a VW identity login step. + + Merges HTML hidden inputs with the JS templateModel/csrf so it works + whether the page renders inputs server-side (email step) or via JS + (password step). Returns (fields, form_action).""" + form = _parse_form(html) + fields: dict[str, str] = dict(form.fields) + model = _extract_template_model(html) + if model: + for key in ("hmac", "relayState"): + if model.get(key): + fields[key] = model[key] + email = (model.get("emailPasswordForm") or {}).get("email") + if email: + fields.setdefault("email", email) + csrf = _extract_csrf(html) + if csrf: + fields.setdefault("_csrf", csrf) + return fields, form.action + + +def _login_error(html: str) -> str | None: + """Return a human-readable login error from the page, if present.""" + model = _extract_template_model(html) + err = model.get("error") or model.get("errorCode") + if isinstance(err, dict): + return err.get("text") or err.get("errorCode") or str(err) + return str(err) if err else None + + +def _extract_vins(payload) -> list[dict]: + """Best-effort extraction of vehicles from the (undocumented) vehicles body. + + Returns a list of {vin, nickname?} dicts. Walks the JSON for any 17-char + VIN-like identifier so it is robust to wrapper shape ({vehicles:[]}, list, …). + """ + vins: dict[str, dict] = {} + + def walk(node): + if isinstance(node, dict): + vin = node.get("vin") or node.get("vehicleIdentificationNumber") + if isinstance(vin, str) and len(vin) == 17: + vins.setdefault(vin, {"vin": vin}) + nick = node.get("vehicleNickname") or node.get("nickname") or node.get("modelName") + if nick: + vins[vin]["nickname"] = nick + for v in node.values(): + walk(v) + elif isinstance(node, list): + for v in node: + walk(v) + + walk(payload) + return list(vins.values()) + + +class EudaApiClient: + """Authenticated client for the EU Data Act portal.""" + + def __init__(self, session: aiohttp.ClientSession, email: str, password: str, brand: str = "volkswagen") -> None: + self._session = session + self._email = email + self._password = password + self._brand = brand + self._logged_in = False + + # -- low level --------------------------------------------------------- + + async def _get(self, url: str, *, headers: dict | None = None, allow_redirects: bool = True): + h = {"User-Agent": USER_AGENT, **(headers or {})} + return await self._session.get(url, headers=h, allow_redirects=allow_redirects) + + # -- authentication ---------------------------------------------------- + + async def async_login(self) -> None: + """Run the full OIDC login, populating the session cookie jar.""" + try: + await self._do_login() + except aiohttp.ClientError as err: + raise ApiError(f"Network error during login: {err}") from err + self._logged_in = True + + async def _do_login(self) -> None: + # 0. Prime the portal session (the browser loads the site first; this + # sets the AEM load-balancer/session cookies the callback needs). + try: + async with await self._get(f"{BASE_URL}/") as resp: + await resp.read() + except aiohttp.ClientError as err: + _LOGGER.debug("login step0: priming GET failed (ignored): %s", err) + + # 1. Start the OIDC flow directly at the identity provider. We build the + # authorize URL ourselves because the portal's + # /services/redirect/authentication servlet returns HTTP 500 for + # non-browser clients. + authorize_url = self._build_authorize_url(self._brand) + _LOGGER.debug("login step1: authorize url = %s", authorize_url) + async with await self._get(authorize_url) as resp: + signin_url = str(resp.url) + signin_html = await resp.text() + _LOGGER.debug("login step2: signin page = %s (%d bytes)", signin_url, len(signin_html)) + + # 2. POST the email (identifier step). Fields come from HTML inputs + # and/or the JS templateModel (hmac, _csrf, relayState). + fields, action = _login_fields(signin_html) + _LOGGER.debug("login step2: action=%s fields=%s", action, sorted(fields)) + if "hmac" not in fields or "_csrf" not in fields: + raise AuthError( + f"Could not parse the sign-in form (fields found: {sorted(fields)})" + ) + fields["email"] = self._email + identifier_action = urljoin(signin_url, action or "") + async with self._session.post( + identifier_action, + data=fields, + headers={"User-Agent": USER_AGENT, "Referer": signin_url}, + ) as resp: + authenticate_url = str(resp.url) + authenticate_html = await resp.text() + status = resp.status + _LOGGER.debug( + "login step3: after identifier POST status=%s url=%s", status, authenticate_url + ) + + # 3. The identifier step lands on the password (authenticate) page, + # whose hidden fields live in the JS templateModel, not HTML inputs. + fields2, action2 = _login_fields(authenticate_html) + _LOGGER.debug("login step3: action=%s fields=%s", action2, sorted(fields2)) + if "hmac" not in fields2 or "_csrf" not in fields2: + err = _login_error(authenticate_html) + raise AuthError( + err + or "Identity portal did not return the password form - check the " + "email address (or the login flow changed)" + ) + fields2["email"] = self._email + fields2["password"] = self._password + # The browser posts to the clean /login/authenticate URL with relayState + # in the body; posting to authenticate_url (which carries ?relayState=) + # duplicates it and is rejected with HTTP 400. Strip the query. + if action2: + authenticate_action = urljoin(authenticate_url, action2) + else: + authenticate_action = authenticate_url.split("?", 1)[0] + _LOGGER.debug("login step4: POST credentials to %s", authenticate_action) + + # 4. POST credentials; follow the redirect chain back to the portal, + # which sets the session cookies via /services/callbacklogin. + async with self._session.post( + authenticate_action, + data=fields2, + headers={"User-Agent": USER_AGENT, "Referer": authenticate_url}, + ) as resp: + landing = str(resp.url) + landing_html = await resp.text() + if resp.status >= 400: + _LOGGER.debug( + "login step4: HTTP %s body[:500]=%s", resp.status, landing_html[:500] + ) + err = _login_error(landing_html) + raise AuthError(err or f"Login rejected (HTTP {resp.status})") + _LOGGER.debug("login step4: landed on %s", landing) + + # Positively confirm success: a completed flow lands back on the portal + # host (via /services/callbacklogin). Bad credentials re-render the + # identity sign-in page (URL still on identity.vwgroup.io/signin-service). + portal_host = urlparse(BASE_URL).netloc + if "signin-service" in landing or "/error" in landing: + raise AuthError("Login failed - check email and password") + if urlparse(landing).netloc != portal_host: + raise AuthError(f"Login did not complete (ended at {landing})") + + @staticmethod + def _build_authorize_url(brand: str = "volkswagen") -> str: + """Construct the OIDC authorize URL (bypasses the broken AEM servlet).""" + params = { + "client_id": get_oidc_client_id(brand), + "response_type": "code", + "scope": OIDC_SCOPE, + "state": get_oidc_state(brand), + "redirect_uri": OIDC_REDIRECT_URI, + "prompt": "login", + } + return f"{OIDC_AUTHORIZE_URL}?{urlencode(params)}" + + # -- authenticated requests ------------------------------------------- + + async def _get_json(self, url: str, *, headers: dict | None = None, _retry: bool = True): + async with await self._get(url, headers=headers) as resp: + if resp.status in (401, 403) and _retry: + _LOGGER.debug("Session expired (%s) for %s; re-authenticating", resp.status, url) + self._logged_in = False + await self.async_login() + return await self._get_json(url, headers=headers, _retry=False) + if resp.status >= 400: + raise ApiError(f"GET {url} -> HTTP {resp.status}") + text = await resp.text() + try: + return json.loads(text) + except ValueError as err: + raise ApiError(f"Invalid JSON from {url}: {err}") from err + + async def async_ensure_login(self) -> None: + if not self._logged_in: + await self.async_login() + + async def async_list_vehicles(self) -> list[dict]: + await self.async_ensure_login() + payload = await self._get_json(f"{BASE_URL}{VEHICLES_PATH}?viewPosition=FRONT_LEFT") + vehicles = _extract_vins(payload) + # Always enrich with the friendly vehicleNickname from the relation + # endpoint (the authoritative source, e.g. "ID.3"). + for veh in vehicles: + try: + rel = await self.async_get_relation(veh["vin"]) + nickname = (rel.get("relation") or {}).get("vehicleNickname") + _LOGGER.debug("relation for %s: nickname=%r", veh["vin"], nickname) + if nickname: + veh["nickname"] = nickname + except ApiError as err: + _LOGGER.debug("Could not fetch nickname for %s: %s", veh["vin"], err) + return vehicles + + async def async_get_relation(self, vin: str) -> dict: + await self.async_ensure_login() + # The relation endpoint requires a traceid header; it returns HTTP 400 + # without one. + headers = {"traceid": f"vehicle-relation-fetch-{uuid.uuid4()}"} + return await self._get_json( + f"{BASE_URL}{RELATION_PATH.format(vin=vin)}", headers=headers + ) + + async def async_get_metadata(self, vin: str) -> dict: + """Return the data-request metadata; ``Identifier`` is needed downstream.""" + await self.async_ensure_login() + return await self._get_json(f"{BASE_URL}{METADATA_PATH.format(vin=vin)}") + + async def async_list_datasets(self, vin: str, identifier: str) -> list[dict]: + """Return the rolling list of available zips: [{name, createdOn, size}].""" + await self.async_ensure_login() + url = f"{BASE_URL}{LIST_PATH.format(vin=vin, identifier=identifier)}" + # The list endpoint requires the data-request type header (matching + # metadata/partial); without it the backend returns HTTP 500. + data = await self._get_json(url, headers={"type": "partial"}) + return data if isinstance(data, list) else data.get("files", []) + + async def async_download_dataset(self, vin: str, identifier: str, name: str) -> Union[str, dict]: + """Download a specific zip by name and return the parsed JSON inside it.""" + await self.async_ensure_login() + if name.endswith(NO_CONTENT_SUFFIX): + raise ApiError(f"{name} contains no content") + url = f"{BASE_URL}{DOWNLOAD_PATH.format(vin=vin, identifier=identifier)}" + headers = {"filename": name, "type": "partial"} + async with await self._get(url, headers=headers) as resp: + if resp.status in (401, 403): + self._logged_in = False + await self.async_login() + async with await self._get(url, headers=headers) as resp2: + if resp2.status >= 400: + raise ApiError(f"Download {name} -> HTTP {resp2.status}") + raw = await resp2.read() + elif resp.status >= 400: + raise ApiError(f"Download {name} -> HTTP {resp.status}") + else: + raw = await resp.read() + _data = self._unzip_json(raw, name) + _name = name.replace('.zip', '.json') + return _name, _data + + @staticmethod + def _unzip_json(raw: bytes, name: str) -> dict: + try: + with zipfile.ZipFile(io.BytesIO(raw)) as zf: + members = [n for n in zf.namelist() if n.lower().endswith(".json")] + if not members: + raise ApiError(f"No JSON inside {name}") + with zf.open(members[0]) as fh: + return json.loads(fh.read().decode("utf-8")) + except (zipfile.BadZipFile, ValueError) as err: + raise ApiError(f"Could not read {name}: {err}") from err + + +async def _async_try_login(session, client) -> str | None: + """Attempt login + vehicle discovery; return an error key or None.""" + try: + await client.async_login() + _vehicles = await client.async_list_vehicles() + _LOGGER.info(f"_vehicles={_vehicles}") + except AuthError: + return "invalid_auth" + except ApiError: + return "cannot_connect" + except Exception: # noqa: BLE001 + _LOGGER.exception("Unexpected error during login") + return "unknown" + return None + + +async def _async_update_data(client, vin, identifier) -> Union[str, str]: + await client.async_ensure_login() + # await client.async_login() + try: + listing = await client.async_list_datasets(vin, identifier) + except AuthError as err: + # Retry soon rather than waiting the full ~15-min cadence. + # self.update_interval = RETRY_INTERVAL + raise ApiError(f"Authentication failed: {err}") from err + except ApiError as err: + # self.update_interval = RETRY_INTERVAL + if "HTTP 400" in str(err): + _LOGGER.info(f"err={err}") + # The data-delivery endpoint returns 400 until the portal has + # finished provisioning a newly enabled continuous data request, + # which can take a few hours. HA keeps retrying until it's ready. + raise ApiError( + "Data delivery not ready yet (HTTP 400). If you just enabled " + "the continuous data request on the portal, it can take a few " + "hours to start; will keep retrying." + ) from err + raise ApiError(str(err)) from err + + # content datasets, oldest -> newest by createdOn + content = sorted( + (e for e in listing if e.get("name") and not e["name"].endswith(NO_CONTENT_SUFFIX)), + key=lambda e: _created_on(e) or datetime.min.replace(tzinfo=timezone.utc), + ) + _LOGGER.debug("refresh: %d listed, %d with content", len(listing), len(content)) + + if not content: + raise ApiError("No datasets with content available yet") + + # Load the newest dataset for live state. (We don't backfill history into + # statistics: importing into recorder-managed sensor entities collides + # with the recorder's own statistics and can corrupt unrelated ones.) + newest = content[-1] + + try: + name, payload = await client.async_download_dataset( + vin, identifier, newest["name"] + ) + except ApiError as err: + raise ApiError(f"Could not download newest dataset: {err}") from err + + return name, payload + + +def _filename_timestamp(name: str) -> datetime | None: + """Parse a YYYYMMDDhhmmss segment from a dataset filename. + + Handles both layouts seen in the wild ("TIMESTAMP_VIN.zip" and + "VIN_TIMESTAMP.zip") by scanning the underscore-separated parts + right-to-left for the first one that parses as a timestamp. + """ + stem = name.rsplit(".", 1)[0] + for part in reversed(stem.split("_")): + try: + return datetime.strptime(part, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc) + except ValueError: + continue + return None + + +def _created_on(entry: dict) -> datetime | None: + raw = entry.get("createdOn") + if not raw: + return _filename_timestamp(entry.get("name", "")) + try: + return datetime.fromisoformat(raw.replace("Z", "+00:00")) + except ValueError: + return _filename_timestamp(entry.get("name", "")) + + +def get_field_value_by_key(D: dict, key: str) -> str: + ret = None + for f in D: + if f['key'] == key: + ret = f['value'] + return ret + + +CAR_TIMESTAMP = "car_captured_time" + + +def get_max_value_by_fieldname(D: dict, field: str) -> str: + ret = "-" + for f in D: + if f['dataFieldName'] == field: + v = f['value'] + if ret == "-": + ret = v + else: + if v > ret: + ret = v + return ret + + +class euda(): + + # _LOGGER.setLevel(logging.DEBUG) + client = {} + thread = {} + result = {} + + def __init__(self): + # make sure required folders are there + try: + _LOGGER.debug("DATA_PATH=" + str(DATA_PATH)) + DATA_PATH.mkdir(parents=True, exist_ok=True) + except Exception as e: + _LOGGER.exception("init: dataPath creation failed, dataPath: " + + str(DATA_PATH) + ", error=" + str(e)) + + try: + _LOGGER.debug("JSON_PATH=" + str(JSON_PATH)) + JSON_PATH.mkdir(parents=True, exist_ok=True) + except Exception as e: + _LOGGER.exception("init: JSON_PATH creation failed, dataPath: " + + str(JSON_PATH) + ", error=" + str(e)) + + def save_json_file(self, _name: str, vin: str, _data: dict) -> bool: + status = False + fname = str(JSON_PATH) + '/' + _name + if not os.path.isfile(fname): + with open(fname, 'w') as f: + _LOGGER.info(f"save json file {fname}") + json.dump(_data, f, indent=4) + status = True + else: + _LOGGER.info(f"file {fname} not saved because is exists already") + + # cleanup old file except the latest KEEP_JSON! + _l = glob.glob(str(JSON_PATH) + '/*_' + vin + '.json') + _l.sort() + _len = len(_l) + _del = _len - KEEP_JSON + _LOGGER.debug(f"cleanup: KEEP_JSON={KEEP_JSON}, _l={_l}\n _len ={_len}, _del={_del}") + if _del > 0: + _del_list = _l[0:_del] + _LOGGER.debug(f"cleanup: _del_list={_del_list}") + for f in _del_list: + os.remove(f) + _LOGGER.debug(f"delete json file {f}") + + return status + + # eudaThread + async def async_eudaThread(self, username: str, password: str, vin: str): + brand = VIN_BRAND_MAP[vin[0:3]] + _LOGGER.info(f"async Thread started, brand={brand}") + try: + async with aiohttp.ClientSession(headers={'Connection': 'keep-alive'}) as session: + _k = str(euda.client.keys()) + _LOGGER.info(f"libeuda.Thread client at entry: euda.client.keys={_k}") + if username not in euda.client: + _LOGGER.debug(f"create new client, key={self.username}") + euda.client[username] = {} + euda.client[username] = EudaApiClient(session, username, password, brand) + _k = str(euda.client.keys()) + _LOGGER.info(f"libeuda.Thread client: euda.client.keys={_k}") + meta = None + while meta is None: + try: + meta = await euda.client[username].async_get_metadata(vin) + except ApiError as err: + if "HTTP 500" in str(err): + _LOGGER.info(f"Portal not ready/get_metadata, wait {POLL_INTERVAL} seconds") + else: + _LOGGER.info(f"APIError/get_metadata: {err}") + meta = None + except Exception as err: + _LOGGER.info(f"Exception/get_metadata: {err}") + meta = None + if meta is None: + time.sleep(POLL_INTERVAL) + + identifier = meta.get("Identifier") + + # thread main loop + while True: + try: + _data = None + while _data is None: + try: + _name, _data = await _async_update_data(euda.client[username], vin, identifier) + except ApiError as err: + if "HTTP 500" in str(err): + _LOGGER.info(f"Portal not ready/update_data, wait {POLL_INTERVAL} seconds") + _data = None + else: + _LOGGER.exception(f"thread APIError {err}") + _data = None + except Exception as err: + _LOGGER.exception(f"thread Exception {err}") + _data = None + if _data is None: + time.sleep(POLL_INTERVAL) + + vin = _data['vin'] + status = self.save_json_file(_name, vin, _data) + + if status: + _Data = _data['Data'] + soc = get_field_value_by_key(_Data, 'f89ed652-d104-3fa6-b7e2-ab7543309e7b') + if soc is None: + soc = get_field_value_by_key(_Data, '506cb83e-f99f-3af3-bbeb-0429b69a78d9') + if soc is None: + soc = get_field_value_by_key(_Data, 'ac1108b1-b8cc-3db9-a663-03d387e42223') + range = get_field_value_by_key(_Data, '153e8c40-4c6c-3c17-a11b-0ecc35d55b81') + if range is None: + range = get_field_value_by_key(_Data, '0ca40e18-0564-3eda-bcc0-7aee9ef44f04') + odometer = get_field_value_by_key(_Data, '30cc36fd-71ca-3c09-9296-e94ebd47bd2b') + soc_timestamp = get_field_value_by_key(_Data, '7b76a2c8-162c-3438-814b-0768f6cc6649') + car_timestamp = get_field_value_by_key(_Data, '2496cd73-8a68-318c-a159-200ecfd0e47d') + max_timestamp = get_max_value_by_fieldname(_Data, CAR_TIMESTAMP) + + euda.result[vin] = {} + euda.result[vin]['soc'] = soc + euda.result[vin]['range'] = range + euda.result[vin]['soc_timestamp'] = soc_timestamp + euda.result[vin]['max_timestamp'] = max_timestamp + euda.result[vin]['car_timestamp'] = car_timestamp + euda.result[vin]['odometer'] = odometer + _LOGGER.info(f"thread result:\n{json.dumps(euda.result, indent=4)}") + _LOGGER.info(f"sleep {CYCLE_INTERVAL} seconds") + time.sleep(CYCLE_INTERVAL) + + except Exception as e: + _LOGGER.exception(f"thread loop failed 0, exception={e}") + + except Exception as e: + _LOGGER.exception(f"thread body failed 0, exception={e}") + + def eudaThread(self, username: str, password: str, vin: str): + _LOGGER.info(f"sync libeuda.eudaThread {threading.current_thread().name} started") + asyncio.run(self.async_eudaThread(username, password, vin)) + _LOGGER.info(f"sync libeuda.eudaThread {threading.current_thread().name} ended") + + async def get_status(self, + conf: VWId, + vehicle: int, + vehicle_update_data: VehicleUpdateData) -> Union[int, float, str, float, float]: + + # error codes SOCERR-xx raised: + # SOCERR-00: general error + # SOCERR-01: login problem, username, password wrong, account locked, etc. + # SOCERR-02: vehicle not (yet) found in portal, VIN wrong? + self.username = conf.configuration.user_id + self.password = conf.configuration.password + self.vin = conf.configuration.vin + self.vehicle = vehicle + _LOGGER.debug(f"update_data:{vehicle_update_data}") + self.average_consump = vehicle_update_data.average_consump + self.battery_capacity = vehicle_update_data.battery_capacity + + try: + self.storeFile = str(DATA_PATH) + storeFileName + str(self.vin) + '.json' + if os.path.isfile(self.storeFile): + _LOGGER.debug(f"load data from {self.storeFile}") + with open(self.storeFile) as f: + data = json.load(f) + else: + data = {} + data['currentSOC_pct'] = str(0) + data['cruisingRangeElectric_km'] = str(0) + data['carCapturedTimestamp'] = "1970-01-01T00:00:00Z" + data['soc_timestamp'] = str(0) + data['odometer'] = str(0) + + euda.thread[self.username] = {} + euda.thread[self.username]['name'] = EUDA_THREADNAME + self.vehicle + euda.thread[self.username]['thread'] = None + for t in threading.enumerate(): + if t.name == euda.thread[self.username]['name']: + _LOGGER.debug(f"thread {t.name} exists already") + euda.thread[self.username]['thread'] = t + if euda.thread[self.username]['thread'] is None: + _LOGGER.debug(f"{euda.thread[self.username]['name']} not found: starting now") + euda.thread[self.username]['thread'] = threading.Thread(target=self.eudaThread, + name=euda.thread[self.username]['name'], + args=(self.username, self.password, self.vin)) + euda.thread[self.username]['thread'].start() + + if self.vin in euda.result: + _LOGGER.debug(f"vehicle match: {self.vin}") + _LOGGER.info(f"result from thread:\n{json.dumps(euda.result, indent=4)}") + soc = euda.result[self.vin]['soc'] + range = euda.result[self.vin]['range'] + if int(soc) > 0 and range is None: + range = int(float(soc) * float(self.battery_capacity) / float(self.average_consump)) + _LOGGER.warn(f"no range delivered, calculate range = {range}km") + ts = euda.result[self.vin]['soc_timestamp'] + tsxx = euda.result[self.vin]['max_timestamp'] + odometer = euda.result[self.vin]['odometer'] + + _LOGGER.debug(f"vin = {self.vin}") + _LOGGER.debug(f"soc = {soc}") + _LOGGER.debug(f"range = {range}") + _LOGGER.debug(f"soc_timestamp = {ts}") + _LOGGER.debug(f"soc_timestampxx = {tsxx}") + _LOGGER.debug(f"odometer = {odometer}") + + data_modified = False + if soc and str(soc) != data['currentSOC_pct']: + data['currentSOC_pct'] = str(soc) + data_modified = True + if range and str(range) != data['cruisingRangeElectric_km']: + data['cruisingRangeElectric_km'] = str(range) + data_modified = True + if ts and str(ts) != data['soc_timestamp']: + data['soc_timestamp'] = str(ts) + data_modified = True + if tsxx and str(tsxx) != data['carCapturedTimestamp']: + data['carCapturedTimestamp'] = str(tsxx) + data_modified = True + if odometer and str(odometer) != data['odometer']: + data['odometer'] = str(odometer) + data_modified = True + + # save data to file if modified + if data_modified or not os.path.isfile(self.storeFile): + _LOGGER.info(f"save data to {self.storeFile}") + with open(self.storeFile, 'w') as f: + json.dump(data, f, indent=4) + + else: + _LOGGER.error(f"SOCERR-02: Für VIN {self.vin} wurden (noch) keine Daten gefunden") + # raise Exception(f"SOCERR-02: Für VIN {self.vin} wurden (noch) keine Daten gefunden") + + _LOGGER.info(f"return data:\n{json.dumps(data, indent=4)}") + soc = data['currentSOC_pct'] + range = data['cruisingRangeElectric_km'] + tsxx = data['carCapturedTimestamp'] + ts = data['soc_timestamp'] + odometer = data['odometer'] + _LOGGER.info(f"get_status: soc={soc}, range={range}, ts={ts}, tsxx={tsxx}, odometer={odometer}") + + # for test only: + # set soc_timestamp to 0 to avoid computed state being later than this reported state + # topic = f"openWB/vehicle/{self.vehicle}/get/soc_timestamp" + # ep0 = 0 + # _LOGGER.info(f"get_status: publish soc_timestamp as 0: topic: {topic}, message: {ep0}") + # Pub().pub(topic, ep0) + + return int(soc), float(range), int(ts), tsxx, float(odometer) + except Exception as e: + _LOGGER.exception(f"get_status failed 0, exception={e}") + # if exception is a SOCERR reraise it, otherwise raise general SOCERR-00 + if "SOCERR" in str(e): + raise e + else: + _t = f"SOCERR-00: Für User {self.username} und VIN {self.vin} wurden keine Daten empfangen" + raise Exception(f"{_t} {e}") + + +# sync function +def fetch_soc(conf: VWId, + vehicle: int, + vehicle_update_data: VehicleUpdateData) -> Union[int, float, int, str, float]: + + # prepare and call async method + loop = new_event_loop() + set_event_loop(loop) + + # get soc, range from server + a = euda() + soc, range, soc_ts, soc_tsX, odometer =\ + loop.run_until_complete(a.get_status(conf, vehicle, vehicle_update_data)) + + return soc, range, soc_ts, soc_tsX, odometer diff --git a/packages/modules/vehicles/vwid/libvwid.py b/packages/modules/vehicles/vwid/libvwid.py deleted file mode 100755 index 2018906f56..0000000000 --- a/packages/modules/vehicles/vwid/libvwid.py +++ /dev/null @@ -1,1510 +0,0 @@ -#!/usr/bin/env python3 -"""Communicate with Volkswagen Connect services.""" - -from __future__ import annotations - -import asyncio -from datetime import datetime, timedelta -import hashlib -from json import dumps as to_json -from json import loads -import logging -from random import randint, random -import re -from urllib.parse import parse_qs, urljoin, urlparse - -import aiohttp -# from aiohttp.hdrs import METH_GET, METH_POST, METH_PUT -import bs4 -import jwt - -ANDROID_PACKAGE_NAME = "com.volkswagen.weconnect" -APP_URI = "weconnect://authenticated" -BASE_API = "https://emea.bff.cariad.digital" -BASE_AUTH = "https://identity.vwgroup.io" -BASE_SESSION = "https://msg.volkswagen.de" -BRAND = "VW" -USER_AGENT = "Volkswagen/3.51.1-android/14" -CLIENT = { - "Legacy": { - "CLIENT_ID": "a24fba63-34b3-4d43-b181-942111e6bda8@apps_vw-dilab_com", - "SCOPE": "openid profile badge cars dealers vin", - "TOKEN_TYPES": "code", - } -} -COUNTRY = "DE" -HEADERS_SESSION = { - "Connection": "keep-alive", - "Content-Type": "application/json", - "Accept-charset": "UTF-8", - "Accept": "application/json", - "User-Agent": USER_AGENT, - "tokentype": "IDK_TECHNICAL", - "x-android-package-name": ANDROID_PACKAGE_NAME, -} -HEADERS_AUTH = { - "Connection": "keep-alive", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp," - "image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", - "Accept-Encoding": "gzip, deflate", - "Content-Type": "application/x-www-form-urlencoded", - "x-android-package-name": ANDROID_PACKAGE_NAME, -} - -MAX_RETRIES_ON_RATE_LIMIT = 3 -TIMEOUT = timedelta(seconds=30) -JWT_ALGORITHMS = ["RS256"] - - -class Services: - """Service names that are used in `capabilities` and `selectivestatus` calls.""" - - CHARGING = "charging" - PARAMETERS = "parameters" - SERVICE_STATUS = "service_status" - MEASUREMENTS = "measurements" - - -def find_path_in_dict(src, path) -> object: - """Return data at path in dictionary source. - - Simple navigation of a hierarchical dict structure using XPATH-like syntax. - - >>> find_path_in_dict(dict(a=1), 'a') - 1 - - >>> find_path_in_dict(dict(a=1), '') - {'a': 1} - - >>> find_path_in_dict(dict(a=None), 'a') - - - >>> find_path_in_dict(dict(a=1), 'b') - Traceback (most recent call last): - ... - KeyError: 'b' - - >>> find_path_in_dict(dict(a=dict(b=1)), 'a.b') - 1 - - >>> find_path_in_dict(dict(a=dict(b=1)), 'a') - {'b': 1} - - >>> find_path_in_dict(dict(a=dict(b=1)), 'a.c') - Traceback (most recent call last): - ... - KeyError: 'c' - - """ - if not path: - return src - if isinstance(path, str): - path = path.split(".") - if isinstance(src, list): - try: - f = float(path[0]) - if f.is_integer() and len(src) > 0: - return find_path_in_dict(src[int(f)], path[1:]) - raise KeyError("Key not found") - except ValueError as valerr: - raise KeyError(f"{path[0]} should be an integer") from valerr - except IndexError as idxerr: - raise KeyError("Index out of range") from idxerr - return find_path_in_dict(src[path[0]], path[1:]) - - -def find_path(src, path) -> object: - """Return data at path in source.""" - try: - return find_path_in_dict(src, path) - except KeyError: - _LOGGER.error( - "Dictionary path: %s is no longer present. Dictionary: %s", path, src - ) - return None - - -def is_valid_path(src, path): - """Check if path exists in source. - - >>> is_valid_path(dict(a=1), 'a') - True - - >>> is_valid_path(dict(a=1), '') - True - - >>> is_valid_path(dict(a=1), None) - True - - >>> is_valid_path(dict(a=1), 'b') - False - - >>> is_valid_path({"a": [{"b": True}, {"c": True}]}, 'a.0.b') - True - - >>> is_valid_path({"a": [{"b": True}, {"c": True}]}, 'a.1.b') - False - """ - try: - find_path_in_dict(src, path) - except KeyError: - return False - else: - return True - - -UTC = None -BACKEND_RECEIVED_TIMESTAMP = "BACKEND_RECEIVED_TIMESTAMP" - -_LOGGER = logging.getLogger(__name__) - - -class Vehicle: - """Vehicle contains the state of sensors and methods for interacting with the car.""" - - def __init__(self, conn, url) -> None: - """Initialize the Vehicle with default values.""" - self._connection = conn - self._url = url - self._homeregion = "https://msg.volkswagen.de" - self._discovered = False - self._states = {} - self._requests: dict[str, object] = { - "departuretimer": {"status": "", "timestamp": datetime.now(UTC)}, - "batterycharge": {"status": "", "timestamp": datetime.now(UTC)}, - "climatisation": {"status": "", "timestamp": datetime.now(UTC)}, - "refresh": {"status": "", "timestamp": datetime.now(UTC)}, - "lock": {"status": "", "timestamp": datetime.now(UTC)}, - "latest": "", - "state": "", - } - - # API Endpoints that might be enabled for car (that we support) - self._services: dict[str, dict[str, object]] = { - Services.CHARGING: {"active": False}, - Services.PARAMETERS: {}, - } - - def _in_progress(self, topic: str, unknown_offset: int = 0) -> bool: - """Check if request is already in progress.""" - if self._requests.get(topic, {}).get("id", False): - timestamp = self._requests.get(topic, {}).get( - "timestamp", - datetime.now(UTC) - timedelta(minutes=unknown_offset), - ) - if timestamp + timedelta(minutes=3) < datetime.now(UTC): - self._requests.get(topic, {}).pop("id") - else: - _LOGGER.debug("Action (%s) already in progress", topic) - return True - return False - - async def _handle_response( - self, response, topic: str, error_msg: str | None = None - ) -> bool: - """Handle errors in response and get requests remaining.""" - if not response: - self._requests[topic] = { - "status": "Failed", - "timestamp": datetime.now(UTC), - } - _LOGGER.error( - error_msg - if error_msg is not None - else f"Failed to perform {topic} action" - ) - raise Exception( - error_msg - if error_msg is not None - else f"Failed to perform {topic} action" - ) - self._requests[topic] = { - "timestamp": datetime.now(UTC), - "status": response.get("state", "Unknown"), - "id": response.get("id", 0), - } - if response.get("state", None) == "Throttled": - status = "Throttled" - _LOGGER.warning("Request throttled (%s)", topic) - else: - status = await self.wait_for_request(request=response.get("id", 0)) - self._requests[topic] = { - "status": status, - "timestamp": datetime.now(UTC), - } - return True - - # API get and set functions # - # Init and update vehicle data - async def discover(self): - """Discover vehicle and initial data.""" - - _LOGGER.debug("Attempting discovery of supported API endpoints for vehicle") - - capabilities_response = await self._connection.getOperationList(self.vin) - parameters_list = capabilities_response.get("parameters", {}) - capabilities_list = capabilities_response.get("capabilities", {}) - - # Update services with parameters - if parameters_list: - self._services[Services.PARAMETERS].update(parameters_list) - - # If there are no capabilities, log a warning - if not capabilities_list: - _LOGGER.warning( - "Could not determine available API endpoints for %s", self.vin - ) - self._discovered = True - return - - for service_id, service in capabilities_list.items(): - if service_id not in self._services: - continue - - service_name = service.get("id", "Unknown Service") - data = {} - - if service.get("isEnabled", False): - data["active"] = True - _LOGGER.debug("Discovered enabled service: %s", service_name) - - expiration_date = service.get("expirationDate", None) - if expiration_date: - data["expiration"] = expiration_date - - operations = service.get("operations", {}) - data["operations"] = [op.get("id", None) for op in operations.values()] - - parameters = service.get("parameters", []) - data["parameters"] = parameters - - else: - reason = service.get("status", "Unknown reason") - _LOGGER.debug( - "Service: %s is disabled due to: %s", service_name, reason - ) - data["active"] = False - - # Update the service data - try: - self._services[service_name].update(data) - except Exception as error: - _LOGGER.warning( - 'Exception "%s" while updating service "%s": %s', - error, - service_name, - data, - ) - - _LOGGER.debug("API endpoints: %s", self._services) - self._discovered = True - - async def update(self): - """Try to fetch data for all known API endpoints.""" - _LOGGER.debug("connection update") - if not self._discovered: - _LOGGER.debug("connection discover") - await self.discover() - if not self.deactivated: - _LOGGER.debug("connection gather selective status charging") - await asyncio.gather( - self.get_selectivestatus( - [ - Services.CHARGING, - Services.MEASUREMENTS, - ] - ) - ) - await asyncio.gather(self.get_service_status()) - else: - _LOGGER.warning("Vehicle with VIN %s is deactivated", self.vin) - - # Data collection functions - async def get_selectivestatus(self, services): - """Fetch selective status for specified services.""" - data = await self._connection.getSelectiveStatus(self.vin, services) - if data: - self._states.update(data) - - async def get_vehicle(self): - """Fetch car masterdata.""" - _LOGGER.debug("get_vehicle for Vehicle with VIN %s", self.vin) - data = await self._connection.getVehicleData(self.vin) - if data: - self._states.update(data) - - async def get_service_status(self): - """Fetch service status.""" - data = await self._connection.get_service_status() - if data: - self._states.update({Services.SERVICE_STATUS: data}) - - # Refresh vehicle data (VSR) - async def set_refresh(self): - """Wake up vehicle and update status data.""" - if self._in_progress("refresh", unknown_offset=-5): - return False - try: - self._requests["latest"] = "Refresh" - response = await self._connection.wakeUpVehicle(self.vin) - if response: - if response.status == 204: - self._requests["state"] = "in_progress" - self._requests["refresh"] = { - "timestamp": datetime.now(UTC), - "status": "in_progress", - "id": 0, - } - status = await self.wait_for_data_refresh() - elif response.status == 429: - status = "Throttled" - _LOGGER.debug("Server side throttled. Try again later") - else: - _LOGGER.debug( - "Unable to refresh the data. Incorrect response code: %s", - response.status, - ) - self._requests["state"] = status - self._requests["refresh"] = { - "status": status, - "timestamp": datetime.now(UTC), - } - return True - _LOGGER.debug("Unable to refresh the data") - except Exception as error: - _LOGGER.warning("Failed to execute data refresh - %s", error) - self._requests["refresh"] = { - "status": "Exception", - "timestamp": datetime.now(UTC), - } - raise Exception("Data refresh failed") - - # Vehicle class helpers # - # Vehicle info - @property - def attrs(self): - """Return all attributes. - - :return: - """ - return self._states - - def has_attr(self, attr) -> bool: - """Return true if attribute exists. - - :param attr: - :return: - """ - return is_valid_path(self.attrs, attr) - - def get_attr(self, attr): - """Return a specific attribute. - - :param attr: - :return: - """ - return find_path(self.attrs, attr) - - async def expired(self, service): - """Check if access to service has expired.""" - try: - now = datetime.now(UTC) - if self._services.get(service, {}).get("expiration", False): - expiration = self._services.get(service, {}).get("expiration", False) - if not expiration: - expiration = datetime.neow(UTC) + timedelta(days=1) - else: - _LOGGER.debug( - "Could not determine end of access for service %s, assuming it is valid", - service, - ) - expiration = datetime.now(UTC) + timedelta(days=1) - expiration = expiration.replace(tzinfo=None) - if now >= expiration: - _LOGGER.info("Access to %s has expired!", service) - self._discovered = False - return True - except Exception: - _LOGGER.debug( - "Exception. Could not determine end of access for service %s, assuming it is valid", - service, - ) - return False - else: - return False - - @property - def vin(self) -> str: - """Vehicle identification number. - - :return: - """ - return self._url - - @property - def deactivated(self) -> bool | None: - """Return true if service is deactivated. - - :return: - """ - return self.attrs.get("carData", {}).get("deactivated", None) - - # Helper functions # - def __str__(self): - """Return the vin.""" - return self.vin - - -def json_loads(s) -> object: - """Load JSON from string and parse timestamps.""" - return loads(s, object_hook=obj_parser) - - -def obj_parser(obj: dict) -> dict: - """Parse datetime.""" - for key, val in obj.items(): - try: - obj[key] = datetime.strptime(val, "%Y-%m-%dT%H:%M:%S%z") - except (TypeError, ValueError): - """The value was not a date.""" - return obj - - -# noinspection PyPep8Naming -class Connection: - """Connection to VW-Group Connect services.""" - - _login_lock = asyncio.Lock() - - # Init connection class - def __init__( - self, - session, - username, - password, - fulldebug=False, - country=COUNTRY, - interval=timedelta(minutes=5), - ) -> None: - """Initialize.""" - self._x_client_id = None - self._session = session - self._session_fulldebug = fulldebug - self._session_headers = HEADERS_SESSION.copy() - self._session_base = BASE_SESSION - self._session_auth_headers = HEADERS_AUTH.copy() - self._session_auth_base = BASE_AUTH - self._session_refresh_interval = interval - - no_vin_key = "" - self._session_auth_ref_urls = {no_vin_key: BASE_SESSION} - self._session_spin_ref_urls = {no_vin_key: BASE_SESSION} - self._session_logged_in = False - self._session_first_update = False - self._session_auth_username = username - self._session_auth_password = password - self._session_tokens = {} - self._session_country = country.upper() - - self._vehicles = [] - - _LOGGER.debug("Using service %s", self._session_base) - - self._jarCookie = "" - self._state = {} - - self._service_status = {} - - def _clear_cookies(self): - self._session._cookie_jar._cookies.clear() - - # API Login - async def doLogin(self, tries: int = 3): - """Login method, clean login.""" - async with self._login_lock: - _LOGGER.debug("Initiating new login") - - for i in range(tries): - self._session_logged_in = await self._login("Legacy") - if self._session_logged_in: - break - if i > tries: - _LOGGER.error("Login failed after %s tries", tries) - return False - await asyncio.sleep(random() * 5) - - if not self._session_logged_in: - return False - - _LOGGER.debug("Successfully logged in") - self._session_tokens["identity"] = self._session_tokens["Legacy"].copy() - - # Get list of vehicles from account - _LOGGER.debug("Fetching vehicles associated with account") - self._session_headers.pop("Content-Type", None) - loaded_vehicles = await self.get(url=f"{BASE_API}/vehicle/v2/vehicles") - # Add Vehicle class object for all VIN-numbers from account - if loaded_vehicles.get("data") is not None: - _LOGGER.debug("Found vehicle(s) associated with account") - self._vehicles = [] - for vehicle in loaded_vehicles.get("data"): - self._vehicles.append(Vehicle(self, vehicle.get("vin"))) - else: - _LOGGER.warning("Failed to login to Volkswagen Connect API") - self._session_logged_in = False - return False - - # Update all vehicles data before returning - await self.update() - return True - - async def get_openid_config(self): - """Get OpenID config.""" - if self._session_fulldebug: - _LOGGER.debug("Requesting openid config") - req = await self._session.get( - url=f"{BASE_API}/login/v1/idk/openid-configuration" - ) - if req.status != 200: - _LOGGER.error("Failed to get OpenID configuration, status: %s", req.status) - raise Exception("OpenID configuration error") - return await req.json() - - async def get_authorization_page(self, authorization_endpoint, client): - """Get authorization page (login page).""" - if self._session_fulldebug: - _LOGGER.debug( - 'Requesting authorization page from "%s"', authorization_endpoint - ) - self._session_auth_headers.pop("Referer", None) - self._session_auth_headers.pop("Origin", None) - _LOGGER.debug('Request headers: "%s"', self._session_auth_headers) - - try: - req = await self._session.get( - url=authorization_endpoint, - headers=self._session_auth_headers, - allow_redirects=False, - params={ - "redirect_uri": APP_URI, - "response_type": CLIENT[client].get("TOKEN_TYPES"), - "client_id": CLIENT[client].get("CLIENT_ID"), - "scope": CLIENT[client].get("SCOPE"), - }, - ) - - # Check if the response contains a redirect location - location = req.headers.get("Location") - if not location: - raise Exception( - f"Missing 'Location' header, payload returned: {await req.content.read()}" - ) - - ref = urljoin(authorization_endpoint, location) - if "error" in ref: - parsed_query = parse_qs(urlparse(ref).query) - error_msg = parsed_query.get("error", ["Unknown error"])[0] - error_description = parsed_query.get( - "error_description", ["No description"] - )[0] - _LOGGER.exception("Authorization error: %s", error_description) - raise Exception(error_msg) - - # If redirected, fetch the new location - req = await self._session.get( - url=ref, headers=self._session_auth_headers, allow_redirects=False - ) - - if req.status != 200: - raise Exception("Failed to fetch authorization endpoint") - - return await req.text() - - except Exception as e: - _LOGGER.warning("Error during fetching authorization page: %s", str(e)) - raise - - def extract_form_data(self, page_content, form_id): - """Extract form data from a page.""" - soup = bs4.BeautifulSoup(page_content, "html.parser") - form = soup.find("form", id=form_id) - if form is None: - _LOGGER.debug(f"Form with ID '{form_id}' not found.") - return None - return { - input_field["name"]: input_field["value"] - for input_field in form.find_all("input", type="hidden") - } - - def extract_state_token(self, page_content): - """Extract state token from a page.""" - soup = bs4.BeautifulSoup(page_content, "html.parser") - state_input = soup.select_one('input[name="state"]') - if not state_input or not state_input.get("value"): - _LOGGER.debug("State token not found.") - return None - return state_input["value"] - - def extract_password_form_data(self, soup): - """Extract password form data from a page.""" - pw_form = {} - for script in soup.find_all("script"): - if "src" in script.attrs or not script.string: - continue - script_text = script.string - - if "window._IDK" not in script_text: - continue # Skip scripts that don't contain relevant data - if re.match('"errorCode":"', script_text): - raise Exception("Error code found in script data.") - - pw_form["relayState"] = re.search( - '"relayState":"([a-f0-9]*)"', script_text - )[1] - pw_form["hmac"] = re.search('"hmac":"([a-f0-9]*)"', script_text)[1] - pw_form["email"] = re.search('"email":"([^"]*)"', script_text)[1] - pw_form["_csrf"] = re.search("csrf_token:\\s*'([^\"']*)'", script_text)[1] - - post_action = re.search('"postAction":\\s*"([^"\']*)"', script_text)[1] - client_id = re.search('"clientId":\\s*"([^"\']*)"', script_text)[1] - return pw_form, post_action, client_id - - raise Exception("Password form data not found in script.") - - async def post_form(self, session, url, headers, form_data, redirect=True): - """Post a form and check for success.""" - req = await session.post( - url, headers=headers, data=form_data, allow_redirects=redirect - ) - if not redirect and req.status == 302: - return req.headers["Location"] - if req.status != 200: - raise Exception("Form POST request failed.") - return await req.text() - - async def handle_login_with_password(self, session, url, auth_headers, form_data): - """Handle login with email and password.""" - return await self.post_form(session, url, auth_headers, form_data, False) - - async def follow_redirects(self, session, pw_url, redirect_location): - """Handle redirects.""" - ref = urljoin(pw_url, redirect_location) - max_depth = 10 - while not ref.startswith(APP_URI): - if max_depth == 0: - raise Exception("Too many redirects") - response = await session.get( - url=ref, headers=self._session_auth_headers, allow_redirects=False - ) - if "Location" not in response.headers: - _LOGGER.warning("Failed to find next redirect location") - raise Exception("Redirect error") - ref = urljoin(ref, response.headers["Location"]) - max_depth -= 1 - return ref - - async def _login(self, client="Legacy"): - """Login function.""" - - try: - # Clear cookies and reset headers - self._clear_cookies() - self._session_headers = HEADERS_SESSION.copy() - self._session_auth_headers = HEADERS_AUTH.copy() - - # Get OpenID configuration - openid_config = await self.get_openid_config() - authorization_endpoint = openid_config["authorization_endpoint"] - token_endpoint = openid_config["token_endpoint"] - auth_issuer = openid_config["issuer"] - - # Get authorization page - authorization_page = await self.get_authorization_page( - authorization_endpoint, client - ) - - # Extract form data - mailform = self.extract_form_data(authorization_page, "emailPasswordForm") - state_token = self.extract_state_token(authorization_page) - if mailform: - _LOGGER.debug("Legacy authentication found., client=" + str(client)) - mailform["email"] = self._session_auth_username - pe_url = auth_issuer + bs4.BeautifulSoup( - authorization_page, "html.parser" - ).find("form", id="emailPasswordForm").get("action") - - # POST email - # https://identity.vwgroup.io/signin-service/v1/{CLIENT_ID}/login/identifier - self._session_auth_headers["Referer"] = authorization_endpoint - self._session_auth_headers["Origin"] = auth_issuer - response_text = await self.post_form( - self._session, pe_url, self._session_auth_headers, mailform - ) - - # Extract password form data - response_soup = bs4.BeautifulSoup(response_text, "html.parser") - pw_form, post_action, client_id = self.extract_password_form_data( - response_soup - ) - - # Add password to form data - pw_form["password"] = self._session_auth_password - pw_url = f"{auth_issuer}/signin-service/v1/{client_id}/{post_action}" - - # POST password - self._session_auth_headers["Referer"] = pe_url - redirect_location = await self.handle_login_with_password( - self._session, pw_url, self._session_auth_headers, pw_form - ) - - # Handle redirects and extract tokens - redirect_response = await self.follow_redirects( - self._session, pw_url, redirect_location - ) - jwt_auth_code = parse_qs(urlparse(redirect_response).query)["code"][0] - elif state_token: - _LOGGER.debug( - "Legacy authentication not found. Trying new authentication flow., client=" + str(client) - ) - - # Do login - login_form = {} - login_form["username"] = self._session_auth_username - login_form["password"] = self._session_auth_password - login_form["state"] = state_token - login_url = f"{auth_issuer}/u/login?state={state_token}" - - redirect_location = await self.post_form( - self._session, - login_url, - self._session_auth_headers, - login_form, - False, - ) - - # Handle redirects and extract tokens - redirect_response = await self.follow_redirects( - self._session, auth_issuer, redirect_location - ) - jwt_auth_code = parse_qs(urlparse(redirect_response).query)["code"][0] - else: - _LOGGER.error( - "Unable to find valid login page." - "Try logging in to the portal: https://www.myvolkswagen.net/" - ) - return False - - # Exchange authorization code for tokens - token_body = { - "client_id": CLIENT[client].get("CLIENT_ID"), - "grant_type": "authorization_code", - "code": jwt_auth_code, - "redirect_uri": APP_URI, - } - - # Token endpoint - token_response = await self.post_form( - self._session, token_endpoint, self._session_auth_headers, token_body - ) - - # Store session tokens - self._session_tokens[client] = json_loads(token_response) - - # Verify tokens - if not await self.verify_tokens( - self._session_tokens[client].get("id_token", ""), "identity" - ): - _LOGGER.warning("User identity token could not be verified!") - else: - _LOGGER.debug("User identity token verified successfully, client=" + str(client)) - - # Mark session as logged in - self._session_logged_in = True - - except Exception as error: - _LOGGER.error("Login failed: %s", error) - self._session_logged_in = False - return False - self._session_headers["Authorization"] = ( - "Bearer " + self._session_tokens[client]["access_token"] - ) - return True - - async def _handle_action_result(self, response_raw): - response = await response_raw.json(loads=json_loads) - if not response: - raise Exception("Invalid or no response") - if response == 429: - return {"id": None, "state": "Throttled"} - request_id = response.get("data", {}).get("requestID", 0) - _LOGGER.debug("Request returned with request id: %s", request_id) - return {"id": str(request_id)} - - async def terminate(self): - """Log out from connect services.""" - _LOGGER.debug("Initiating logout") - await self.logout() - - async def logout(self): - """Logout, revoke tokens.""" - _LOGGER.debug("logout - revoke token") - self._session_headers.pop("Authorization", None) - - if self._session_logged_in: - if self._session_headers.get("identity", {}).get("identity_token"): - _LOGGER.debug("Revoking Identity Access Token") - - if self._session_headers.get("identity", {}).get("refresh_token"): - _LOGGER.debug("Revoking Identity Refresh Token") - params = {"token": self._session_tokens["identity"]["refresh_token"]} - await self.post( - "https://emea.bff.cariad.digital/login/v1/idk/revoke", data=params - ) - - # HTTP methods to API - async def _request(self, method, url, return_raw=False, **kwargs): - """Perform a query to the VW-Group API.""" - _LOGGER.debug('HTTP %s "%s"', method, url) - if kwargs.get("json", None): - _LOGGER.debug("Request payload: %s", kwargs.get("json", None)) - try: - async with self._session.request( - method, - url, - headers=self._session_headers, - timeout=aiohttp.ClientTimeout(total=TIMEOUT.seconds), - cookies=self._jarCookie, - raise_for_status=False, - **kwargs, - ) as response: - response.raise_for_status() - - # Update cookie jar - if self._jarCookie != "": - self._jarCookie.update(response.cookies) - else: - self._jarCookie = response.cookies - - # Update service status - await self.update_service_status(url, response.status) - - try: - if response.status == 204: - if return_raw: - res = response - else: - res = {"status_code": response.status} - elif response.status >= 200 or response.status <= 300: - res = await response.json(loads=json_loads) - else: - res = {} - _LOGGER.debug( - "Not success status code [%s] response: %s", - response.status, - response.text, - ) - except Exception: - res = {} - _LOGGER.debug( - "Something went wrong [%s] response: %s", - response.status, - response.text, - ) - if return_raw: - return response - return res - - if self._session_fulldebug: - _LOGGER.debug( - 'Request for "%s" returned with status code [%s], headers: %s, response: %s', - url, - response.status, - response.headers, - res, - ) - else: - _LOGGER.debug( - 'Request for "%s" returned with status code [%s]', - url, - response.status, - ) - - if return_raw: - res = response - return res - except aiohttp.client_exceptions.ClientResponseError as httperror: - # Update service status - await self.update_service_status(url, httperror.code) - raise httperror from None - except Exception as error: - # Update service status - await self.update_service_status(url, 1000) - raise error from None - - async def get(self, url, vin="", tries=0): - """Perform a get query.""" - try: - return await self._request(aiohttp.hdrs.METH_GET, url) - except aiohttp.client_exceptions.ClientResponseError as error: - if error.status == 400: - _LOGGER.error( - 'Got HTTP 400 "Bad Request" from server, this request might be malformed or not implemented' - " correctly for this vehicle" - ) - elif error.status == 401: - _LOGGER.warning( - 'Received "unauthorized" error while fetching data: %s', error - ) - self._session_logged_in = False - elif error.status == 429 and tries < MAX_RETRIES_ON_RATE_LIMIT: - delay = randint(1, 3 + tries * 2) - _LOGGER.debug( - "Server side throttled. Waiting %s, try %s", delay, tries + 1 - ) - await asyncio.sleep(delay) - return await self.get(url, vin, tries + 1) - elif error.status == 500: - _LOGGER.warning( - "Got HTTP 500 from server, service might be temporarily unavailable" - ) - elif error.status == 502: - _LOGGER.warning( - "Got HTTP 502 from server, this request might not be supported for this vehicle" - ) - else: - _LOGGER.error("Got unhandled error from server: %s", error.status) - return {"status_code": error.status} - - async def post(self, url, vin="", tries=0, return_raw=False, **data): - """Perform a post query.""" - try: - if data: - return await self._request( - aiohttp.hdrs.METH_POST, url, return_raw=return_raw, **data - ) - return await self._request(aiohttp.hdrs.METH_POST, url, return_raw=return_raw) - except aiohttp.client_exceptions.ClientResponseError as error: - if error.status == 429 and tries < MAX_RETRIES_ON_RATE_LIMIT: - delay = randint(1, 3 + tries * 2) - _LOGGER.debug( - "Server side throttled. Waiting %s, try %s", delay, tries + 1 - ) - await asyncio.sleep(delay) - return await self.post( - url, vin, tries + 1, return_raw=return_raw, **data - ) - raise - - async def put(self, url, vin="", tries=0, return_raw=False, **data): - """Perform a put query.""" - try: - if data: - return await self._request(aiohttp.hdrs.METH_PUT, url, return_raw=return_raw, **data) - return await self._request(aiohttp.hdrs.METH_PUT, url, return_raw=return_raw) - except aiohttp.client_exceptions.ClientResponseError as error: - if error.status == 429 and tries < MAX_RETRIES_ON_RATE_LIMIT: - delay = randint(1, 3 + tries * 2) - _LOGGER.debug( - "Server side throttled. Waiting %s, try %s", delay, tries + 1 - ) - await asyncio.sleep(delay) - return await self.put( - url, vin, tries + 1, return_raw=return_raw, **data - ) - raise - - # Update data for all Vehicles - async def update(self): - """Update status.""" - if not self.logged_in: - if not await self._login(): - _LOGGER.warning("Login for %s account failed!", BRAND) - return False - try: - if not await self.validate_tokens: - _LOGGER.warning( - "Session expired. Initiating new login for %s account", BRAND - ) - if not await self.doLogin(): - _LOGGER.warning("Login for %s account failed!", BRAND) - raise Exception(f"Login for {BRAND} account failed") - else: - _LOGGER.debug("Going to call vehicle updates") - # Get all Vehicle objects and update in parallell - updatelist = [vehicle.update() for vehicle in self.vehicles] - # Wait for all data updates to complete - await asyncio.gather(*updatelist) - - return True - except (OSError, LookupError, Exception) as error: - _LOGGER.warning("Could not update information: %s", error) - return False - - async def getPendingRequests(self, vin): - """Get status information for pending requests.""" - if not await self.validate_tokens: - return False - try: - response = await self.get( - f"{BASE_API}/vehicle/v1/vehicles/{vin}/pendingrequests" - ) - - if response: - response["refreshTimestamp"] = datetime.now(UTC) - return response - - except Exception as error: - _LOGGER.warning( - "Could not fetch information for pending requests, error: %s", error - ) - return False - - async def getOperationList(self, vin): - """Collect operationlist for VIN, supported/licensed functions.""" - if not await self.validate_tokens: - return False - try: - response = await self.get( - f"{BASE_API}/vehicle/v1/vehicles/{vin}/capabilities", "" - ) - if response.get("capabilities", False): - data = response - elif response.get("status_code", {}): - _LOGGER.warning( - "Could not fetch operation list, HTTP status code: %s", - response.get("status_code"), - ) - data = response - else: - _LOGGER.warning("Could not fetch operation list: %s", response) - data = {"error": "unknown"} - except Exception as error: - _LOGGER.warning("Could not fetch operation list, error: %s", error) - data = {"error": "unknown"} - return data - - async def getSelectiveStatus(self, vin, services): - """Get status information for specified services.""" - if not await self.validate_tokens: - return False - try: - response = await self.get( - f"{BASE_API}/vehicle/v1/vehicles/{vin}/selectivestatus?jobs={','.join(services)}", - "", - ) - - for service in services: - if not response.get(service): - _LOGGER.debug( - "Did not receive return data for requested service %s." - " (This is expected for several service/car combinations)", - service, - ) - - if response: - response.update({"refreshTimestamp": datetime.now(UTC)}) - return response - - except Exception as error: - _LOGGER.warning("Could not fetch selectivestatus, error: %s", error) - return False - - async def getVehicleData(self, vin): - """Get car information like VIN, nickname, etc.""" - if not await self.validate_tokens: - return False - try: - response = await self.get(f"{BASE_API}/vehicle/v2/vehicles", "") - - for vehicle in response.get("data"): - if vehicle.get("vin") == vin: - return {"vehicle": vehicle} - - _LOGGER.warning("Could not fetch vehicle data for vin %s", vin) - - except Exception as error: - _LOGGER.warning("Could not fetch vehicle data, error: %s", error) - return False - - async def wakeUpVehicle(self, vin): - """Wake up vehicle to send updated data to VW Backend.""" - if not await self.validate_tokens: - return False - try: - return await self.post( - f"{BASE_API}/vehicle/v1/vehicles/{vin}/vehiclewakeuptrigger", - json={}, - return_raw=True, - ) - - except Exception as error: - _LOGGER.warning("Could not refresh the data, error: %s", error) - return False - - async def get_request_status(self, vin, requestId, actionId=""): - """Return status of a request ID for a given section ID.""" - if self.logged_in is False: - if not await self.doLogin(): - _LOGGER.warning("Login for %s account failed!", BRAND) - raise Exception(f"Login for {BRAND} account failed") - try: - if not await self.validate_tokens: - _LOGGER.warning( - "Session expired. Initiating new login for %s account", BRAND - ) - if not await self.doLogin(): - _LOGGER.warning("Login for %s account failed!", BRAND) - raise Exception(f"Login for {BRAND} account failed") - - response = await self.getPendingRequests(vin) - - requests = response.get("data", []) - result = None - for request in requests: - if request.get("id", "") == requestId: - result = request.get("status") - - # Translate status messages to meaningful info - if result in ("in_progress", "queued", "fetched"): - status = "In Progress" - elif result in ("request_fail", "failed"): - status = "Failed" - elif result == "unfetched": - status = "No response" - elif result in ("request_successful", "successful"): - status = "Success" - elif result == "fail_ignition_on": - status = "Failed because ignition is on" - else: - status = result - except Exception as error: - _LOGGER.warning("Failure during get request status: %s", error) - raise Exception(f"Failure during get request status: {error}") from error - else: - return status - - async def check_spin_state(self): - """Determine SPIN state to prevent lockout due to wrong SPIN.""" - result = await self.get(f"{BASE_API}/vehicle/v1/spin/state") - remainingTries = result.get("remainingTries", None) - if remainingTries is None: - raise Exception("Couldn't determine S-PIN state.") - - if remainingTries < 3: - raise Exception( - "Remaining tries for S-PIN is < 3. Bailing out for security reasons. " - "To resume operation, please make sure the correct S-PIN has been set in the integration " - "and then use the correct S-PIN once via the Volkswagen app." - ) - - return True - - # Token handling # - @property - async def validate_tokens(self): - """Validate expiry of tokens.""" - _LOGGER.debug("validate_tokens") - idtoken = self._session_tokens["identity"]["id_token"] - atoken = self._session_tokens["identity"]["access_token"] - id_exp = jwt.decode( - idtoken, - options={"verify_signature": False, "verify_aud": False}, - algorithms=JWT_ALGORITHMS, - ).get("exp", None) - at_exp = jwt.decode( - atoken, - options={"verify_signature": False, "verify_aud": False}, - algorithms=JWT_ALGORITHMS, - ).get("exp", None) - id_dt = datetime.fromtimestamp(int(id_exp)) - at_dt = datetime.fromtimestamp(int(at_exp)) - now = datetime.now() - later = now + self._session_refresh_interval - - # Check if tokens have expired, or expires now - if now >= id_dt or now >= at_dt: - _LOGGER.warning("Tokens have expired. Try to fetch new tokens") - if await self.refresh_tokens(): - _LOGGER.debug("Successfully refreshed tokens") - else: - return False - # Check if tokens expires before next update - elif later >= id_dt or later >= at_dt: - _LOGGER.debug("Tokens about to expire. Try to fetch new tokens") - if await self.refresh_tokens(): - _LOGGER.debug("Successfully refreshed tokens") - else: - return False - return True - - async def verify_tokens(self, token, type, client="Legacy"): - """Verify JWT against JWK(s).""" - _LOGGER.debug("verify_tokens, type=" + str(type) + ", client=" + str(client)) - if type == "identity": - req = await self._session.get(url="https://identity.vwgroup.io/v1/jwks") - keys = await req.json() - audience = [ - CLIENT[client].get("CLIENT_ID"), - "VWGMBB01DELIV1", - "https://api.vas.eu.dp15.vwg-connect.com", - "https://api.vas.eu.wcardp.io", - ] - else: - _LOGGER.debug("Not implemented") - return False - try: - pubkeys = {} - for jwk in keys["keys"]: - kid = jwk["kid"] - if jwk["kty"] == "RSA": - pubkeys[kid] = jwt.algorithms.RSAAlgorithm.from_jwk(to_json(jwk)) - - token_kid = jwt.get_unverified_header(token)["kid"] - - pubkey = pubkeys[token_kid] - jwt.decode(token, key=pubkey, algorithms=JWT_ALGORITHMS, audience=audience) - except Exception as error: - _LOGGER.exception("Failed to verify token, error: %s", error) - return False - return True - - async def refresh_tokens(self): - """Refresh tokens.""" - _LOGGER.debug("refresh_tokens") - try: - tHeaders = { - "Accept-Encoding": "gzip, deflate, br", - "Connection": "keep-alive", - "Content-Type": "application/x-www-form-urlencoded", - "User-Agent": USER_AGENT, - "x-android-package-name": ANDROID_PACKAGE_NAME, - } - - body = { - "grant_type": "refresh_token", - "refresh_token": self._session_tokens["identity"]["refresh_token"], - "client_id": CLIENT["Legacy"]["CLIENT_ID"], - } - response = await self._session.post( - url="https://emea.bff.cariad.digital/login/v1/idk/token", - headers=tHeaders, - data=body, - ) - await self.update_service_status("token", response.status) - if response.status == 200: - tokens = await response.json() - # Verify Token - if not await self.verify_tokens(tokens["id_token"], "identity"): - _LOGGER.warning("Token could not be verified!") - _LOGGER.debug("refresh_tokens successful") - for token in tokens: - self._session_tokens["identity"][token] = tokens[token] - self._session_headers["Authorization"] = ( - "Bearer " + self._session_tokens["identity"]["access_token"] - ) - else: - _LOGGER.warning( - "Something went wrong when refreshing %s account tokens", BRAND - ) - return False - except Exception as error: - _LOGGER.warning("Could not refresh tokens: %s", error) - return False - else: - return True - - async def update_service_status(self, url, response_code): - """Update service status.""" - if response_code in [200, 204, 207]: - status = "Up" - elif response_code == 401: - status = "Unauthorized" - elif response_code == 403: - status = "Forbidden" - elif response_code == 429: - status = "Rate limited" - elif response_code == 1000: - status = "Error" - else: - status = "Down" - - if "vehicle/v2/vehicles" in url: - self._service_status["vehicles"] = status - elif "parkingposition" in url: - self._service_status["parkingposition"] = status - elif "/vehicle/v1/trips/" in url: - self._service_status["trips"] = status - elif "capabilities" in url: - self._service_status["capabilities"] = status - elif "selectivestatus" in url: - self._service_status["selectivestatus"] = status - elif "token" in url: - self._service_status["token"] = status - else: - _LOGGER.debug('Unhandled API URL: "%s"', url) - - async def get_service_status(self): - """Return list of service statuses.""" - _LOGGER.debug("Getting API status updates") - return self._service_status - - # Class helpers # - @property - def vehicles(self): - """Return list of Vehicle objects.""" - return self._vehicles - - @property - def logged_in(self): - """Return cached logged in state. - - Not actually checking anything. - """ - return self._session_logged_in - - def vehicle(self, vin): - """Return vehicle object for given vin.""" - return next( - ( - vehicle - for vehicle in self.vehicles - if vehicle.unique_id.lower() == vin.lower() - ), - None, - ) - - def hash_spin(self, challenge, spin): - """Convert SPIN and challenge to hash.""" - spinArray = bytearray.fromhex(spin) - byteChallenge = bytearray.fromhex(challenge) - spinArray.extend(byteChallenge) - return hashlib.sha512(spinArray).hexdigest() - - @property - async def validate_login(self): - """Check that we have a valid access token.""" - try: - if not await self.validate_tokens: - return False - except OSError as error: - _LOGGER.warning("Could not validate login: %s", error) - return False - else: - return True - - -class vwid(): - - connection = {} - - def __init__(self, session): - self.session = session - # self.log = logging.getLogger(__name__) - # self.connection = {} - - def set_vin(self, vin): - self.vin = vin - - def set_credentials(self, username, password): - self.username = username - self.password = password - - def set_jobs(self, jobs): - self.jobs_string = ','.join(jobs) - - async def get_status(self): - # error codes SOCERR-xx raised: - # SOCERR-00: general error - # SOCERR-01: login problem, username, password wrong, account locked, etc. - # SOCERR-02: vehicle not found in account, VIN wrong? - try: - async with aiohttp.ClientSession(headers={'Connection': 'keep-alive'}) as session: - _now = datetime.now(UTC).strftime('%Y-%m-%dT%H:%M:%SZ') - data = {} - data['charging'] = {} - data['charging']['batteryStatus'] = {} - data['charging']['batteryStatus']['value'] = {} - data['charging']['batteryStatus']['value']['currentSOC_pct'] = str(0) - data['charging']['batteryStatus']['value']['cruisingRangeElectric_km'] = str(0) - data['charging']['batteryStatus']['value']['carCapturedTimestamp'] = _now - data['charging']['batteryStatus']['value']['odometer'] = None - - _k = str(vwid.connection.keys()) - _LOGGER.debug(f"libvwid.get_status connections at entry: vwid.connections.keys={_k}") - _update_result = False - if self.username not in vwid.connection: - _LOGGER.debug(f"create new connection, key={self.username}") - vwid.connection[self.username] = Connection(session, self.username, self.password) - self._connection = vwid.connection[self.username] - vwid.connection[self.username]._session_tokens['identity'] = {} - vwid.connection[self.username]._session_tokens['Legacy'] = {} - for token in self.tokens: - vwid.connection[self.username]._session_tokens['identity'][token] = self.tokens[token] - vwid.connection[self.username]._session_tokens['Legacy'][token] = self.tokens[token] - _conn_reuse = False - else: - _LOGGER.debug(f"reuse existing connection, key={self.username}") - vwid.connection[self.username]._session = session - _conn_reuse = True - if not _conn_reuse: - _doLogin_result = await vwid.connection[self.username].doLogin() - _LOGGER.debug("after 1st doLogin, result=" + str(_doLogin_result)) - if _doLogin_result: - _update_result = True - else: - raise Exception(f"SOCERR-01: Login für User {self.username} fehlgeschlagen") - else: - _update_result = await vwid.connection[self.username].update() - _LOGGER.debug("after 1st connection.update without doLogin, result=" + str(_update_result)) - if not _update_result: - _doLogin_result = await vwid.connection[self.username].doLogin() - _LOGGER.debug("after 2nd doLogin, result=" + str(_doLogin_result)) - if _doLogin_result: - _update_result = await vwid.connection[self.username].update() - _LOGGER.debug("after 2nd connection.update, result=" + str(_update_result)) - else: - _LOGGER.error(f"retry doLogin for user {self.username} failed, exit") - raise Exception(f"SOCERR-01: Login für User {self.username} fehlgeschlagen") - if _update_result: - _LOGGER.debug("update/doLogin look OK, get results") - for vehicle in vwid.connection[self.username].vehicles: - _LOGGER.debug("vehicle loop: " + str(vehicle) + ", self.vin=" + str(self.vin)) - if str(vehicle) == str(self.vin): - _LOGGER.debug("vehicle loop match: " + str(vehicle) + ", self.vin=" + str(self.vin)) - soc = vehicle._states['charging']['batteryStatus']['value']['currentSOC_pct'] - range =\ - vehicle._states['charging']['batteryStatus']['value']['cruisingRangeElectric_km'] - ts = vehicle._states['charging']['batteryStatus']['value']['carCapturedTimestamp'] - odometer = vehicle._states['measurements']['odometerStatus']['value']['odometer'] - _LOGGER.debug("vehicle =" + str(vehicle)) - _LOGGER.debug("soc =" + str(soc)) - _LOGGER.debug("range =" + str(range)) - _LOGGER.debug("timestamp=" + str(ts)) - tsxx = ts.strftime('%Y-%m-%dT%H:%M:%SZ') - _LOGGER.debug("timestampxx=" + str(tsxx)) - data['charging']['batteryStatus']['value']['currentSOC_pct'] = str(soc) - data['charging']['batteryStatus']['value']['cruisingRangeElectric_km'] = str(range) - data['charging']['batteryStatus']['value']['carCapturedTimestamp'] = str(tsxx) - data['charging']['batteryStatus']['value']['odometer'] = str(odometer) - _LOGGER.debug("return data =" + to_json(data, indent=4)) - for token in vwid.connection[self.username]._session_tokens['identity']: - self.tokens[token] =\ - vwid.connection[self.username]._session_tokens['identity'][token] - _LOGGER.info("VWID: soc=" + str(soc)+", range=" + str(range) + "@" + str(tsxx) + - ', odometer=' + str(odometer)) - return data - else: - _LOGGER.error(f"SOCERR-02: Fahrzeug mit VIN {self.vin} nicht gefunden") - raise Exception(f"SOCERR-02: Fahrzeug mit VIN {self.vin} nicht gefunden") - else: - _t = f"SOCERR-00: Für User {self.username} und VIN {self.vin} wurden keine Daten empfangen." - _LOGGER.error(f"{_t}: get_status update failed") - raise Exception(_t) - except Exception as e: - _LOGGER.exception(f"get_status failed 0, exception={e}") - # if exception is a SOCERR reraise it, otherwise raise general SOCERR-00 - if "SOCERR" in str(e): - raise e - else: - _t = f"SOCERR-00: Für User {self.username} und VIN {self.vin} wurden keine Daten empfangen" - raise Exception(f"{_t} {e}") diff --git a/packages/modules/vehicles/vwid/soc.py b/packages/modules/vehicles/vwid/soc.py index 8fdb236392..6e35bef41b 100755 --- a/packages/modules/vehicles/vwid/soc.py +++ b/packages/modules/vehicles/vwid/soc.py @@ -1,37 +1,29 @@ -import aiohttp +# import aiohttp import logging -from asyncio import new_event_loop, set_event_loop -from typing import Union +# from asyncio import new_event_loop, set_event_loop +# from typing import Union from modules.common.abstract_device import DeviceDescriptor from modules.common.abstract_vehicle import VehicleUpdateData from modules.common.component_state import CarState from modules.common.configurable_vehicle import ConfigurableVehicle from modules.vehicles.vwid.config import VWId -from modules.vehicles.vwid import libvwid -from modules.vehicles.vwgroup.vwgroup import VwGroup +from modules.vehicles.vwid import libeuda +# from modules.vehicles.vwgroup.vwgroup import VwGroup log = logging.getLogger(__name__) -def create_vehicle(vehicle_config: VWId, vehicle: int): - def fetch() -> CarState: - nonlocal vw_group - - # async method, called from sync fetch_soc, required because libvwid expect async environment - async def _fetch_soc() -> Union[int, float, str]: - async with aiohttp.ClientSession() as session: - return await vw_group.request_data(libvwid.vwid(session)) - - loop = new_event_loop() - set_event_loop(loop) - soc, range, soc_ts, soc_tsX, odometer = loop.run_until_complete(_fetch_soc()) - return CarState(soc=soc, range=range, soc_timestamp=soc_tsX, odometer=odometer) +def fetch(vehicle_update_data: VehicleUpdateData, config: VWId, vehicle: int) -> CarState: + soc, range, soc_ts, soc_tsX, odometer = libeuda.fetch_soc(config, vehicle, vehicle_update_data) + log.debug(f"soc return: soc={soc}, range={range}, soc_ts={soc_ts}, soc_tsX={soc_tsX}, odometer={odometer}") + return CarState(soc=soc, range=range, soc_timestamp=soc_ts, odometer=odometer) - vw_group = VwGroup(vehicle_config, vehicle) +def create_vehicle(vehicle_config: VWId, vehicle: int): def updater(vehicle_update_data: VehicleUpdateData) -> CarState: - return fetch() + return fetch(vehicle_update_data, vehicle_config, vehicle) + return ConfigurableVehicle(vehicle_config=vehicle_config, component_updater=updater, vehicle=vehicle,