-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathwifi-client-match
executable file
·185 lines (154 loc) · 7.1 KB
/
wifi-client-match
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
import itertools as it, operator as op, functools as ft
from collections import namedtuple
from pathlib import Path
import os, sys, re, logging, time, contextlib
import dbus
class LogMessage:
    """Log message whose str.format()-style template is rendered lazily.

    fmt: template string using "{}" placeholders.
    a: positional arguments for the template.
    k: keyword arguments for the template.

    Formatting only happens when str() is called on the object,
    i.e. when some handler actually emits the record.
    """

    def __init__(self, fmt, a, k):
        self.fmt, self.a, self.k = fmt, a, k

    def __str__(self):
        # Templates logged without any arguments are returned verbatim,
        # so that messages with literal braces do not blow up in format().
        return self.fmt.format(*self.a, **self.k) if self.a or self.k else self.fmt


class LogStyleAdapter(logging.LoggerAdapter):
    """LoggerAdapter accepting "{}"-style (str.format) message templates.

    Positional and keyword arguments of the logging calls are used
    as format arguments, except for the keywords listed in _log_kws,
    which are handed over to the underlying logger unchanged.
    """

    # Keywords that belong to the logging machinery, not to the message template.
    _log_kws = 'exc_info', 'stack_info', 'stacklevel', 'extra'

    def __init__(self, logger, extra=None):
        super().__init__(logger, extra or {})

    def log(self, level, msg, *args, **kws):
        """Log msg.format(*args, **kws) at the specified level, if it is enabled."""
        if not self.isEnabledFor(level): return
        msg, kws = self.process(msg, kws)
        # process() adds "extra" to kws - it must be split off here together with
        # other logging-only keywords, otherwise it ends up among format arguments
        # and forces formatting of templates that were logged without arguments.
        log_kws = {k: kws.pop(k) for k in self._log_kws if k in kws}
        self.logger._log(level, LogMessage(msg, args, kws), (), **log_kws)
def get_logger(name):
    """Return a LogStyleAdapter wrapped around the stdlib logger with the given name."""
    return LogStyleAdapter(logging.getLogger(name))
# D-Bus bus name and root object path used to reach wpa_supplicant,
# passed to get_object() calls on the system bus throughout this script.
# NOTE(review): the bus name looks like the legacy wpa_supplicant one, while the
# object path and interfaces used here are from fi.w1.wpa_supplicant1 API -
# confirm that the targeted daemon still registers this name.
bus_service = 'fi.epitest.hostap.WPASupplicant'
bus_root = '/fi/w1/wpa_supplicant1'
def get_bus():
    """Return the shared dbus.SystemBus connection, creating it on first use.

    The connection is cached as the "_bus" attribute of this function.
    """
    cached = getattr(get_bus, '_bus', None)
    if cached:
        return cached
    get_bus._bus = dbus.SystemBus()
    return get_bus._bus
def props(o, k, v=None, iface=None):
    """Read or write a D-Bus property via org.freedesktop.DBus.Properties.

    o: D-Bus proxy/interface object, or an object path string,
        which gets resolved on the bus_service using get_bus().
    k: name of the property.
    v: value to set; if None, property value is fetched and returned instead.
    iface: name of the interface that has the property,
        defaults to "dbus_interface" attribute of o.

    Returns the property value when reading, None when setting it.
    """
    if not iface:
        iface = o.dbus_interface
    if isinstance(o, str):
        o = get_bus().get_object(bus_service, o)
    accessor = dbus.Interface(o, 'org.freedesktop.DBus.Properties')
    if v is None:
        return accessor.Get(iface, k)
    accessor.Set(iface, k, v)
def ddict(*args, dbus_sig='sv', **kws):
    """Build dbus.Dictionary from dict()-style arguments.

    dbus_sig: D-Bus signature to set on the resulting dictionary.
    """
    contents = dict(*args, **kws)
    return dbus.Dictionary(contents, signature=dbus_sig)
def it_adjacent(seq, n):
    """Iterate over seq in consecutive n-tuples, last one being padded with None."""
    # Same iterator is passed n times, so each tuple consumes n successive items.
    shared = iter(seq)
    return it.zip_longest(*it.repeat(shared, n))
