-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy patha7106.py
executable file
·384 lines (315 loc) · 14.8 KB
/
a7106.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#!/usr/bin/python3
import RPi.GPIO as GPIO
import spidev
import csv
import struct
import time
def load_csv_regs(filename):
    """Load the register description table from a CSV file.

    The CSV must have an ``address`` column (decimal integer), a ``name``
    column, and per-bit columns ``read0``..``read7`` and ``write0``..``write7``.

    Returns a list with one dict per CSV row, holding the keys ``address``
    (int), ``name`` (str), ``read`` and ``write``. The ``read``/``write``
    lists contain the per-bit column values ordered from bit 7 down to bit 0.
    """
    # Most-significant bit first, so the lists line up with '{:08b}' output.
    bit_order = range(7, -1, -1)

    with open(filename) as csvfile:
        return [
            {
                'address': int(row['address']),
                'name': row['name'],
                'read': [row['read{}'.format(bit)] for bit in bit_order],
                'write': [row['write{}'.format(bit)] for bit in bit_order],
            }
            for row in csv.DictReader(csvfile)
        ]
class RxError(Exception):
    """Raised when the radio flags a CRC or FEC error on a received packet."""
class A7106:
    """Driver for the A7106 radio, over bit-banged GPIO or a spidev device.

    Two transport modes are supported:

    * ``'gpio'`` - CS/SCK/MOSI/MISO are driven manually through RPi.GPIO
      using the pin numbers (BCM numbering) in ``self.pins``.
    * ``'spi'`` - transfers go through ``spidev``; only the ``io2`` (WTR)
      pin is read through RPi.GPIO.
    """

    # Default BCM pin assignment, used when no `pins` mapping is supplied.
    pins = {
        'cs': 2,    # CS
        'ck': 3,    # SCK
        'da': 4,    # MOSI
        'io1': 17,  # MISO (after configuration)
        'io2': 27,  # WTR (signals that a packet was received)
    }

    def __init__(self, id=0x930B51DE, channel=0, packet_len=64, pins=None, spi=None):
        """Initialize the A7106 radio.

        Args:
            id: 32-bit radio ID written to the ID register.
            channel: RF channel number, see :meth:`set_channel`.
            packet_len: FIFO packet length in bytes (1-64).
            pins: Optional mapping with the keys ``cs``, ``ck``, ``da``,
                ``io1`` and ``io2`` selecting the BCM pins for GPIO mode.
            spi: Optional argument sequence forwarded to
                ``spidev.SpiDev(*spi)``; when given, SPI mode is used and
                ``io2`` is read from BCM pin 7.
        """
        if spi:
            self.spi_dev = spi
            self.mode = 'spi'
            self.pins = {'io2': 7}
        elif pins:
            self.pins = pins
            self.mode = 'gpio'
        else:
            # Fall back to the class-level default pin map.
            self.mode = 'gpio'

        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)

        if self.mode == 'gpio':
            GPIO.setup(self.pins['cs'], GPIO.OUT, initial=GPIO.HIGH)
            GPIO.setup(self.pins['ck'], GPIO.OUT, initial=GPIO.LOW)
            GPIO.setup(self.pins['da'], GPIO.OUT, initial=GPIO.LOW)
            GPIO.setup(self.pins['io1'], GPIO.IN)

        if self.mode == 'spi':
            self.spi = spidev.SpiDev(*self.spi_dev)
            self.spi.max_speed_hz = 10000000

        # WTR is polled through GPIO in both modes.
        GPIO.setup(self.pins['io2'], GPIO.IN)
        GPIO.setwarnings(True)

        self.regs = load_csv_regs('a7106_registers.csv')

        self.setup()
        self.set_channel(channel)
        self.set_id(id)
        # Honor the caller's packet length (previously hard-coded to 64).
        self.set_packet_length(packet_len)

    def setup(self):
        """Reset the radio, load the register configuration and calibrate.

        Calibration failures are reported with ``print`` and do not raise.
        """
        # These settings were derived from the recommended values in the datasheet, except where noted.
        self.write_reg(0x00, 0b0)          # Software reset
        self.write_reg(0x01, 0b01100010)   # Enable auto RSSI measurement, disable RF IF shift, NOTE: AIF inverted
        self.write_reg(0x02, 0x00)         # Set during calibration
        self.write_reg(0x03, 0x3F)         # Set maximum size for RX mode
        self.write_reg(0x04, 0b00000000)   # Clear FPM and PSA to use FIFO simple mode
        # 0x05 (FIFO data) is not needed for setup; 0x06 (ID data) is set later.
        self.write_reg(0x07, 0x00)
        self.write_reg(0x08, 0x00)         # Wake on radio disabled
        self.write_reg(0x09, 0x00)         # Wake on radio disabled
        # Alternative for 0x0A: 0b10100010 configures the CKO pin to output Fsysck.
        self.write_reg(0x0A, 0b00000000)   # Configure CKO pin to off
        self.write_reg(0x0B, 0b00011001)   # Configure GIO1 pin for 4-wire SPI mode
        self.write_reg(0x0C, 0b00000001)   # Configure GIO2 pin to output WTR
        self.write_reg(0x0D, 0b00000101)   # Configure for 16MHz crystal, 500Kbps data rate
        self.write_reg(0x0E, 0b00000000)   # Configure for 16MHz crystal, 500Kbps data rate
        self.write_reg(0x0F, 0x00)         # TODO: pass channel into config?
        self.write_reg(0x10, 0b10011110)   # Configure for 16MHz crystal, 500Kbps data rate
        self.write_reg(0x11, 0x4B)         # Recommended
        self.write_reg(0x12, 0x00)         # Configure for 16MHz crystal, 500Kbps data rate
        self.write_reg(0x13, 0x02)         # Configure for 16MHz crystal, 500Kbps data rate
        self.write_reg(0x14, 0b00010110)   # Recommended, disable TXSM moving average, disable TX modulation, disable filter
        self.write_reg(0x15, 0b00101011)   # TODO
        self.write_reg(0x16, 0b00010010)   # Recommended
        self.write_reg(0x17, 0b01001111)   # Recommended, XTAL delay 600us, AGC delay 20us, RSSI delay 80us
        self.write_reg(0x18, 0b01100011)   # TODO: BWS=1
        self.write_reg(0x19, 0b10000000)   # Recommended, set mixer gain to 24dB, LNA gain to 24dB
        self.write_reg(0x1A, 0x80)         # TODO: Why (copied from sample code)
        self.write_reg(0x1B, 0x00)         # TODO: Why (copied from sample code)
        self.write_reg(0x1C, 0b00001010)   # Recommended
        self.write_reg(0x1D, 0x32)         # TODO: Why (copied from sample code)
        self.write_reg(0x1E, 0b11000011)   # Recommended TODO: this enables RSSI, what is the effect?
        self.write_reg(0x1F, 0b00011111)   # Disable whitening, enable FEC, CRC, id=4 bytes, preamble=4 bytes TODO: what is MCS
        self.write_reg(0x20, 0b00010110)   # PMD = 10 for 250/500 kbps signal rate
        self.write_reg(0x21, 0x00)         # Data whitening disabled
        self.write_reg(0x22, 0b00000000)   # Recommended TODO: should MFB still be set?
        # 0x23 is read only.
        self.write_reg(0x24, 0b00010011)   # Recommended
        # 0x25 is written during calibration below.
        self.write_reg(0x26, 0x23)         # TODO: Why (copied from sample code)
        self.write_reg(0x27, 0x00)
        self.write_reg(0x28, 0b00010111)   # TX output power: 0dBm TODO: verify me
        self.write_reg(0x29, 0b01000111)   # Reserved, DCM=10 for 250/500 Kbps
        self.write_reg(0x2A, 0x80)         # Recommended
        self.write_reg(0x2B, 0b11010110)   # Reserved, Recommended
        self.write_reg(0x2C, 0b00000001)   # Reserved
        self.write_reg(0x2D, 0b01010001)   # Reserved, TODO: PRS
        self.write_reg(0x2E, 0b00011000)   # Reserved
        self.write_reg(0x2F, 0b00000000)   # Recommended
        self.write_reg(0x30, 0b00000001)   # Reserved
        self.write_reg(0x31, 0x0F)         # Reserved
        self.write_reg(0x32, 0x00)         # Reserved
        # NOTE(review): 0x32 is written twice in a row; the second write may
        # have been meant for a different register - confirm against the datasheet.
        self.write_reg(0x32, 0b01111111)   # Max ramping

        # Calibration
        self.write_reg(0x22, 0b00000000)   # Set MFBS = 0
        self.write_reg(0x24, 0b00000000)   # Set MCVS = 0
        self.write_reg(0x25, 0b00000000)   # Set MVBS = 0
        self.strobe(0b1011)                # Enter PLL mode
        self.write_reg(0x02, 0b00001111)   # Enable IF Filter Bank
        time.sleep(0.5)                    # Wait for calibration to finish

        ccr = ord(self.read_reg(0x02))
        if ccr & 0b00001111:
            print('Error: calibrations failed to finish, CCR:0b{:08b} expected:0bxxxx0000'.format(ccr))

        if_cal = ord(self.read_reg(0x22))
        if if_cal & 0b00010000:
            print('Error: IF filter auto calibration failed. if_cal:0b{:08b} expected:0xxx0xxxx'.format(if_cal))

        vco_current_cal = ord(self.read_reg(0x24))
        # NOTE(review): the mask tests bit 4 but the message's expected
        # pattern marks bit 3 - confirm which one matches the datasheet.
        if vco_current_cal & 0b00010000:
            print('Error: VCO current calibration failed. vco_cal:0b{:08b} expected:0bxxxx0xxx'.format(vco_current_cal))

        vco_cal = ord(self.read_reg(0x25))
        if vco_cal & 0b00001000:
            print('Error: VCO single bank calibration failed. vco_cal:0b{:08b} expected:0bxxxx0xxx'.format(vco_cal))

    def txrx(self, data, rx_len=0):
        """Send `data` to the radio and then read back `rx_len` bytes.

        Args:
            data: Sequence of byte values to transmit. It is never modified.
            rx_len: Number of bytes to read after `data` has been sent.

        Returns:
            The received bytes: a ``bytearray`` in GPIO mode, ``bytes`` in
            SPI mode. Empty when `rx_len` is 0.

        Raises:
            RuntimeError: If ``self.mode`` is neither 'gpio' nor 'spi'.
        """
        if self.mode == 'gpio':
            pin_cs = self.pins['cs']
            pin_ck = self.pins['ck']
            pin_da = self.pins['da']
            pin_io1 = self.pins['io1']

            GPIO.output(pin_cs, GPIO.LOW)

            # Shift every byte out MSB first; data is set up while the clock
            # is low and the clock is then raised.
            for value in data:
                for bit in '{:08b}'.format(value):
                    GPIO.output(pin_ck, GPIO.LOW)
                    GPIO.output(pin_da, GPIO.LOW if bit == '0' else GPIO.HIGH)
                    GPIO.output(pin_ck, GPIO.HIGH)
            GPIO.output(pin_ck, GPIO.LOW)

            # Shift the reply in MSB first, sampling io1 while the clock is high.
            rx_data = bytearray()
            for _ in range(rx_len):
                din = 0
                for _bit in range(8):
                    GPIO.output(pin_ck, GPIO.HIGH)
                    din = din << 1
                    if GPIO.input(pin_io1) == GPIO.HIGH:
                        din = din | 1
                    GPIO.output(pin_ck, GPIO.LOW)
                rx_data.append(din)

            GPIO.output(pin_cs, GPIO.HIGH)
            return rx_data

        if self.mode == 'spi':
            if rx_len == 0:
                self.spi.writebytes(data)
                # Return an empty buffer (not None) so both modes agree.
                return bytes()

            # Pad a *copy* with dummy bytes to clock the reply out, so the
            # caller's buffer is left untouched.
            frame = bytearray(data)
            frame.extend(bytes(rx_len))
            reply = self.spi.xfer(frame)
            return bytes(reply[len(data):])

        raise RuntimeError('Communications mode not configured correctly!')

    def write_reg(self, address, data):
        """Write a value into a register.

        Args:
            address: Register address.
            data: An integer from 0-255 for single byte writes, or a byte
                array for multiple byte writes.

        Raises:
            ValueError: If an integer `data` is outside 0-255.
        """
        if isinstance(data, int):
            if (data > 255) or (data < 0):
                raise ValueError('Data value out of range, got:{} min:0 max:255'.format(data))
            data = bytes([data])

        frame = bytearray()
        frame.append(address)
        frame.extend(data)
        self.txrx(frame)

    def read_reg(self, address, len=1):
        """Read `len` bytes from the register at `address`.

        Bit 6 of the address byte is set to mark the transfer as a read.
        """
        frame = bytearray()
        frame.append(address | (1 << 6))
        return self.txrx(frame, len)

    def strobe(self, command):
        """Send the 4-bit strobe command (placed in the upper nibble)."""
        frame = bytearray()
        frame.append(command << 4)
        return self.txrx(frame)

    def dump_regs(self):
        """Read every register listed in ``self.regs`` and print its bit fields."""
        for reg in self.regs:
            val = ord(self.read_reg(reg['address']))
            bits = list('{:08b}'.format(val))
            # Skip bit positions whose name is blank or '--'.
            fields = ', '.join(
                '{:}:{:}'.format(name, bit)
                for name, bit in zip(reg['read'], bits)
                if name != '' and name != '--'
            )
            print('0x{:02x} ({:}): 0x{:02x} [{:}]'.format(
                reg['address'],
                reg['name'],
                val,
                fields
            ))

    def set_channel(self, channel):
        """Set the RF channel. Frequency = (2400.001 + 0.5*channel)MHz"""
        self.write_reg(0x0F, channel)

    def set_id(self, id):
        """Set the radio id, where ID is a 32-bit number (sent big-endian)."""
        self.write_reg(0x06, struct.pack('>I', id))

    def set_packet_length(self, packet_length):
        """Set the FIFO packet length in bytes.

        Raises:
            ValueError: If `packet_length` is outside 1-64.
        """
        if (packet_length > 64) or (packet_length < 1):
            raise ValueError('Packet length out of range, got:{} min:1 max:64'.format(packet_length))

        self.packet_length = packet_length
        # The register holds the length minus one.
        self.write_reg(0x03, packet_length - 1)

    def transmit(self, data):
        """Transmit a data packet, zero-padded to the configured packet length.

        Busy-waits while the io2 (WTR) pin reads high before returning.

        Raises:
            ValueError: If `data` is longer than the configured packet length.
        """
        if len(data) > self.packet_length:
            raise ValueError('packet data too long, length:{} maximum:{}'.format(len(data), self.packet_length))

        # Fill remaining space with 0s to avoid leaking stale data.
        payload = bytearray()
        payload.extend(data)
        payload.extend(bytes(self.packet_length - len(data)))

        if self.mode == 'spi':
            # Send the whole TX sequence at once to avoid long turnarounds when calling the SPI driver.
            # Especially due to this SPI bug: https://github.com/doceme/py-spidev/issues/117
            cmd = bytearray()
            cmd.append(0b1110 << 4)  # Strobe: FIFO write pointer reset
            cmd.append(0x05)         # Write message register
            cmd.extend(payload)      # Data to transfer
            self.spi.xfer(cmd)
            self.strobe(0b1101)      # TX
        else:
            self.strobe(0b1110)            # FIFO write pointer reset
            self.write_reg(0x05, payload)  # Write packet to FIFO
            self.strobe(0b1101)            # TX

        while GPIO.input(self.pins['io2']) == GPIO.HIGH:
            pass

    def blocking_receive(self):
        """Wait for one packet to be received and return its payload.

        Strobes RX, busy-waits while the io2 (WTR) pin reads high, then reads
        the mode register and the FIFO.

        Returns:
            ``self.packet_length`` bytes read from the FIFO.

        Raises:
            RxError: If the mode register flags a CRC (bit 5) or FEC (bit 6)
                error.
        """
        self.strobe(0b1100)  # RX

        while GPIO.input(self.pins['io2']) == GPIO.HIGH:
            pass

        if self.mode == 'spi':
            # Send the whole RX sequence at once to avoid long turnarounds when calling the SPI driver.
            # Especially due to this SPI bug: https://github.com/doceme/py-spidev/issues/117
            cmd = bytearray()
            cmd.append(0b1111 << 4)                # Strobe: RX FIFO read reset
            cmd.append((1 << 6) | 0x00)            # Read register 0
            cmd.append(0)                          # Dummy byte for reading register 0
            cmd.append((1 << 6) | 0x05)            # Read message register
            cmd.extend(bytes(self.packet_length))  # Dummy bytes for reading message
            ret = bytes(self.spi.xfer(cmd))

            mode_reg = ret[2]
            if mode_reg & 0b00100000:
                raise RxError('CRC error on receive')
            if mode_reg & 0b01000000:
                raise RxError('FEC error on receive')

            # The payload follows the four command/status bytes.
            return ret[4:]

        mode_reg = ord(self.read_reg(0x00))
        if mode_reg & 0b00100000:
            raise RxError('CRC error on receive')
        if mode_reg & 0b01000000:
            raise RxError('FEC error on receive')

        self.strobe(0b1111)  # RX FIFO read reset
        return self.read_reg(0x05, len=self.packet_length)
