diff --git a/helpers.py b/helpers.py index b11783a..5953b71 100644 --- a/helpers.py +++ b/helpers.py @@ -3,14 +3,68 @@ from datetime import datetime, timedelta from time import sleep from base64 import b64decode, binascii - +from typing import Optional, List, Union +from broadlink import Device +import socket import broadlink - -def get_device(): +# Default timeout in seconds for waiting for device responses +DEFAULT_TIMEOUT = 10 + +def get_local_ip() -> str: + """ + Get the local IP address by creating a temporary socket connection. + + Returns: + str: Local IP address, defaults to '127.0.0.1' if unable to determine + """ + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + # Doesn't need to be reachable, just used to determine local IP + s.connect(('10.255.255.255', 1)) + local_ip = s.getsockname()[0] + except Exception: + local_ip = '127.0.0.1' + finally: + s.close() + return local_ip + +def get_device() -> Optional[Device]: + """ + Discovers and connects to Broadlink devices on the local network. + + Prompts user for: + - WiFi SSID + - WiFi password + - Subnet broadcast IP address + - Local IP address + + If multiple devices are found, allows user to select one. + + Returns: + Optional[Device]: Connected Broadlink device instance if successful, None otherwise + """ # discover availabile devices on the local network - devices = broadlink.discover(timeout=5) + # get ssid from user + ssid = input("Enter WiFi SSID (or press Enter to skip WiFi setup): ").strip() + if ssid: + # get network password from user + network_password = input("Enter WiFi Network Password: ").strip() + # get ip address from user + ip_address = input("Enter IP Address for your subnet broadcast (e.g. 192.168.0.255): ").strip() + + # Validate IP address format + if not all(x.isdigit() and 0 <= int(x) <= 255 for x in ip_address.split('.')): + print("Invalid IP address format") + return None + + broadlink.setup(ssid, network_password, 3, ip_address=ip_address) + + + + local_ip = get_local_ip() + devices = broadlink.discover(timeout=5, local_ip_address=local_ip) # counter for device number selection n = 0 @@ -47,7 +101,17 @@ def get_device(): return devices[sel - 1] -def get_packet(device, timeout=10): +def get_packet(device: Device, timeout: int = DEFAULT_TIMEOUT) -> Optional[str]: + """ + Attempt to receive an IR/RF packet from the device. + + Args: + device: Broadlink device instance + timeout: Maximum time to wait for packet in seconds + + Returns: + Optional[str]: Base64 encoded packet if received, None otherwise + """ device.auth() device.enter_learning() @@ -69,7 +133,19 @@ def get_packet(device, timeout=10): return None -def learn_command(device, command_lbl=None): +def learn_command(device: Device, command_lbl: Optional[str] = None) -> Optional[str]: + """ + Puts device in learning mode and waits for an IR/RF signal. + + Allows user to retry if no signal is received. + + Args: + device: Broadlink device instance + command_lbl: Optional label for the command being learned + + Returns: + Optional[str]: Base64 encoded packet if received and user doesn't quit, None otherwise + """ if command_lbl: prompt_txt = f"\n> Press button for {command_lbl}" else: @@ -104,23 +180,45 @@ def learn_command(device, command_lbl=None): return p -def send_command(device, packet=""): - device.auth() +def send_command(device: Device, packet: str = "") -> None: + """ + Sends an IR/RF command to the device. + + Continuously prompts for packets to send until user quits. + If initial packet is provided, sends that first. + + Args: + device: Broadlink device instance + packet: Optional initial packet to send (base64 encoded string) + + Returns: + None + + Raises: + binascii.Error: If packet has invalid base64 encoding + """ + try: + device.auth() + except Exception as e: + print(f"Authentication failed: {e}") + return while packet not in (["q", "Q"]): + try: + packet = ( + input("\nEnter IR/RF Packet to send or [Q] to quit: \n") + if packet == "" + else packet + ) - packet = ( - input("\nEnter IR/RF Packet to send or [Q] to quit: \n") - if packet == "" - else packet - ) + if packet.lower() == 'q': + break - try: - if packet not in ["Q", "q"]: - payload = b64decode(packet) - device.send_data(payload) - print("Packet sent\n") + payload = b64decode(packet) + device.send_data(payload) + print("Packet sent\n") except binascii.Error: - print("Packet not valid\n") - packet = input("\nEnter IR/RF Packet to send or [Q] to quit: \n") + print("Invalid base64 encoding in packet") + except Exception as e: + print(f"Error sending packet: {e}") diff --git a/send_commands_from_json.py b/send_commands_from_json.py new file mode 100644 index 0000000..acdbe93 --- /dev/null +++ b/send_commands_from_json.py @@ -0,0 +1,98 @@ +import json +import argparse +from base64 import b64decode +from broadlink import Device +from helpers import get_device, send_command + +def parse_args(): + parser = argparse.ArgumentParser(description='Send IR commands from a SmartIR JSON file') + parser.add_argument('filename', help='Path to the SmartIR JSON file') + return parser.parse_args() + +def load_commands(filename): + """Load and parse the SmartIR JSON file""" + try: + with open(filename, 'r') as f: + data = json.load(f) + return data + except FileNotFoundError: + print(f"Error: File {filename} not found") + return None + except json.JSONDecodeError: + print(f"Error: Invalid JSON in {filename}") + return None + +def main(): + # Parse command line arguments + args = parse_args() + + # Load commands from JSON file + data = load_commands(args.filename) + if not data: + return + + # Get Broadlink device + device = get_device() + if not device: + print("No device found") + return + + # Print available modes and commands + print("\nAvailable modes:") + for mode in data["operationModes"]: + print(f"- {mode}") + + print("\nAvailable fan speeds:") + for fan in data["fanModes"]: + print(f"- {fan}") + + print("\nTemperature range:") + print(f"Min: {data['minTemperature']}°C") + print(f"Max: {data['maxTemperature']}°C") + print(f"Step: {data['precision']}°C") + + # Main interaction loop + while True: + print("\nEnter command (or 'q' to quit):") + print("Format: ") + print("Example: cool auto 23") + print("For power off, just type: off") + + cmd = input("> ").strip().lower() + if cmd == 'q': + break + + if cmd == 'off': + code = data["commands"]["off"] + send_command(device, code) + continue + + try: + mode, fan, temp = cmd.split() + temp = float(temp) + + # Validate inputs + if mode not in data["operationModes"]: + print(f"Invalid mode. Available modes: {', '.join(data['operationModes'])}") + continue + + if fan not in data["fanModes"]: + print(f"Invalid fan speed. Available speeds: {', '.join(data['fanModes'])}") + continue + + if not (data["minTemperature"] <= temp <= data["maxTemperature"]): + print(f"Temperature must be between {data['minTemperature']} and {data['maxTemperature']}") + continue + + # Get the IR code + code = data["commands"][mode][fan][str(int(temp))] + send_command(device, code) + + except ValueError: + print("Invalid command format. Use: ") + except KeyError: + print("Command not found in the database") + +if __name__ == "__main__": + main() + \ No newline at end of file diff --git a/smartir_generator.py b/smartir_generator.py index 9a69b35..9d3a51a 100644 --- a/smartir_generator.py +++ b/smartir_generator.py @@ -1,31 +1,50 @@ import json import argparse +from typing import Dict, List, Union, Optional, Any, Literal +import broadlink # Added for type hints # my modules from helpers import get_device, learn_command +# Define custom types +JsonDict = Dict[str, Any] +CommandDict = Dict[str, Union[str, Dict[str, Dict[str, str]]]] +ActionType = Literal["redo", "stop", "continue"] +Temperature = Union[int, float, str] -def main(): +def main() -> None: # initialise some variables - suspend = False + suspend: bool = False args = parse_args() - json_config = args.json_file + json_config: str = args.json_file - # read json file to dict - with open(json_config, "r") as fp: - ac_dict = json.load(fp) + try: + # read json file to dict + with open(json_config, "r") as fp: + ac_dict: JsonDict = json.load(fp) + except (IOError, json.JSONDecodeError) as e: + print(f"Error reading JSON file: {e}") + return # get broadlink device - device = get_device() - - # get config values - min_temp = int(ac_dict.get("minTemperature", 18)) - max_temp = int(ac_dict.get("maxTemperature", 30)) - temp_step = int(ac_dict.get("precision", 1)) - op_modes = ac_dict.get("operationModes", ["cool", "heat"]) - fan_modes = ac_dict.get("fanModes", ["auto"]) - commands = ac_dict.get("commands", {}) + device: Optional[broadlink.Device] = get_device() + if device is None: + return + + # get config values with type validation + try: + min_temp: int = int(ac_dict.get("minTemperature", 18)) + max_temp: int = int(ac_dict.get("maxTemperature", 30)) + temp_step: int = int(ac_dict.get("precision", 1)) + if min_temp > max_temp or temp_step <= 0: + raise ValueError("Invalid temperature configuration") + except ValueError as e: + print(f"Error in temperature configuration: {e}") + return + op_modes: List[str] = ac_dict.get("operationModes", ["cool", "heat"]) + fan_modes: List[str] = ac_dict.get("fanModes", ["auto"]) + commands: CommandDict = ac_dict.get("commands", {}) # loop through operation modes for op_mode in ["off"] + op_modes: @@ -50,46 +69,50 @@ def main(): commands[op_mode][fan_mode] = {} # loop through temps - temp = min_temp + temp: int = min_temp while temp <= max_temp: - - # skip temp if already in config - if clean_temp(temp) in commands[op_mode][fan_mode]: - temp += temp_step - continue - - # label for command - if op_mode == "off": - lbl = "off" - else: - lbl = f"{op_mode}_{fan_mode}_{clean_temp(temp)}" - - # get packet - pkt = learn_command(device, lbl) - - # get next action - action = prompt_next_action() - - # save command to json if action is continue or stop - if action in ["continue", "stop"]: - - if op_mode == "off": - commands["off"] = pkt + try: + temp_key = clean_temp(temp) + if temp_key in commands[op_mode][fan_mode]: + temp += temp_step + continue + + # label for command + lbl: str = "off" if op_mode == "off" else f"{op_mode}_{fan_mode}_{temp_key}" + + # get packet + pkt: Optional[str] = learn_command(device, lbl) + if pkt is None: + print("Failed to learn command") + continue + + # get next action + action: ActionType = prompt_next_action() + + # save command to json if action is continue or stop + if action in ["continue", "stop"]: + if op_mode == "off": + commands["off"] = pkt + else: + commands[op_mode][fan_mode][temp_key] = pkt + + if not update_json(json_config, commands): + print("Failed to update JSON file") + return + + # break out if action was stop + if action == "stop": + suspend = True + break + + # increment temp + temp += temp_step + # redo command if action is redo else: - commands[op_mode][fan_mode][clean_temp(temp)] = pkt - - update_json(json_config, commands) - - # break out if action was stop - if action == "stop": - suspend = True - break - - # increment temp - temp += temp_step + continue - # redo command if action is redo - else: + except ValueError as e: + print(f"Error processing temperature: {e}") continue if op_mode == "off": @@ -99,11 +122,11 @@ def main(): break -def prompt_next_action(): +def prompt_next_action() -> ActionType: """Prompt user for next action Returns: - str: one of "redo", "stop" or "continue" + ActionType: one of "redo", "stop" or "continue" """ sel = input(f"Press:\n[ENTER] to continue\n[R] to redo last command\n[S] to stop\n") @@ -116,19 +139,33 @@ def prompt_next_action(): return "continue" -def update_json(json_config_file, commands_dict): +def update_json(json_config_file: str, commands_dict: CommandDict) -> bool: + try: + with open(json_config_file, "r") as fp: + config_dict: JsonDict = json.load(fp) - with open(json_config_file, "r") as fp: - config_dict = json.load(fp) + config_dict["commands"] = commands_dict - config_dict["commands"] = commands_dict + with open(json_config_file, "w") as fp: + json.dump(config_dict, fp, indent=4) + except (IOError, json.JSONDecodeError) as e: + print(f"Error updating JSON file: {e}") + return False + return True - with open(json_config_file, "w") as fp: - json.dump(config_dict, fp, indent=4) +def clean_temp(temp: Temperature) -> str: + """Convert temperature to standardized string format -def clean_temp(temp): + Args: + temp: Temperature value to clean + Returns: + str: Cleaned temperature string + + Raises: + ValueError: If temperature format is invalid + """ if isinstance(temp, int): return str(temp) elif isinstance(temp, float): @@ -142,7 +179,7 @@ def clean_temp(temp): raise ValueError(f"Temperature: {temp} not valid") -def parse_args(): +def parse_args() -> argparse.Namespace: # cli arguments parser = argparse.ArgumentParser( description="Generates json file of Climate IR commands for SmartIR Home Assistant integration"