# Info on a discovered access point: decoded ssid, colon-separated bssid and D-Bus object path (p).
BSS = namedtuple('BSS', ['ssid', 'bssid', 'p'])
def find_bss(wifi_iface, ssid_prefix, bssid_checked, ts_deadline):
    """Poll BSS list of the interface until one with matching SSID shows up.

    wifi_iface: wpa_supplicant interface object to query "BSSs" and "ScanInterval" on.
    ssid_prefix: prefix that SSID must start with to be considered.
    bssid_checked: set of already processed bssid strings to skip. Updated in-place
        with non-matching bssids and with bssid of the returned entry.
    ts_deadline: time.time() value after which search is abandoned, None for no limit.

    Returns first BSS tuple in sorting order among the matches, or None on timeout.
    """
    candidates = []
    log.debug('Finding BSS to connect to...')
    while True:
        for bss_path in map(str, props(wifi_iface, 'BSSs')):
            try:
                ssid_raw = props(bss_path, 'SSID', iface='fi.w1.wpa_supplicant1.BSS')
                bssid_raw = props(bss_path, 'BSSID', iface='fi.w1.wpa_supplicant1.BSS')
                ssid = bytearray(ssid_raw).decode()
                # Hex digits are grouped in pairs to get the usual aa:bb:cc:... notation
                hex_digits = bytearray(bssid_raw).hex()
                bssid = ':'.join(hi + lo for hi, lo in it_adjacent(hex_digits, 2))
                if bssid in bssid_checked:
                    continue
                log.debug('Found new bss ({}, {}), ssid: {}', bssid, bss_path, ssid)
                if not ssid.startswith(ssid_prefix):
                    bssid_checked.add(bssid)
                else:
                    candidates.append(BSS(ssid, bssid, bss_path))
            except Exception as err:
                # Best-effort: any failure to query/decode an entry only skips that entry
                log.debug(
                    'Failed to process bssid info'
                    ' for {!r}, skipping: [{}] {}',
                    bss_path, err.__class__.__name__, err)

        if candidates:
            chosen = min(candidates)
            bssid_checked.add(chosen.bssid)
            return chosen

        now = time.time()
        delay = max(1.0, props(wifi_iface, 'ScanInterval') / 2)
        if ts_deadline is not None:
            delay = min(delay, ts_deadline - now)
            if delay < 0:
                return None
        log.debug('BSSID-check delay: {:.1f}', delay)
        time.sleep(delay)
def _build_arg_parser():
    """Create argparse parser with all command-line options of the script."""
    import argparse
    parser = argparse.ArgumentParser(
        description='wpa_supplicant auto-connection script.')
    parser.add_argument('ssid_prefix',
        help='AP SSID prefix to match among available ones and connect to.'
            ' If several SSID matches are found, first one is selected in alphasort-order.')
    parser.add_argument('psk_file', nargs='?',
        help='Read PSK or passphrase from specified file'
            ' (only first line is used). If not specified, stdin will be used.')
    parser.add_argument('-i', '--wifi-iface', metavar='iface',
        help='WiFi interface name to use with wpa_supplicant, if none is registered there.'
            ' Default is to pick first one available as /sys/class/net/*/wireless in alphasort-order.')
    parser.add_argument('-s', '--scan-opts',
        default='exponential:3:30', metavar='spec',
        help='WiFi AP autoscan options. Default: %(default)s')
    parser.add_argument('-w', '--wait',
        default=-1, type=float, metavar='seconds',
        help='Seconds to wait for suitable SSID to appear. Negative value = no limit (default).')
    parser.add_argument('-r', '--reconnect', action='store_true',
        help='Force reconnection even if already associated with some network.')
    parser.add_argument('-c', '--wait-confirm',
        default=10, type=float, metavar='seconds',
        help='Seconds to wait for connection to succeed'
            ' before discarding it and trying different bssid/ssid.'
            ' Negative value disables the check.')
    parser.add_argument('--debug', action='store_true',
        help='Verbose operation mode. Also sets wpa_supplicant to DebugLevel=debug.')
    return parser


def _detect_wireless_iface():
    """Return first (in sorting order) /sys/class/net entry that has "wireless" node, or None."""
    names = sorted(
        entry.name for entry in Path('/sys/class/net').iterdir()
        if (entry / 'wireless').exists())
    return names[0] if names else None


