Skip to content

Updated AirStatus for current use #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
92 changes: 57 additions & 35 deletions main.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from bleak import discover
#!/usr/bin/env python3
from bleak import BleakScanner
from asyncio import new_event_loop, set_event_loop, get_event_loop
from time import sleep, time_ns
from time import sleep
from binascii import hexlify
from json import dumps
from sys import argv
from datetime import datetime
import sys
import asyncio

# Configure update duration (update after n seconds)
UPDATE_DURATION = 1
Expand All @@ -16,49 +19,63 @@
recent_beacons = []


def get_best_result(device):
recent_beacons.append({
def get_best_result(device, adv_data):
try:
from time import time_ns
except ImportError:
from datetime import datetime

def time_ns():
now = datetime.now()
return int(now.timestamp() * 1e9)

current_beacon = {
"time": time_ns(),
"device": device
})
"device": device,
"adv_data": adv_data
}
recent_beacons.append(current_beacon)
strongest_beacon = None
i = 0
while i < len(recent_beacons):
if(time_ns() - recent_beacons[i]["time"] > RECENT_BEACONS_MAX_T_NS):
recent_beacons.pop(i)
continue
if (strongest_beacon == None or strongest_beacon.rssi < recent_beacons[i]["device"].rssi):
strongest_beacon = recent_beacons[i]["device"]
if (strongest_beacon == None or strongest_beacon["adv_data"].rssi < recent_beacons[i]["adv_data"].rssi):
strongest_beacon = recent_beacons[i]
i += 1

if (strongest_beacon != None and strongest_beacon.address == device.address):
strongest_beacon = device
if (strongest_beacon != None and strongest_beacon["device"].address == device.address):
strongest_beacon = current_beacon

return strongest_beacon
return strongest_beacon["adv_data"]


# Getting data with hex format
async def get_device():
# Scanning for devices
devices = await discover()
for d in devices:
discovered_devices_and_advertisement_data = await BleakScanner.discover(return_adv=True)
for device, adv_data in discovered_devices_and_advertisement_data.values():
# Checking for AirPods
d = get_best_result(d)
if d.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in d.metadata['manufacturer_data']:
data_hex = hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER]))
data_length = len(hexlify(bytearray(d.metadata['manufacturer_data'][AIRPODS_MANUFACTURER])))
adv_data = get_best_result(device, adv_data)
if adv_data.rssi >= MIN_RSSI and AIRPODS_MANUFACTURER in adv_data.manufacturer_data:
data_hex = hexlify(bytearray(adv_data.manufacturer_data[AIRPODS_MANUFACTURER]))
data_length = len(hexlify(bytearray(adv_data.manufacturer_data[AIRPODS_MANUFACTURER])))
if data_length == AIRPODS_DATA_LENGTH:
return data_hex
return False


# Same as get_device() but it's standalone method instead of async
def get_data_hex():
new_loop = new_event_loop()
set_event_loop(new_loop)
loop = get_event_loop()
a = loop.run_until_complete(get_device())
loop.close()
if sys.version_info < (3, 7):
new_loop = new_event_loop()
set_event_loop(new_loop)
loop = get_event_loop()
a = loop.run_until_complete(get_device())
loop.close()
else:
a = asyncio.run(get_device())
return a


Expand All @@ -84,6 +101,8 @@ def get_data():
model = "AirPods1"
elif chr(raw[7]) == 'a':
model = "AirPodsMax"
elif chr(raw[7]) == '4':
model = "AirPodsPro2"
else:
model = "unknown"

Expand Down Expand Up @@ -130,19 +149,22 @@ def is_flipped(raw):
def run():
output_file = argv[-1]

while True:
data = get_data()

if data["status"] == 1:
json_data = dumps(data)
if len(argv) > 1:
f = open(output_file, "a")
f.write(json_data+"\n")
f.close()
else:
print(json_data)

sleep(UPDATE_DURATION)
try:
while True:
data = get_data()

if data["status"] == 1:
json_data = dumps(data)
if len(argv) > 1:
f = open(output_file, "a")
f.write(json_data+"\n")
f.close()
else:
print(json_data)

sleep(UPDATE_DURATION)
except KeyboardInterrupt:
pass


if __name__ == '__main__':
Expand Down