Skip to content

Commit

Permalink
feat: Sync car state from Teslamate MQTT
Browse files Browse the repository at this point in the history
Since Teslamate doesn't map directly to TeslaCar from teslajsonpy,
we need to have special handling of both 'plugged_in' and 'state'
when they come from Teslamate. When we receive a new 'plugged_in'-value,
we check the `car.state` to determine the new 'charging_state'
in teslajsonpy, and similarly we need to check the `car.charging_state`
when Teslamate sends a new car-state.

Also refactor the MQTT-code slightly to avoid multiple implementations
of the code that updates the TeslaCar-object.
  • Loading branch information
dagstuan committed Oct 8, 2023
1 parent ed85fb2 commit 0b28502
Showing 1 changed file with 79 additions and 51 deletions.
130 changes: 79 additions & 51 deletions custom_components/tesla_custom/teslamate.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ def cast_km_to_miles(km_to_convert: float) -> float:
return miles


def cast_plugged_in(val: str) -> str:
"""Convert boolean string for plugged_in value."""
return "Connected" if cast_bool(val) else "Disconnected"


def cast_bool(val: str) -> bool:
"""Convert bool string to actual bool."""
return val.lower() in ["true", "True"]
Expand All @@ -74,6 +69,45 @@ def cast_speed(speed: int) -> int:
return int(speed_miles)


def cast_plugged_in(val: str, car: TeslaCar) -> str:
"""Casts new car plugged_in.
When receiving a new value here, we also need to check the
car state to see if it is charging or not before returning
a value to be set as 'charging_state'
"""
plugged_in = cast_bool(val)

logger.debug(
"Casting plugged_in. Current state: '%s', plugged_in: '%s'",
car.state,
plugged_in,
)

if plugged_in:
return "Charging" if car.state == "charging" else "Stopped"

return "Disconnected"


def cast_car_state(state: str, car: TeslaCar) -> (str, str):
"""Casts new car state to both state and charging_state.
Since Teslamate doesn't have a direct 'charging_state' we need both
'state' and 'plugged_in' to determine the charging state. This
method uses both to determine the new state and charging_state.
"""
logger.debug(
"Casting car state. Current charging_state '%s', new state: '%s'",
car.charging_state,
state,
)

if state == "charging":
return (state, "Charging")
return (state, "Stopped" if car.charging_state == "Stopped" else "Disconnected")


MAP_DRIVE_STATE = {
"latitude": ("latitude", float),
"longitude": ("longitude", float),
Expand Down Expand Up @@ -112,7 +146,6 @@ def cast_speed(speed: int) -> int:
"charger_voltage": ("charger_voltage", int),
"time_to_full_charge": ("time_to_full_charge", float),
"charge_limit_soc": ("charge_limit_soc", int),
"plugged_in": ("charging_state", cast_plugged_in),
"charge_port_door_open": ("charge_port_door_open", cast_bool),
"charge_current_request": ("charge_current_request", int),
"charge_current_request_max": ("charge_current_request_max", int),
Expand Down Expand Up @@ -316,68 +349,63 @@ async def async_handle_new_data(self, msg: ReceiveMessage):

if mqtt_attr in MAP_DRIVE_STATE:
attr, cast = MAP_DRIVE_STATE[mqtt_attr]
self.update_drive_state(car, attr, cast(msg.payload))
coordinator.async_update_listeners_debounced()
self.update_car_state(car, "drive_state", attr, cast(msg.payload))

elif mqtt_attr in MAP_VEHICLE_STATE:
attr, cast = MAP_VEHICLE_STATE[mqtt_attr]
self.update_vehicle_state(car, attr, cast(msg.payload))
coordinator.async_update_listeners_debounced()
self.update_car_state(car, "vehicle_state", attr, cast(msg.payload))

elif mqtt_attr in MAP_CLIMATE_STATE:
attr, cast = MAP_CLIMATE_STATE[mqtt_attr]
self.update_climate_state(car, attr, cast(msg.payload))
coordinator.async_update_listeners_debounced()
self.update_car_state(car, "climate_state", attr, cast(msg.payload))

elif mqtt_attr in MAP_CHARGE_STATE:
attr, cast = MAP_CHARGE_STATE[mqtt_attr]
self.update_charge_state(car, attr, cast(msg.payload))
coordinator.async_update_listeners_debounced()
self.update_car_state(car, "charge_state", attr, cast(msg.payload))

@staticmethod
def update_drive_state(car: TeslaCar, attr, value):
"""Update Drive State Safely."""
# pylint: disable=protected-access
logger.debug("Updating drive_state for VIN:%s", car.vin)
elif mqtt_attr == "plugged_in":
self.update_charging_state(car, cast_plugged_in(msg.payload, car))

if "drive_state" not in car._vehicle_data:
car._vehicle_data["drive_state"] = {}
elif mqtt_attr == "state":
(state, charging_state) = cast_car_state(msg.payload, car)
self.update_charging_state(car, charging_state)
self.update_car_state(car, None, "state", state)

drive_state = car._vehicle_data["drive_state"]
drive_state[attr] = value

@staticmethod
def update_vehicle_state(car, attr, value):
"""Update Vehicle State Safely."""
# pylint: disable=protected-access
logger.debug("Updating vehicle_state for VIN:%s", car.vin)
else:
# Nothing matched. Return without updating listeners.
return

if "vehicle_state" not in car._vehicle_data:
car._vehicle_data["vehicle_state"] = {}
coordinator.async_update_listeners_debounced()

vehicle_state = car._vehicle_data["vehicle_state"]
vehicle_state[attr] = value
def update_charging_state(self, car: TeslaCar, val: str):
"""Update charging state."""
self.update_car_state(car, "charge_state", "charging_state", val)

@staticmethod
def update_climate_state(car, attr, value):
"""Update Climate State Safely."""
def update_car_state(car: TeslaCar, sub_path: str, attr: str, value):
"""Update state safely."""
# pylint: disable=protected-access
logger.debug("Updating climate_state for VIN:%s", car.vin)

if "climate_state" not in car._vehicle_data:
car._vehicle_data["climate_state"] = {}
if sub_path is not None:
logger.debug(
"Updating state '%s' to value '%s' in sub_path '%s' for VIN:%s",
attr,
value,
sub_path,
car.vin,
)

climate_state = car._vehicle_data["climate_state"]
climate_state[attr] = value
if sub_path not in car._vehicle_data:
car._vehicle_data[sub_path] = {}

@staticmethod
def update_charge_state(car, attr, value):
"""Update Charge State Safely."""
# pylint: disable=protected-access
logger.debug("Updating charge_state for VIN:%s", car.vin)

if "charge_state" not in car._vehicle_data:
car._vehicle_data["charge_state"] = {}

charge_state = car._vehicle_data["charge_state"]
charge_state[attr] = value
state = car._vehicle_data[sub_path]
state[attr] = value
else:
logger.debug(
"Updating state '%s' to value '%s' in root for VIN:%s",
attr,
value,
car.vin,
)
state = car._car
state[attr] = value

0 comments on commit 0b28502

Please sign in to comment.