def _is_connected(wifi_iface):
    """Check whether State property of the interface is "associated" or "completed"."""
    return props(wifi_iface, 'State') in ('associated', 'completed')


def main(args=None):
    """Connect to the first AP with SSID matching specified prefix via wpa_supplicant.

    args: list of command-line arguments to parse, sys.argv[1:] is used if None.

    Returns value intended for sys.exit(): None if interface was already connected
    and no reconnection was requested, False if connection was established
    (or started with the confirmation check disabled), True if no connection was made.
    """
    parser = _build_arg_parser()
    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    psk_src = open(opts.psk_file) if opts.psk_file else sys.stdin
    try: psk = psk_src.readline().rstrip('\r\n')
    finally: psk_src.close()

    global log
    logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
    log = get_logger('main')

    wifi = dbus.Interface(get_bus().get_object(bus_service, bus_root), 'fi.w1.wpa_supplicant1')
    if opts.debug: props(wifi, 'DebugLevel', 'debug')

    # Use interface already registered with wpa_supplicant, or register one there
    wifi_iface = props(wifi, 'Interfaces')
    if not wifi_iface:
        iface = opts.wifi_iface
        if not iface:
            log.debug('Finding suitable iface to use with wpa_supplicant')
            iface = _detect_wireless_iface()
            if not iface: parser.error('Failed to detect any wireless interfaces')
        log.debug('Registering iface with wpa_supplicant: {!r}', iface)
        wifi.CreateInterface(ddict(Ifname=iface))
        wifi_iface = props(wifi, 'Interfaces')
    wifi_iface = dbus.Interface(
        get_bus().get_object(bus_service, wifi_iface[0]), 'fi.w1.wpa_supplicant1.Interface')
    log.debug('Using wifi interface: {}', props(wifi_iface, 'Ifname'))

    connected = _is_connected(wifi_iface)
    if connected and not opts.reconnect:
        log.debug('Detected existing connection, exiting')
        return

    for net in props(wifi_iface, 'Networks'): wifi_iface.RemoveNetwork(net)
    if connected:
        log.debug('Detected existing connection, disconnecting...')
        with contextlib.suppress(dbus.exceptions.DBusException): wifi_iface.Disconnect()
        # Old connection must not be reported as a success if the search below times out
        connected = False

    props(wifi_iface, 'BSSExpireAge', dbus.UInt32(2**30-1))  # i.e. never
    props(wifi_iface, 'ScanInterval', 1)
    wifi_iface.Scan(ddict(Type='passive'))

    bssid_checked = set()
    ts_deadline = (time.time() + opts.wait) if opts.wait >= 0 else None
    while True:
        wifi_iface.AutoScan(opts.scan_opts)
        bss = find_bss(wifi_iface, opts.ssid_prefix, bssid_checked, ts_deadline)
        if not bss: break  # timeout
        wifi_iface.AutoScan('')

        # PSK value is deliberately never logged here
        log.debug('Connecting to ssid/bssid: {} / {}', bss.ssid, bss.bssid)
        net = wifi_iface.AddNetwork(ddict(
            ssid=bss.ssid, bssid=bss.bssid, key_mgmt='WPA-PSK', psk=psk))
        props(net, 'Enabled', True, iface='fi.w1.wpa_supplicant1.Network')

        if opts.wait_confirm < 0:
            # Check is disabled - enabled network is kept as-is, without polling its state
            log.debug('Connection check is disabled, not waiting for it to succeed')
            connected = True
            break

        log.debug('Waiting for connection to succeed...')
        connected, ts_confirm = False, time.time() + opts.wait_confirm
        delay = min(5.0, max(1.0, opts.wait_confirm / 6))
        while time.time() < ts_confirm:
            connected = _is_connected(wifi_iface)
            if connected: break
            time.sleep(delay)
        if connected: break

        log.debug('Connection timed-out, discarding ssid/bssid: {} / {}', bss.ssid, bss.bssid)
        wifi_iface.RemoveNetwork(net)
        with contextlib.suppress(dbus.exceptions.DBusException): wifi_iface.Disconnect()

    log.debug('Finished: {}', 'connected to {}'.format(bss.ssid) if bss else 'timeout')
    return not connected
# Script entry point: value returned by main() is passed to sys.exit() as the exit status.
if __name__ == '__main__': sys.exit(main())