forked from Pbartek/pyobd-pi
-
Notifications
You must be signed in to change notification settings - Fork 391
/
Copy pathasynchronous.py
226 lines (189 loc) · 8.86 KB
/
asynchronous.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# -*- coding: utf-8 -*-
########################################################################
# #
# python-OBD: A python OBD-II serial module derived from pyobd #
# #
# Copyright 2004 Donour Sizemore ([email protected]) #
# Copyright 2009 Secons Ltd. (www.obdtester.com) #
# Copyright 2009 Peter J. Creath #
# Copyright 2016 Brendan Whitfield (brendan-w.com) #
# #
########################################################################
# #
# async.py #
# #
# This file is part of python-OBD (a derivative of pyOBD) #
# #
# python-OBD is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 2 of the License, or #
# (at your option) any later version. #
# #
# python-OBD is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with python-OBD. If not, see <http://www.gnu.org/licenses/>. #
# #
########################################################################
import time
import threading
import logging
from .OBDResponse import OBDResponse
from .obd import OBD
logger = logging.getLogger(__name__)
class Async(OBD):
"""
Class representing an OBD-II connection with it's assorted commands/sensors
Specialized for asynchronous value reporting.
"""
def __init__(self, portstr=None, baudrate=None, protocol=None, fast=True,
timeout=0.1, check_voltage=True, start_low_power=False,
delay_cmds=0.25):
self.__thread = None
super(Async, self).__init__(portstr, baudrate, protocol, fast,
timeout, check_voltage, start_low_power)
self.__commands = {} # key = OBDCommand, value = Response
self.__callbacks = {} # key = OBDCommand, value = list of Functions
self.__running = False
self.__was_running = False # used with __enter__() and __exit__()
self.__delay_cmds = delay_cmds
@property
def running(self):
return self.__running
def start(self):
""" Starts the async update loop """
if not self.is_connected():
logger.info("Async thread not started because no connection was made")
return
if len(self.__commands) == 0:
logger.info("Async thread not started because no commands were registered")
return
if self.__thread is None:
logger.info("Starting async thread")
self.__running = True
self.__thread = threading.Thread(target=self.run)
self.__thread.daemon = True
self.__thread.start()
def stop(self):
""" Stops the async update loop """
if self.__thread is not None:
logger.info("Stopping async thread...")
self.__running = False
self.__thread.join()
self.__thread = None
logger.info("Async thread stopped")
def paused(self):
"""
A stub function for semantic purposes only
enables code such as:
with connection.paused() as was_running
...
"""
return self
def __enter__(self):
"""
pauses the async loop,
while recording the old state
"""
self.__was_running = self.__running
self.stop()
return self.__was_running
def __exit__(self, exc_type, exc_value, traceback):
"""
resumes the update loop if it was running
when __enter__ was called
"""
if not self.__running and self.__was_running:
self.start()
return False # don't suppress any exceptions
def close(self):
""" Closes the connection """
self.stop()
super(Async, self).close()
def watch(self, c, callback=None, force=False):
"""
Subscribes the given command for continuous updating. Once subscribed,
query() will return that command's latest value. Optional callbacks can
be given, which will be fired upon every new value.
"""
# the dict shouldn't be changed while the daemon thread is iterating
if self.__running:
logger.warning("Can't watch() while running, please use stop()")
else:
if not force and not self.test_cmd(c):
# self.test_cmd() will print warnings
return
# new command being watched, store the command
if c not in self.__commands:
logger.info("Watching command: %s" % str(c))
self.__commands[c] = OBDResponse() # give it an initial value
self.__callbacks[c] = [] # create an empty list
# if a callback was given, push it
if hasattr(callback, "__call__") and (callback not in self.__callbacks[c]):
logger.info("subscribing callback for command: %s" % str(c))
self.__callbacks[c].append(callback)
def unwatch(self, c, callback=None):
"""
Unsubscribes a specific command (and optionally, a specific callback)
from being updated. If no callback is specified, all callbacks for
that command are dropped.
"""
# the dict shouldn't be changed while the daemon thread is iterating
if self.__running:
logger.warning("Can't unwatch() while running, please use stop()")
else:
logger.info("Unwatching command: %s" % str(c))
if c in self.__commands:
# if a callback was specified, only remove the callback
if hasattr(callback, "__call__") and (callback in self.__callbacks[c]):
self.__callbacks[c].remove(callback)
# if no more callbacks are left, remove the command entirely
if len(self.__callbacks[c]) == 0:
self.__commands.pop(c, None)
else:
# no callback was specified, pop everything
self.__callbacks.pop(c, None)
self.__commands.pop(c, None)
def unwatch_all(self):
""" Unsubscribes all commands and callbacks from being updated """
# the dict shouldn't be changed while the daemon thread is iterating
if self.__running:
logger.warning("Can't unwatch_all() while running, please use stop()")
else:
logger.info("Unwatching all")
self.__commands = {}
self.__callbacks = {}
def query(self, c, force=False):
"""
Non-blocking query().
Only commands that have been watch()ed will return valid responses
"""
if c in self.__commands:
return self.__commands[c]
else:
return OBDResponse()
def run(self):
""" Daemon thread """
# loop until the stop signal is received
while self.__running:
if len(self.__commands) > 0:
# loop over the requested commands, send, and collect the response
for c in self.__commands:
if not self.is_connected():
logger.info("Async thread terminated because device disconnected")
self.__running = False
self.__thread = None
return
# force, since commands are checked for support in watch()
r = super(Async, self).query(c, force=True)
# store the response
self.__commands[c] = r
# fire the callbacks, if there are any
for callback in self.__callbacks[c]:
callback(r)
time.sleep(self.__delay_cmds)
else:
time.sleep(0.25) # idle