-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpimp-my-gimp.py
331 lines (282 loc) · 10.3 KB
/
pimp-my-gimp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
#!/usr/bin/env python
#########
# Pimp my Gimp application
#
# May be run standalone or as a system service.
# Must be run as 'sudo' as required by neopixel library.
#########
# threading
from gevent import monkey
monkey.patch_all()
import threading
# system libraries
import sys
import os
import math
from typing import Callable
# NeoPixels
import scootpixels
# Audio effects
import scootsound
# Odometer
import scootodometer
# Detect if running on a Raspberry Pi
import raspi_detect
# IO
if raspi_detect.is_raspi:
import board
else:
class board:
pin = int
D18: pin = 0
# webserver libraries
from flask import Flask, render_template
from flask import send_from_directory # send raw file
from flask import request
from flask_socketio import SocketIO, emit
# argument parser
import argparse
# NeoPixel communication pin
PIXEL_PIN = board.D18 # GPIO 18 / pin 12
# NeoPixel total number of NeoPixels in the array
PIXEL_COUNT = 163
# NeoPixel idle color
PIXEL_COLOR_IDLE = (0, 0, 64)
# Encoder GPIO pin
ENCODER_PIN = 12 # GPIO 12 / pin 32
# Encoder counts per revolution of the wheel
ENCODER_PULSES_PER_REV = 8
# Encoder pulses per linear foot
# wheel diameter is 7.5", so circumference is pi * 7.5
ENCODER_PULSES_PER_FOOT = float(ENCODER_PULSES_PER_REV) / (math.pi * (7.5/12.0))
# Time since last encoder pulse after which the speed is assumed to be zero
ENCODER_SPEED_ZERO_THRESHOLD_S = 1
# Encoder speed smoothing coefficient (for exponential moving average)
ENCODER_SMOOTHING = 0.75
# Program cache directory for persistent data
CACHE_DIR = "cache/"
# application entrypoint
if __name__ == '__main__':
# parse arguments
parser = argparse.ArgumentParser(
prog='pimp-my-gimp.py',
description='Webserver and controller for enhanced mobility devices.',
epilog='May your journey be illuminated.')
parser.add_argument(
"--no-audio",
action="store_true",
help="disable audio output")
parser.add_argument(
"--no-odometer",
action="store_true",
help="disable odometer")
parser.add_argument(
"--no-light",
action="store_true",
help="disable LED output")
args = parser.parse_args()
audio_enabled = True
if args.no_audio:
print("Audio output disabled")
audio_enabled = False
odometer_enabled = True
if args.no_odometer:
odometer_enabled = False
print("Odometer disabled")
pixels_enabled = True
if args.no_light:
pixels_enabled = False
print("Light disabled")
# Initialize Flask app and SocketIO
app = Flask(__name__)
app.config['TEMPLATES_AUTO_RELOAD'] = True
socketio = SocketIO(app, cors_allowed_origins = "*", async_mode = "gevent")
@app.route("/")
def index():
"""
Serve the index page.
:return: Rendered index.html template.
"""
print(f"Endpoint '/': Accessed by {request.remote_addr}")
return render_template('index.html')
@app.route("/favicon.ico")
def favicon():
"""
Serve the favicon.ico file.
:return: favicon.ico from the static/images directory.
"""
print(f"Endpoint '/favicon.ico': Accessed by {request.remote_addr}")
return send_from_directory(
os.path.join(app.root_path, 'static/images'),
'favicon.ico',
mimetype='image/vnd.microsoft.icon')
@app.route("/manifest.json")
def manifest():
"""
Serve the manifest.json file.
:return: manifest.json from the static directory.
"""
print(f"Endpoint '/manifest.json': Accessed by {request.remote_addr}")
return send_from_directory(
os.path.join(app.root_path, 'static'),
'manifest.json',)
@app.route("/disco")
def disco():
"""
Handle the disco route to initiate a disco effect with sound and lights.
:return: An empty string response after the effect.
"""
print(f"Endpoint '/disco': Accessed by {request.remote_addr}")
thread = sounds.play(sounds.sound_disco)
pixels.disco(2, 0.5)
thread.join()
pixels.solid(PIXEL_COLOR_IDLE)
print("... Endpoint '/disco' complete")
return ""
@app.route("/fireplace")
def fireplace():
"""
Handle the fireplace effect route.
:return: An empty string response after the effect.
"""
print(f"Endpoint '/fireplace': Accessed by {request.remote_addr}")
thread = sounds.play(sounds.sound_fireplace)
pixels.fireplace()
thread.join()
pixels.solid(PIXEL_COLOR_IDLE)
print("... Endpoint '/fireplace' complete")
return ""
@app.route("/underlight")
def underlight():
"""
Handle the underlight route to start the underlight effect.
:return: An empty string response after the effect.
"""
print(f"Endpoint '/underlight': Accessed by {request.remote_addr}")
thread = sounds.play(sounds.sound_underlight)
pixels.underlight()
thread.join()
pixels.solid(PIXEL_COLOR_IDLE)
print("... Endpoint '/underlight' complete")
return ""
@app.route("/energyweapon")
def energyweapon():
"""
Handle the energyweaspon effect route.
:return: An empty string response after the effect.
"""
print(f"Endpoint '/energyweapon': Accessed by {request.remote_addr}")
thread = sounds.play(sounds.sound_energyweapon)
pixels.energyweapon()
thread.join()
pixels.solid(PIXEL_COLOR_IDLE)
print("... Endpoint '/energyweapon' complete")
return ""
@app.route("/meltdown")
def meltdown():
"""
Handle the meltdown route to perform the meltdown effect with flashing lights.
:return: An empty string response after the effect.
"""
print(f"Endpoint '/meltdown': Accessed by {request.remote_addr}")
for count in range(3):
thread = sounds.play(sounds.sound_meltdown)
pixels.flash((255,255,255), 2)
pixels.flash((255,0,0), 1)
thread.join()
pixels.solid(PIXEL_COLOR_IDLE)
print("... Endpoint '/meltdown' complete")
return ""
@app.route("/color")
def color():
"""
Handle the color route to set pixels to a user-specified color.
:return: An empty string response after the effect.
"""
print(f"Endpoint '/color': Accessed by {request.remote_addr}")
idle_color = "#{:02x}{:02x}{:02x}".format(*PIXEL_COLOR_IDLE)
hex_color = request.args.get('rgb', default=idle_color, type=str)
# Check if the hex color starts with '#', and remove it
if hex_color.startswith('#'):
hex_color = hex_color[1:]
# Check if the remaining string has a length of 6
if len(hex_color) != 6:
print("Error: Invalid hex color length. Must be 6 characters long.")
return
# Check if all characters are valid hexadecimal digits
if not all(c in '0123456789abcdefABCDEF' for c in hex_color):
print("Error: Invalid hex color. Contains non-hexadecimal characters.")
print("Color selected: " + hex_color)
# Convert the characters from hex to integers
r = int(hex_color[0:2], 16)
g = int(hex_color[2:4], 16)
b = int(hex_color[4:6], 16)
pixels.solid((r,g,b))
print("... Endpoint '/color' complete")
return ""
@app.route("/lights-out")
def lights_out():
"""
Handle the lights-out route to turn off all lights.
:return: An empty string response after turning off the lights.
"""
print(f"Endpoint '/lights-out': Accessed by {request.remote_addr}")
sounds.play(sounds.sound_lights_out).join()
pixels.off()
print("... Endpoint '/lights-out' complete")
return ""
@socketio.on('connect', namespace='/trajectory')
def trajectory_connect():
"""
Handle websocket connection for the /trajectory namespace.
Logs the IP address of the client that made the connection.
:return: None.
"""
client_ip = request.remote_addr # Gets the client's IP address
print(f"WebSocket client connected from {client_ip}: /trajectory")
print("Reading odometer cache.")
odometer_cache = scootodometer.ScootOdometerCache(CACHE_DIR + "odometer.ini")
print("... read last known position " + str(odometer_cache.get_distance()))
print("Initializing pixels")
pixels = scootpixels.ScootPixels(PIXEL_PIN, PIXEL_COUNT, pixels_enabled)
pixels.tricolor()
pixels.solid(PIXEL_COLOR_IDLE)
print("... pixels initialized")
print("Initializing odometer")
odometer = scootodometer.ScootOdometer(ENCODER_PIN,
ENCODER_SMOOTHING,
ENCODER_SPEED_ZERO_THRESHOLD_S,
odometer_cache.get_distance(),
odometer_enabled)
# WebSocket emit on encoder pulses
odometer.register_callback(lambda timestamp, position, speed, socketio = socketio :
socketio.emit('newdata', {
'timestamp': math.ceil(timestamp * 1000),
'position': position / ENCODER_PULSES_PER_FOOT,
'speed': speed / ENCODER_PULSES_PER_FOOT},
namespace='/trajectory')
)
# Update persistent data on encoder pulses
odometer.register_callback(lambda timestamp, position, speed, odometer_cache = odometer_cache:
# Write cache every 100 pulses
(position - odometer_cache.get_distance() > 100) and odometer_cache.set_distance(timestamp, position, speed)
)
print("... odometer initialized")
print("Initializing sounds")
sounds = scootsound.ScootSound(audio_enabled)
sounds.import_from_disk()
print("... sounds initialized")
try:
print("Starting Flask server")
socketio.run(app,
host = "0.0.0.0",
port = 80)
except KeyboardInterrupt:
print("Flask server terminated.")
finally:
odometer.deinit()
odometer_cache.deinit()
pixels.solid()
pixels.deinit()
print("Application terminated")
sys.exit(0)