if __name__ == "__main__":
    import argparse

    def tx_example():
        """Send the current timestamp as a length-prefixed packet twice a second."""
        from datetime import datetime

        # Bit-banged wiring used by the transmitter demo (BCM numbering).
        tx_pins = {'cs': 10, 'ck': 9, 'da': 11, 'io1': 5, 'io2': 6}
        radio = A7106(pins=tx_pins)

        while True:
            data = datetime.now().isoformat().encode('utf-8')

            # First byte carries the data length, the data follows.
            payload = bytearray([len(data)])
            payload.extend(data)

            print('sending packet, data_length={} data={}'.format(len(data),data))
            radio.transmit(payload)
            time.sleep(0.5)

    def rx_example():
        """Receive length-prefixed packets forever and print their contents."""
        radio = A7106()

        while True:
            try:
                payload = radio.blocking_receive()
            except RxError as e:
                print(e)
            else:
                # First byte is the data length, the data follows.
                length = int(payload[0])
                data = payload[1:(length + 1)]
                print('got packet, data_length={} data={}'.format(len(data), data))

    parser = argparse.ArgumentParser(description='A7106 radio demonstrator')
    parser.add_argument('--mode', help='Radio mode: tx or rx')
    args = parser.parse_args()

    # Map each supported mode to its demo loop.
    examples = {'tx': tx_example, 'rx': rx_example}
    example = examples.get(args.mode)
    if example is None:
        print('invalid mode, exiting')
    else:
        example()