From ef6f6a8e7fe93afb8a47abe210f1305ebae91a5d Mon Sep 17 00:00:00 2001 From: rleidner Date: Thu, 28 Dec 2023 12:19:33 +0100 Subject: [PATCH] soc_smarteq: add 2FA, minor improvements and cleanup --- modules/soc_smarteq/README.txt | 25 +- modules/soc_smarteq/_secrets.py | 73 --- modules/soc_smarteq/main.sh | 19 +- modules/soc_smarteq/soc_smarteq_2fa.py | 499 ++++++++++++++++++ .../{soc_smarteq.py => soc_smarteq_pass.py} | 49 +- runs/atreboot.sh | 14 - runs/updateConfig.sh | 10 +- web/settings/modulconfiglp.php | 45 +- 8 files changed, 612 insertions(+), 122 deletions(-) delete mode 100755 modules/soc_smarteq/_secrets.py create mode 100755 modules/soc_smarteq/soc_smarteq_2fa.py rename modules/soc_smarteq/{soc_smarteq.py => soc_smarteq_pass.py} (93%) diff --git a/modules/soc_smarteq/README.txt b/modules/soc_smarteq/README.txt index f3d5404a0..78ea0831a 100755 --- a/modules/soc_smarteq/README.txt +++ b/modules/soc_smarteq/README.txt @@ -1,12 +1,25 @@ Das smart SOC Modul ist vom smartEQ Modul des iobroker inspiriert. Der js code wurde in python implementiert und etwas vereinfacht. -Es wird in OAUTh ein Token Refresh durchgeführt. -Da alle Token nur 2 Stunden gültig sind und der Refresh mit gültigem refresh_token erfolgen muss, -sollten die Intervalle in der Modulkonfiguration weniger als 2 Stunden betragen. +Das Modul unterstützt die Authentication mit Passwort oder 2-Factor-Authentication (2FA). +Die Passwort-Authentication wird benutzt wenn ein Passwort konfiguriert ist. +Wenn das Passwort leer ist, wird die 2FA benutzt, die in 2 Schritten durchgeführt wird. +Zu Beginn sind Passwort und PIN leer. +Das SOC-Modul fordert eine PIN (6-stellige Zahl) an. +Vom Authentication Server wird die PIN per email gesendet. +Die PIN ist vom Anwender in die Konfiguration des SOC-Moduls einzutragen. +Das SOC-Modul benutzt nach Speichern die PIN um die Authentication abzuschliessen. +Die PIN ist 15 Minuten gültig, daher ist dies zügig zu machen. + + +Für beide Methoden gilt: +Bei erfolgreicher Authentication wird vom Authentication Server ein refresh token und access token erzeugt. +Bei Ablauf des access token wird in OAUTh ein Token Refresh durchgeführt, der ein neues Paar token erzeugt. + +Der access Token ist nur 2 Stunden gültig. +Der refresh Token ist länger gültig, ist aber nur ein mal nutzbar. +Daher muss der refresh token immer gespeichert werden wenn der access token ungültig ist. +Daher ist es empfohlen, die Intervalle in der Modulkonfiguration auf weniger als 2 Stunden zu setzen. 90 Minuten im Standby und 10 Minuten während des Ladens haben sich im Test bewährt. -Wenn die Intervalle auf mehr als 2 Stunden konfiguriert sind, wird bei der nächsten Abfrage ein Login durchgeführt. -Das benötigt deutlich mehr Zeit und es können die Token anderer Sitzungen, z.B. der smart Control App, -ungültig werden und einen neuen Login benötigen mit Eingabe von User und Passwort. diff --git a/modules/soc_smarteq/_secrets.py b/modules/soc_smarteq/_secrets.py deleted file mode 100755 index 130434229..000000000 --- a/modules/soc_smarteq/_secrets.py +++ /dev/null @@ -1,73 +0,0 @@ -"""Generate cryptographically strong pseudo-random numbers suitable for -managing secrets such as account authentication, tokens, and similar. - -See PEP 506 for more information. -https://www.python.org/dev/peps/pep-0506/ - -""" - -__all__ = ['choice', 'randbelow', 'randbits', 'SystemRandom', - 'token_bytes', 'token_hex', 'token_urlsafe', - 'compare_digest', - ] - - -import base64 -import binascii -import os - -from hmac import compare_digest -from random import SystemRandom - -_sysrand = SystemRandom() - -randbits = _sysrand.getrandbits -choice = _sysrand.choice - -def randbelow(exclusive_upper_bound): - """Return a random int in the range [0, n).""" - if exclusive_upper_bound <= 0: - raise ValueError("Upper bound must be positive.") - return _sysrand._randbelow(exclusive_upper_bound) - -DEFAULT_ENTROPY = 32 # number of bytes to return by default - -def token_bytes(nbytes=None): - """Return a random byte string containing *nbytes* bytes. - - If *nbytes* is ``None`` or not supplied, a reasonable - default is used. - - >>> token_bytes(16) #doctest:+SKIP - b'\\xebr\\x17D*t\\xae\\xd4\\xe3S\\xb6\\xe2\\xebP1\\x8b' - - """ - if nbytes is None: - nbytes = DEFAULT_ENTROPY - return os.urandom(nbytes) - -def token_hex(nbytes=None): - """Return a random text string, in hexadecimal. - - The string has *nbytes* random bytes, each byte converted to two - hex digits. If *nbytes* is ``None`` or not supplied, a reasonable - default is used. - - >>> token_hex(16) #doctest:+SKIP - 'f9bf78b9a18ce6d46a0cd2b0b86df9da' - - """ - return binascii.hexlify(token_bytes(nbytes)).decode('ascii') - -def token_urlsafe(nbytes=None): - """Return a random URL-safe text string, in Base64 encoding. - - The string has *nbytes* random bytes. If *nbytes* is ``None`` - or not supplied, a reasonable default is used. - - >>> token_urlsafe(16) #doctest:+SKIP - 'Drmhze6EPcv0fN_81Bj-nA' - - """ - tok = token_bytes(nbytes) - return base64.urlsafe_b64encode(tok).rstrip(b'=').decode('ascii') diff --git a/modules/soc_smarteq/main.sh b/modules/soc_smarteq/main.sh index 24d921cc2..93bb2b07d 100755 --- a/modules/soc_smarteq/main.sh +++ b/modules/soc_smarteq/main.sh @@ -6,6 +6,8 @@ DMOD="EVSOC" CHARGEPOINT=$1 export OPENWBBASEDIR RAMDISKDIR MODULEDIR +export debug=1 + # check if config file is already in env if [[ -z "$debug" ]]; then echo "soc_smarteq: Seems like openwb.conf is not loaded. Reading file." @@ -24,6 +26,7 @@ case $CHARGEPOINT in fztype=$soc2type username=$soc2user password=$soc2pass + pin=$soc2pint vin=$soc2vin intervall=$(( soc2intervall * 6 )) intervallladen=$(( soc2intervallladen * 6 )) @@ -37,6 +40,7 @@ case $CHARGEPOINT in socfile="$RAMDISKDIR/soc" username=$soc_smarteq_username password=$soc_smarteq_passwort + pin=$soc_smarteq_pin vin=$soc_smarteq_vin intervall=$(( soc_smarteq_intervall * 6 )) intervallladen=$(( soc_smarteq_intervallladen * 6 )) @@ -69,14 +73,13 @@ incrementTimer(){ getAndWriteSoc(){ openwbDebugLog ${DMOD} 2 "Lp$CHARGEPOINT: Requesting SoC" echo 0 > $soctimerfile - #Prepare for secrets used in soc module libvwid in Python - if ! python3 -c "import secrets" &> /dev/null ; then - if [ ! -L $MODULEDIR/secrets.py ]; then - echo 'soc_vwid: enable local secrets.py...' - ln -s $MODULEDIR/_secrets.py $MODULEDIR/secrets.py - fi - fi - answer=$($MODULEDIR/soc_smarteq.py --user "$username" --password "$password" --vin "$vin" --chargepoint "$CHARGEPOINT" 2>>$RAMDISKDIR/soc.log) + + if [ "$password" != "" ] + then + answer=$($MODULEDIR/soc_smarteq_pass.py --user "$username" --password "$password" --vin "$vin" --chargepoint "$CHARGEPOINT" 2>>$RAMDISKDIR/soc.log) + else + answer=$($MODULEDIR/soc_smarteq_2fa.py --user "$username" --pin "$pin" --vin "$vin" --chargepoint "$CHARGEPOINT" 2>>$RAMDISKDIR/soc.log) + fi if [ $? -eq 0 ]; then # we got a valid answer echo $answer > $socfile diff --git a/modules/soc_smarteq/soc_smarteq_2fa.py b/modules/soc_smarteq/soc_smarteq_2fa.py new file mode 100755 index 000000000..c51cb0dda --- /dev/null +++ b/modules/soc_smarteq/soc_smarteq_2fa.py @@ -0,0 +1,499 @@ +#!/usr/bin/python3 + +from argparse import ArgumentParser +import requests +import json +import os +import time +import datetime +import logging +import copy +import urllib +import uuid + + +# Constants +BASE_URL = "https://id.mercedes-benz.com" +TOKEN_URL = BASE_URL + "/as/token.oauth2" +STATUS_URL_SMART = "https://oneapp.microservice.smart.mercedes-benz.com" +STATUS_URL_MERCEDES = "https://bff.emea-prod.mobilesdk.mercedes-benz.com" +SCOPE = "openid+profile+email+phone+ciam-uid+offline_access" +CLIENT_ID = "70d89501-938c-4bec-82d0-6abb550b0825" +GUID = "280C6B55-F179-4428-88B6-E0CCF5C22A7C" +ACCEPT_LANGUAGE = "de-de" +SSL_VERIFY_STATUS = True +LOGIN_APP_ID = "01398c1c-dc45-4b42-882b-9f5ba9f175f1" +COUNTRY_CODE = "DE" +X_APPLICATIONNAME_ECE = "mycar-store-ece" +RIS_APPLICATION_VERSION = "1.39.0 (2066)" +RIS_OS_NAME = "ios" +RIS_OS_VERSION = "16.5" +RIS_SDK_VERSION = "2.109.0" +X_LOCALE = "de-DE" +WEBSOCKET_USER_AGENT = "MyCar/1.30.1 (com.daimler.ris.mercedesme.ece.ios; build:1819; iOS 16.5.0) Alamofire/5.4.0" +STATUS_USER_AGENT = "Device: iPhone 6; OS-version: iOS_12.5.1; App-Name: smart EQ control; App-Version: 3.0;\ + Build: 202108260942; Language: de_DE" +CONTENT_TYPE_OAUTH = "application/x-www-form-urlencoded" +CONTENT_TYPE = "application/json" +ACCEPT = "*/*" +ACCEPT_LANGUAGE = "de-DE;q=1.0" + + +# helper functions +def nested_key_exists(element: dict, *keys: str) -> bool: + # Check if *keys (nested) exists in `element` (dict). + if not isinstance(element, dict): + raise AttributeError('nested_key_exists() expects dict as first argument - got type ' + str(type(element))) + if len(keys) == 0: + raise AttributeError('nested_key_exists() expects at least two arguments, one given.') + + _element = element + for key in keys: + try: + _element = _element[key] + except KeyError: + return False + return True + + +class smarteq: + def __init__(self, storeFile: str): + self.storeFile = storeFile + + self.log = logging.getLogger("soc_smarteq_2fa") + debug = os.environ.get('debug', '0') + LOGLEVEL = 'WARN' + if debug == '1': + LOGLEVEL = 'INFO' + if debug == '2': + LOGLEVEL = 'DEBUG' + RAMDISKDIR = os.environ.get("RAMDISKDIR", "undefined") + logFile = RAMDISKDIR+'/soc.log' + format = '%(asctime)s %(levelname)s:%(name)s:%(message)s' + datefmt = '%Y-%m-%d %H:%M:%S' + logging.basicConfig(filename=logFile, + filemode='a', + format=format, + datefmt=datefmt, + level=LOGLEVEL) + + # for testing send logs to console + if os.environ.get('logConsole', '0') == '1': + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(logging.Formatter(format, datefmt)) + self.log.addHandler(consoleHandler) + + # self.method keeps a high level trace of actions + self.method = '' + self.soc_ts = 'n/a' + + self._country_code = COUNTRY_CODE + self.session = requests.session() + + self.load_store() + self.oldTokens = copy.deepcopy(self.store['Tokens']) + + # set_authState + # authState: 'init' + # 'authenticated' + # 'tokenRequested' + # 'pinRequested' + # 'accessTokenExpired' + def set_authState(self, state: str): + if 'authState' in self.store: + old_state = self.store['authState'] + else: + old_state = 'n/a' + self.store['authState'] = state + self.log.debug('set_authState from ' + old_state + ' to ' + state) + + # initialize store structures when no store is available + def init_store(self): + self.store['Tokens'] = {} + self.store['Tokens']['access_token'] = "" + self.store['Tokens']['refresh_token'] = "" + # self.store['refresh_token_history'] = [] + self.store['refresh_timestamp'] = int(0) + self.set_authState('init') + self.store['last_pin_used'] = '' + + # load store from file, initialize store structure if no file exists + def load_store(self): + try: + tf = open(self.storeFile, 'r', encoding='utf-8') + self.store = json.load(tf) + if 'Tokens' not in self.store: + self.init_store() + tf.close() + except FileNotFoundError: + self.log.warning("init: store file not found, new 2FA authentication required") + self.store = {} + self.init_store() + self.set_authState('init') + except Exception as e: + self.log.debug("init: loading stored data failed, file: " + + self.storeFile + ", error=" + str(e)) + self.store = {} + self.init_store() + + # write store file + def write_store(self): + try: + tf = open(self.storeFile, 'w', encoding='utf-8') + except Exception as e: + self.log.debug("write_store_file: Exception " + str(e)) + os.system("sudo rm -f " + self.storeFile) + tf = open(self.storeFile, 'w', encoding='utf-8') + json.dump(self.store, tf, indent=4) + tf.close() + try: + os.chmod(self.storeFile, 0o777) + except Exception as e: + os.system("sudo chmod 0777 " + self.storeFile) + + # set username and pin + def set_credentials(self, username: str, pin: str): + self.username = username + self.pin = pin + + # set vin + def set_vin(self, vin: str): + self.vin = vin + + # set chargepoint number + def set_chargepoint(self, chargepoint: str): + self.chargepoint = chargepoint + + # send request for new pin to oauth server + def request_pin(self, email: str, nonce: str): + self.log.debug("Start request_pin: email=" + email + ", nonce=" + nonce) + + url = STATUS_URL_MERCEDES + "/v1/login" + d = { + "emailOrPhoneNumber": self.username, + "countryCode": self._country_code, + "nonce": nonce + } + data = json.dumps(d) + + headers = { + "Ris-Os-Name": RIS_OS_NAME, + "Ris-Os-Version": RIS_OS_VERSION, + "Ris-Sdk-Version": RIS_SDK_VERSION, + "X-Locale": X_LOCALE, + "Accept": ACCEPT, + "Accept-Language": ACCEPT_LANGUAGE, + "X-Trackingid": str(uuid.uuid4()), + "X-Sessionid": str(uuid.uuid4()), + "X-Requestid": str(uuid.uuid4()), + "device-id": str(uuid.uuid4()), + "User-Agent": WEBSOCKET_USER_AGENT, + "Content-Type": CONTENT_TYPE, + "X-Applicationname": X_APPLICATIONNAME_ECE, + "Ris-Application-Version": RIS_APPLICATION_VERSION, + "X-Authmode": "KEYCLOAK" + } + + self.log.info("request_pin-post: url=" + url + + ", data=" + json.dumps(data, indent=4) + + ", headers=" + json.dumps(headers, indent=4)) + response = self.session.post(url, data=data, headers=headers) + self.log.debug("Result request_pin%s", response) + self.set_authState('pinRequested') + return response + + # request new token set using pin + def request_new_token_set(self, user_input=None): + errors = {} + nonce = self.store['nonce'] + self.set_authState('tokenRequested') + self.store['last_pin_used'] = self.pin + try: + self.log.debug("calling request_access_token") + result = self.request_access_token(self.username, self.pin, nonce) + except Exception as error: + errors = error + self.log.error("Request token error: %s", errors) + if not errors: + self.log.debug("Token received: " + str(result)) + + # authenticate: request pin and get tokens + def authenticate(self): + errors = {} + nonce = str(uuid.uuid4()) + user_input = {} + user_input["nonce"] = nonce + + try: + self.log.debug("calling request_pin") + response = self.request_pin(self.username, nonce) + self.log.debug("request_pin done, response=" + str(response)) + self.log.debug("response.status_code = " + str(response.status_code)) + if response.status_code > 200: + errors["request_pin"] = "Authentication error " + str(response.status_code) + except Exception as error: + errors = error + self.log.error("Request PIN error: %s", errors) + + # request_access_token - part of 2FA process + def request_access_token(self, email: str, pin: str, nonce: str): + self.log.debug("enter request_access_token: email=" + email + ", pin=" + pin + ", nonce=" + nonce) + self.method += " 3-request_access_token" + url = TOKEN_URL + encoded_email = urllib.parse.quote_plus(email, safe="@") + + data = ( + "client_id=" + LOGIN_APP_ID + + "&grant_type=password&username=" + encoded_email + + "&password=" + nonce + ":" + pin + + "&scope=" + SCOPE + ) + + headers = { + "X-Applicationname": X_APPLICATIONNAME_ECE, + "Ris-Application-Version": RIS_APPLICATION_VERSION, + "Content-Type": CONTENT_TYPE_OAUTH, + "Stage": "prod", + "X-Device-Id": str(uuid.uuid4()), + "X-Request-Id": str(uuid.uuid4()) + } + self.log.debug("request_access_token-post: url=" + url + + ", data=" + json.dumps(data, indent=4) + + ", headers=" + json.dumps(headers, indent=4)) + try: + token_info = self.session.post(url, data=data, headers=headers) + self.log.debug("request_access_token.status_code = " + str(token_info.status_code)) + Tokens = json.loads(token_info.text) + if not Tokens['access_token']: + self.log.warning("request_access_token failed") + return None + self.log.debug("Tokens=\n" + json.dumps(Tokens, indent=4)) + self.store['Tokens'] = Tokens + + if token_info is not None: + ts = int(time.time()) + self.store['refresh_timestamp'] = ts + self.store['refresh_time'] = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.set_authState('authenticated') + self.write_store() + return token_info + except Exception as e: + self.log.error("request_access_token exception: " + str(e)) + return None + + # refresh_token using existing refresh_token + def refresh_token(self, refresh_token: str) -> dict: + self.method += " 2-refresh_token" + + url = TOKEN_URL + + headers = { + "Accept": ACCEPT, + "User-Agent": "sOAF/202108260942 CFNetwork/978.0.7 Darwin/18.7.0", + "Accept-Language": ACCEPT_LANGUAGE, + "Authorization": "Bearer " + self.store['Tokens']['refresh_token'], + "Content-Type": CONTENT_TYPE_OAUTH, + } + + data = {'grant_type': 'refresh_token', + 'refresh_token': refresh_token} + + try: + response = self.session.post(url, + headers=headers, + data=data, + verify=True, + allow_redirects=False, + timeout=(30, 30)) + self.log.debug("refresh_token.status_code = " + str(response.status_code)) + self.log.debug("refresh_token.text = " + str(response.text)) + except Exception as e: + self.log.error("refresh_token exception: " + str(e)) + resperr = {} + resperr.status_code = 500 + resperr.test = str(e) + return resperr + return response + + # refresh access_token + def refresh_access_token(self) -> dict: + response = self.refresh_token(self.store['Tokens']['refresh_token']) + self.log.debug("refresh_access_token.status_code = " + str(response.status_code)) + + if response.status_code > 200: + self.log.warning("refresh_access_token failed, start 2FA process") + self.set_authState('init') + return None + else: + self.log.debug("refresh_access_token.text = " + str(response.text)) + Tokens = json.loads(response.text) + self.log.debug("Tokens=\n" + json.dumps(self.store['Tokens'], indent=4)) + + if Tokens['access_token']: + self.store['Tokens'] = Tokens + ts = int(time.time()) + self.store['refresh_timestamp'] = ts + self.store['refresh_time'] = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') + self.set_authState('authenticated') + self.write_store() + return self.store['Tokens']['access_token'] + + # get Soc of Vehicle + def get_status(self, vin: str) -> int: + self.method += " 1-get_status" + + url = STATUS_URL_SMART + "/seqc/v0/vehicles/" + vin + "/refresh-data" + + headers = { + "Accept": ACCEPT, + "Accept-Language": ACCEPT_LANGUAGE, + "Authorization": "Bearer " + self.store['Tokens']['access_token'], + "X-Applicationname": CLIENT_ID, + "User-Agent": STATUS_USER_AGENT, + "Guid": GUID + } + # soc = -1 + try: + response = self.session.get(url, headers=headers, verify=SSL_VERIFY_STATUS) + res = json.loads(response.text) + res_json = json.dumps(res, indent=4) + self.log.debug("get_status: result json:\n" + res_json) + if nested_key_exists(res, 'precond', 'data', 'soc', 'value'): + res_json = json.dumps(res['precond']['data']['soc'], indent=4) + try: + soc = res['precond']['data']['soc']['value'] + _ts = res['precond']['data']['soc']['ts'] + self.soc_ts = datetime.datetime.fromtimestamp(_ts).strftime('%Y-%m-%d %H:%M:%S') + self.log.debug("get_status: result json:\n" + res_json) + except Exception as e: + self.log.exception("get_status: exception0 e=" + str(e)) + soc = -1 + elif 'error' in res and res['error'] == 'unauthorized': + self.log.debug("get_status: access_token expired or invalid - try refresh") + self.log.debug("get_status: error - result json:\n" + res_json) + soc = -1 + else: + self.log.debug("get_status: unexpected error - response.status_code:" + str(response.status_code)) + self.log.debug("get_status: unexpected error - response.text:" + response.text) + self.log.debug("get_status: unexpected error - result json:\n" + res_json) + soc = -1 + + except Exception as e: + self.log.error("get_status: Exception1: " + str(e)) + self.log.error("get_status: result:\n" + res_json) + soc = -1 + if "Vehicle not found" in res_json: + soc = -2 + return soc + + # fetch_soc: execute next step dependent on authState + # authState: 'init' + # 'authenticated' + # 'pinRequested' + # 'tokenRequested' + # 'accessTokenExpired' + def fetch_soc(self: str, start_ts: float) -> int: + self.log.debug('fetch_soc/' + self.store['authState'] + + ': username=' + self.username + + ', pin=' + self.pin + + ', vin=' + self.vin) + self.Soc = -1 + while self.Soc == -1: + if self.store['authState'] == 'authenticated': + try: + if 'access_token' in self.store['Tokens']: + self.Soc = self.get_status(self.vin) + if self.Soc >= 0: + self.log.debug("fetch_soc/authenticated: 1st attempt successful") + + if self.Soc == -1: + self.set_authState('accessTokenExpired') + self.log.debug("fetch_soc/authenticated: get_status failed, refresh access_token ...") + self.store['Tokens']['access_token'] = self.refresh_access_token() + if self.store['authState'] == 'authenticated' and 'access_token' in self.store['Tokens']: + self.Soc = self.get_status(self.vin) + if self.Soc >= 0: + self.log.debug("fetch_soc/authenticated: 2nd attempt successful") + else: + self.log.warning("fetch_soc/authenticated: 2nd attempt failed - soc=" + str(self.Soc)) + else: + self.log.error("fetch_soc/authenticated: refresh_access_token failed") + self.Soc = -1 + elif self.Soc == -2: + self.log.error("fetch_soc/authenticated: failed, Vehicle not found, check VIN") + self.Soc = -1 + else: + self.log.debug("fetch_soc/authenticated: get_status 1st attempt success") + except Exception as e: + self.log.error("fetch_soc/authenticated: get_status exception, refresh_access_token ..." + str(e)) + self.store['Tokens']['access_token'] = self.refresh_access_token() + if self.store['authState'] == 'authenticated' and 'access_token' in self.store['Tokens']: + self.Soc = self.get_status(self.vin) + else: + self.log.error("fetch_soc/authenticated: refresh_access_token failed") + elapsed = time.time() - start_ts + self.log.info("Lp" + self.chargepoint + + " SOC: " + str(self.Soc) + '%' + + '@' + self.soc_ts + + ', Elapsed: ' + str(round(elapsed, 2)) + ' s' + + ', Method:' + self.method) + if self.store['Tokens'] != self.oldTokens: + self.log.debug("fetch_soc/authenticated: tokens changed, store token file") + self.write_store() + + # authState == pinRequested and pin != last_pin_used: get new token set + if self.store['authState'] == 'pinRequested': + self.log.debug('fetch_soc/pinRequested: old_pin = ' + self.store['last_pin_used'] + ', pin=' + self.pin) + if self.store['last_pin_used'] != self.pin: + self.log.debug('fetch_soc/pinRequested: call request_new_token_set') + self.request_new_token_set() + self.write_store() + self.Soc = -1 + else: + self.log.warning('fetch_soc/pinRequested: waiting for new pin in configuration') + self.Soc = 0 + + # authState == init: request pin + if self.store['authState'] == 'init' or self.pin == 'neu': + self.log.debug('fetch_soc/init: request_pin') + self.store['nonce'] = str(uuid.uuid4()) + self.request_pin(self.username, self.store['nonce']) + self.write_store() + self.Soc = 0 + + return self.Soc + + +# main program +def main(): + start_ts = time.time() + parser = ArgumentParser() + parser.add_argument("-v", "--vin", + help="VIN of vehicle", metavar="VIN", required=True) + parser.add_argument("-u", "--user", + help="user", metavar="user", required=True) + parser.add_argument("-p", "--pin", + help="pin", metavar="pin", required=True) + parser.add_argument("-c", "--chargepoint", + help="chargepoint", metavar="chargepoint", required=True) + args = vars(parser.parse_args()) + user_id = args['user'] + pin = args['pin'] + vin = args['vin'] + chargepoint = args['chargepoint'] + + OPENWBBASEDIR = os.environ.get("OPENWBBASEDIR", "undefined") + storeFile = OPENWBBASEDIR+'/soc_smarteq_store_lp'+chargepoint+'.json' + + Smart = smarteq(storeFile) + Smart.set_credentials(user_id, pin) + Smart.set_vin(vin) + Smart.set_chargepoint(chargepoint) + soc = Smart.fetch_soc(start_ts) + if soc == -1: + soc = 0 + print(soc) + + +if __name__ == "__main__": + main() diff --git a/modules/soc_smarteq/soc_smarteq.py b/modules/soc_smarteq/soc_smarteq_pass.py similarity index 93% rename from modules/soc_smarteq/soc_smarteq.py rename to modules/soc_smarteq/soc_smarteq_pass.py index 6b4e642e1..6bf144eb7 100755 --- a/modules/soc_smarteq/soc_smarteq.py +++ b/modules/soc_smarteq/soc_smarteq_pass.py @@ -7,17 +7,18 @@ import os import time import datetime -import pkce import logging import pickle import copy +import base64 +import re +import hashlib # Constants BASE_URL = "https://id.mercedes-benz.com" OAUTH_URL = BASE_URL + "/as/authorization.oauth2" LOGIN_URL = BASE_URL + "/ciam/auth/login" TOKEN_URL = BASE_URL + "/as/token.oauth2" -#STATUS_URL = "https://oneapp.microservice.smart.com" STATUS_URL = "https://oneapp.microservice.smart.mercedes-benz.com" REDIRECT_URI = STATUS_URL SCOPE = "openid+profile+email+phone+ciam-uid+offline_access" @@ -30,7 +31,6 @@ SSL_VERIFY_STATUS = True - # helper functions def nested_key_exists(element: dict, *keys: str) -> bool: # Check if *keys (nested) exists in `element` (dict). @@ -48,10 +48,19 @@ def nested_key_exists(element: dict, *keys: str) -> bool: return True +def generateCodeChallengePair() -> tuple: + code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8') + code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier) + code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() + code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8') + code_challenge = code_challenge.replace('=', '') + return (code_verifier, code_challenge) + + class smarteq: def __init__(self, storeFile: str): self.storeFile = storeFile - self.log = logging.getLogger("soc_smarteq") + self.log = logging.getLogger("soc_smarteq_pass") debug = os.environ.get('debug', '0') LOGLEVEL = 'WARN' if debug == '1': @@ -68,19 +77,24 @@ def __init__(self, storeFile: str): datefmt=datefmt, level=LOGLEVEL) + # for testing send logs to console + if os.environ.get('logConsole', '0') == '1': + consoleHandler = logging.StreamHandler() + consoleHandler.setFormatter(logging.Formatter(format, datefmt)) + self.log.addHandler(consoleHandler) + # self.method keeps a high level trach of actions self.method = '' self.soc_ts = 'n/a' # self.store is read from ramdisk at start and saved at end. # currently is contains: # Tokens: refresh- and access-tokens of OAUTH - # refresh_timestamp: epoch of last refresh_tokens. + # refresh_timestamp: epoch of last refresh_token. self.session = requests.session() self.load_store() self.oldTokens = copy.deepcopy(self.store['Tokens']) - self.init = True def load_store(self): try: @@ -131,7 +145,7 @@ def set_chargepoint(self, chargepoint: str): # ===== get resume string ====== def get_resume(self) -> str: response_type = "code" - self.code_verifier, self.code_challenge = pkce.generate_pkce_pair() + self.code_verifier, self.code_challenge = generateCodeChallengePair() self.code_challenge_method = "S256" url = OAUTH_URL + '?client_id=' + CLIENT_ID + '&response_type=' + response_type + '&scope=' + SCOPE url = url + '&redirect_uri=' + REDIRECT_URI @@ -261,7 +275,7 @@ def get_tokens(self) -> dict: # refresh tokens def refresh_tokens(self) -> dict: - self.method += " 2-refresh_tokens" + self.method += " 2-refresh_token" url = TOKEN_URL headers = { "Accept": "*/*", @@ -336,12 +350,8 @@ def reconnect(self) -> dict: # get Soc of Vehicle def get_status(self, vin: str) -> int: self.method += " 1-get_status" - if self.init: - url = STATUS_URL + "/seqc/v0/vehicles/" + vin +\ - "/init-data?requestedData=BOTH&countryCode=DE&locale=de-DE" - else: - url = STATUS_URL + "/seqc/v0/vehicles/" + vin + "/refresh-data" - self.init = False + + url = STATUS_URL + "/seqc/v0/vehicles/" + vin + "/refresh-data" headers = { "accept": "*/*", @@ -383,7 +393,7 @@ def get_status(self, vin: str) -> int: # 1. get_status via stored access_token # 2. if expired: refresh_access_token using id and refresh token, then get_status # 3. if refresh token expired: login, get tokens, then get_status - def fetch_soc(self) -> int: + def fetch_soc(self: str, start_ts: float) -> int: soc = -1 try: if 'refresh_token' in self.store['Tokens']: @@ -416,10 +426,13 @@ def fetch_soc(self) -> int: self.store['Tokens'] = self.reconnect() if 'access_token' in self.store['Tokens']: soc = self.get_status(self.vin) + + elapsed = time.time() - start_ts self.log.info("Lp" + self.chargepoint + " SOC: " + str(soc) + '%' + '@' + self.soc_ts + - ', Method: ' + self.method) + ', Elapsed: ' + str(round(elapsed, 2)) + ' s' + + ', Method:' + self.method) if self.store['Tokens'] != self.oldTokens: self.log.debug("reconnect: tokens changed, store token file") @@ -430,6 +443,8 @@ def fetch_soc(self) -> int: # main program def main(): + start_ts = time.time() + parser = ArgumentParser() parser.add_argument("-v", "--vin", help="VIN of vehicle", metavar="VIN", required=True) @@ -452,7 +467,7 @@ def main(): Smart.set_credentials(user_id, password) Smart.set_vin(vin) Smart.set_chargepoint(chargepoint) - soc = Smart.fetch_soc() + soc = Smart.fetch_soc(start_ts) print(soc) diff --git a/runs/atreboot.sh b/runs/atreboot.sh index 9be566580..86b645d58 100755 --- a/runs/atreboot.sh +++ b/runs/atreboot.sh @@ -294,20 +294,6 @@ at_reboot() { ln -s "$VWIDMODULEDIR/_secrets.py" "$VWIDMODULEDIR/secrets.py" fi fi - #Prepare for secrets used in soc module soc_smarteq in Python - SMARTEQMODULEDIR="$OPENWBBASEDIR/modules/soc_smarteq" - if python3 -c "import secrets" &>/dev/null; then - echo 'soc_smarteq: python3 secrets installed...' - if [ -L "$SMARTEQMODULEDIR/secrets.py" ]; then - echo 'soc_smarteq: remove local python3 secrets.py...' - rm "$SMARTEQMODULEDIR/secrets.py" - fi - else - if [ ! -L "$SMARTEQMODULEDIR/secrets.py" ]; then - echo 'soc_smarteq: enable local python3 secrets.py...' - ln -s "$SMARTEQMODULEDIR/_secrets.py" "$SMARTEQMODULEDIR/secrets.py" - fi - fi # update outdated urllib3 for Tesla Powerwall pip3 install --upgrade urllib3 diff --git a/runs/updateConfig.sh b/runs/updateConfig.sh index b2033f19d..bcc7fa8a7 100755 --- a/runs/updateConfig.sh +++ b/runs/updateConfig.sh @@ -1695,6 +1695,9 @@ updateConfig(){ if ! grep -Fq "soc2pin=" $ConfigFile; then echo "soc2pin=pin" >> $ConfigFile fi + if ! grep -Fq "soc2pint=" $ConfigFile; then + echo "soc2pint=''" >> $ConfigFile + fi if ! grep -Fq "lgessv1ip=" $ConfigFile; then echo "lgessv1ip=youripaddress" >> $ConfigFile fi @@ -2111,9 +2114,14 @@ updateConfig(){ fi if ! grep -Fq "soc_smarteq_passwort=" $ConfigFile; then echo "soc_smarteq_passwort=''" >> $ConfigFile - else + else sed -i "/soc_smarteq_passwort='/b; s/^soc_smarteq_passwort=\(.*\)/soc_smarteq_passwort=\'\1\'/g" $ConfigFile fi + if ! grep -Fq "soc_smarteq_pin=" $ConfigFile; then + echo "soc_smarteq_pin=''" >> $ConfigFile + else + sed -i "/soc_smarteq_pin='/b; s/^soc_smarteq_pin=\(.*\)/soc_smarteq_pin=\'\1\'/g" $ConfigFile + fi if ! grep -Fq "soc_smarteq_vin=" $ConfigFile; then echo "soc_smarteq_vin=VIN" >> $ConfigFile fi diff --git a/web/settings/modulconfiglp.php b/web/settings/modulconfiglp.php index 021535665..8ce966fd2 100644 --- a/web/settings/modulconfiglp.php +++ b/web/settings/modulconfiglp.php @@ -1300,6 +1300,13 @@ function visibility_kia_advanced() {
Für smart EQ Fahrzeuge. Es wird benötigt:
- smart Control Account aktiv
+ Für smart EQ Fahrzeuge. Es wird benötigt:
+ - smart Control Account aktiv
+ Das Modul unterstützt 2 Loginverfahren
+ - Login mit Passwort - wird benutzt wenn das Passwort nicht leer ist
+ - Login über 2FA/Pin - wird benutzt wenn das Password leer ist.
+ Bei 2FA wird nach Konfiguration ein Pin per Email übermittelt. Dieser PIN (6-stellig) ist in das Feld Pin einzutragen.
+ Wichtig: Der Pin ist nach Empfang nur 15 Minuten gültig.
@@ -1312,10 +1319,20 @@ function visibility_kia_advanced() {
-
+
- + Password des Logins. + +
+
+
+ +
+ + + PIN des Accounts.
+ Bei Smart EQ kommt die PIN (OTP Code) via Email.
@@ -3140,6 +3157,11 @@ function visibility_twcmanagerlp2_connection() {
Für smart EQ Fahrzeuge. Es wird benötigt:
- smart Control Account aktiv
+ Das Modul unterstützt 2 Loginverfahren
+ - Login mit Passwort - wird benutzt wenn das Passwort nicht leer ist
+ - Login über 2FA/Pin - wird benutzt wenn das Password leer ist.
+ Bei 2FA wird nach Konfiguration ein Pin per Email übermittelt. Dieser PIN (6-stellig) ist in das Feld Pin einzutragen.
+ Wichtig: Der Pin ist nach Empfang nur 15 Minuten gültig.
@@ -3626,7 +3648,22 @@ function visibility_i3_soccalclp2() {
- PIN des Accounts. + PIN des Accounts.
+ Bei Smart EQ kommt die PIN (OTP Code) via Email.
+
+
+
+
+
+
+
+
+ +
+ + + PIN des Accounts.
+ Bei Smart EQ kommt die PIN (OTP Code) via Email.
@@ -4381,6 +4418,7 @@ function display_socmodul1() { hideSection('#socmuser2'); hideSection('#socmpass2'); hideSection('#socmpin2'); + hideSection('#socmpin2t'); hideSection('#socmnone1'); hideSection('#socmhttp1'); hideSection('#socleaf1'); @@ -4458,6 +4496,7 @@ function display_socmodul1() { showSection('#socmsmarteqinfolp2'); showSection('#socmuser2'); showSection('#socmpass2'); + showSection('#socmpin2t'); showSection('#socmvin2'); showSection('#socmintervall2'); showSection('#socmintervallladen2');