Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve device discovery #6

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 118 additions & 20 deletions helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,68 @@
from datetime import datetime, timedelta
from time import sleep
from base64 import b64decode, binascii

from typing import Optional, List, Union
from broadlink import Device
import socket
import broadlink


def get_device():
# Default timeout in seconds for waiting for device responses
DEFAULT_TIMEOUT = 10

def get_local_ip() -> str:
"""
Get the local IP address by creating a temporary socket connection.

Returns:
str: Local IP address, defaults to '127.0.0.1' if unable to determine
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# Doesn't need to be reachable, just used to determine local IP
s.connect(('10.255.255.255', 1))
local_ip = s.getsockname()[0]
except Exception:
local_ip = '127.0.0.1'
finally:
s.close()
return local_ip

def get_device() -> Optional[Device]:
"""
Discovers and connects to Broadlink devices on the local network.

Prompts user for:
- WiFi SSID
- WiFi password
- Subnet broadcast IP address
- Local IP address

If multiple devices are found, allows user to select one.

Returns:
Optional[Device]: Connected Broadlink device instance if successful, None otherwise
"""

# discover availabile devices on the local network
devices = broadlink.discover(timeout=5)
# get ssid from user
ssid = input("Enter WiFi SSID (or press Enter to skip WiFi setup): ").strip()
if ssid:
# get network password from user
network_password = input("Enter WiFi Network Password: ").strip()
# get ip address from user
ip_address = input("Enter IP Address for your subnet broadcast (e.g. 192.168.0.255): ").strip()

# Validate IP address format
if not all(x.isdigit() and 0 <= int(x) <= 255 for x in ip_address.split('.')):
print("Invalid IP address format")
return None

broadlink.setup(ssid, network_password, 3, ip_address=ip_address)



local_ip = get_local_ip()
devices = broadlink.discover(timeout=5, local_ip_address=local_ip)

# counter for device number selection
n = 0
Expand Down Expand Up @@ -47,7 +101,17 @@ def get_device():
return devices[sel - 1]


def get_packet(device, timeout=10):
def get_packet(device: Device, timeout: int = DEFAULT_TIMEOUT) -> Optional[str]:
"""
Attempt to receive an IR/RF packet from the device.

Args:
device: Broadlink device instance
timeout: Maximum time to wait for packet in seconds

Returns:
Optional[str]: Base64 encoded packet if received, None otherwise
"""

device.auth()
device.enter_learning()
Expand All @@ -69,7 +133,19 @@ def get_packet(device, timeout=10):
return None


def learn_command(device, command_lbl=None):
def learn_command(device: Device, command_lbl: Optional[str] = None) -> Optional[str]:
"""
Puts device in learning mode and waits for an IR/RF signal.

Allows user to retry if no signal is received.

Args:
device: Broadlink device instance
command_lbl: Optional label for the command being learned

Returns:
Optional[str]: Base64 encoded packet if received and user doesn't quit, None otherwise
"""
if command_lbl:
prompt_txt = f"\n> Press button for {command_lbl}"
else:
Expand Down Expand Up @@ -104,23 +180,45 @@ def learn_command(device, command_lbl=None):
return p


def send_command(device, packet=""):
device.auth()
def send_command(device: Device, packet: str = "") -> None:
"""
Sends an IR/RF command to the device.

Continuously prompts for packets to send until user quits.
If initial packet is provided, sends that first.

Args:
device: Broadlink device instance
packet: Optional initial packet to send (base64 encoded string)

Returns:
None

Raises:
binascii.Error: If packet has invalid base64 encoding
"""
try:
device.auth()
except Exception as e:
print(f"Authentication failed: {e}")
return

while packet not in (["q", "Q"]):
try:
packet = (
input("\nEnter IR/RF Packet to send or [Q] to quit: \n")
if packet == ""
else packet
)

packet = (
input("\nEnter IR/RF Packet to send or [Q] to quit: \n")
if packet == ""
else packet
)
if packet.lower() == 'q':
break

try:
if packet not in ["Q", "q"]:
payload = b64decode(packet)
device.send_data(payload)
print("Packet sent\n")
payload = b64decode(packet)
device.send_data(payload)
print("Packet sent\n")

except binascii.Error:
print("Packet not valid\n")
packet = input("\nEnter IR/RF Packet to send or [Q] to quit: \n")
print("Invalid base64 encoding in packet")
except Exception as e:
print(f"Error sending packet: {e}")
98 changes: 98 additions & 0 deletions send_commands_from_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import json
import argparse
from base64 import b64decode
from broadlink import Device
from helpers import get_device, send_command

def parse_args():
parser = argparse.ArgumentParser(description='Send IR commands from a SmartIR JSON file')
parser.add_argument('filename', help='Path to the SmartIR JSON file')
return parser.parse_args()

def load_commands(filename):
"""Load and parse the SmartIR JSON file"""
try:
with open(filename, 'r') as f:
data = json.load(f)
return data
except FileNotFoundError:
print(f"Error: File {filename} not found")
return None
except json.JSONDecodeError:
print(f"Error: Invalid JSON in {filename}")
return None

def main():
# Parse command line arguments
args = parse_args()

# Load commands from JSON file
data = load_commands(args.filename)
if not data:
return

# Get Broadlink device
device = get_device()
if not device:
print("No device found")
return

# Print available modes and commands
print("\nAvailable modes:")
for mode in data["operationModes"]:
print(f"- {mode}")

print("\nAvailable fan speeds:")
for fan in data["fanModes"]:
print(f"- {fan}")

print("\nTemperature range:")
print(f"Min: {data['minTemperature']}°C")
print(f"Max: {data['maxTemperature']}°C")
print(f"Step: {data['precision']}°C")

# Main interaction loop
while True:
print("\nEnter command (or 'q' to quit):")
print("Format: <mode> <fan_speed> <temperature>")
print("Example: cool auto 23")
print("For power off, just type: off")

cmd = input("> ").strip().lower()
if cmd == 'q':
break

if cmd == 'off':
code = data["commands"]["off"]
send_command(device, code)
continue

try:
mode, fan, temp = cmd.split()
temp = float(temp)

# Validate inputs
if mode not in data["operationModes"]:
print(f"Invalid mode. Available modes: {', '.join(data['operationModes'])}")
continue

if fan not in data["fanModes"]:
print(f"Invalid fan speed. Available speeds: {', '.join(data['fanModes'])}")
continue

if not (data["minTemperature"] <= temp <= data["maxTemperature"]):
print(f"Temperature must be between {data['minTemperature']} and {data['maxTemperature']}")
continue

# Get the IR code
code = data["commands"][mode][fan][str(int(temp))]
send_command(device, code)

except ValueError:
print("Invalid command format. Use: <mode> <fan_speed> <temperature>")
except KeyError:
print("Command not found in the database")

if __name__ == "__main__":
main()

Loading