forked from Pbartek/pyobd-pi
-
Notifications
You must be signed in to change notification settings - Fork 391
/
Copy pathobd.py
315 lines (258 loc) · 12 KB
/
obd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# -*- coding: utf-8 -*-
########################################################################
# #
# python-OBD: A python OBD-II serial module derived from pyobd #
# #
# Copyright 2004 Donour Sizemore ([email protected]) #
# Copyright 2009 Secons Ltd. (www.obdtester.com) #
# Copyright 2009 Peter J. Creath #
# Copyright 2016 Brendan Whitfield (brendan-w.com) #
# #
########################################################################
# #
# obd.py #
# #
# This file is part of python-OBD (a derivative of pyOBD) #
# #
# python-OBD is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 2 of the License, or #
# (at your option) any later version. #
# #
# python-OBD is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with python-OBD. If not, see <http://www.gnu.org/licenses/>. #
# #
########################################################################
import logging
from .OBDResponse import OBDResponse
from .__version__ import __version__
from .commands import commands
from .elm327 import ELM327
from .protocols import ECU_HEADER
from .utils import scan_serial, OBDStatus
logger = logging.getLogger(__name__)
class OBD(object):
"""
Class representing an OBD-II connection
with it's assorted commands/sensors.
"""
def __init__(self, portstr=None, baudrate=None, protocol=None, fast=True,
timeout=0.1, check_voltage=True, start_low_power=False):
self.interface = None
self.supported_commands = set(commands.base_commands())
self.fast = fast # global switch for disabling optimizations
self.timeout = timeout
self.__last_command = b"" # used for running the previous command with a CR
self.__last_header = ECU_HEADER.ENGINE # for comparing with the previously used header
self.__frame_counts = {} # keeps track of the number of return frames for each command
logger.info("======================= python-OBD (v%s) =======================" % __version__)
self.__connect(portstr, baudrate, protocol,
check_voltage, start_low_power) # initialize by connecting and loading sensors
self.__load_commands() # try to load the car's supported commands
logger.info("===================================================================")
def __connect(self, portstr, baudrate, protocol, check_voltage,
start_low_power):
"""
Attempts to instantiate an ELM327 connection object.
"""
if portstr is None:
logger.info("Using scan_serial to select port")
port_names = scan_serial()
logger.info("Available ports: " + str(port_names))
if not port_names:
logger.warning("No OBD-II adapters found")
return
for port in port_names:
logger.info("Attempting to use port: " + str(port))
self.interface = ELM327(port, baudrate, protocol,
self.timeout, check_voltage,
start_low_power)
if self.interface.status() >= OBDStatus.ELM_CONNECTED:
break # success! stop searching for serial
else:
logger.info("Explicit port defined")
self.interface = ELM327(portstr, baudrate, protocol,
self.timeout, check_voltage,
start_low_power)
# if the connection failed, close it
if self.interface.status() == OBDStatus.NOT_CONNECTED:
# the ELM327 class will report its own errors
self.close()
def __load_commands(self):
"""
Queries for available PIDs, sets their support status,
and compiles a list of command objects.
"""
if self.status() != OBDStatus.CAR_CONNECTED:
logger.warning("Cannot load commands: No connection to car")
return
logger.info("querying for supported commands")
pid_getters = commands.pid_getters()
for get in pid_getters:
# PID listing commands should sequentially become supported
# Mode 1 PID 0 is assumed to always be supported
if not self.test_cmd(get, warn=False):
continue
# when querying, only use the blocking OBD.query()
# prevents problems when query is redefined in a subclass (like Async)
response = OBD.query(self, get)
if response.is_null():
logger.info("No valid data for PID listing command: %s" % get)
continue
# loop through PIDs bit-array
for i, bit in enumerate(response.value):
if bit:
mode = get.mode
pid = get.pid + i + 1
if commands.has_pid(mode, pid):
self.supported_commands.add(commands[mode][pid])
# set support for mode 2 commands
if mode == 1 and commands.has_pid(2, pid):
self.supported_commands.add(commands[2][pid])
logger.info("finished querying with %d commands supported" % len(self.supported_commands))
def __set_header(self, header):
if header == self.__last_header:
return
r = self.interface.send_and_parse(b'AT SH ' + header + b' ')
if not r:
logger.info("Set Header ('AT SH %s') did not return data", header)
return OBDResponse()
if "\n".join([m.raw() for m in r]) != "OK":
logger.info("Set Header ('AT SH %s') did not return 'OK'", header)
return OBDResponse()
self.__last_header = header
def close(self):
"""
Closes the connection, and clears supported_commands
"""
self.supported_commands = set()
if self.interface is not None:
logger.info("Closing connection")
self.__set_header(ECU_HEADER.ENGINE)
self.interface.close()
self.interface = None
def status(self):
""" returns the OBD connection status """
if self.interface is None:
return OBDStatus.NOT_CONNECTED
else:
return self.interface.status()
def low_power(self):
""" Enter low power mode """
if self.interface is None:
return OBDStatus.NOT_CONNECTED
else:
return self.interface.low_power()
def normal_power(self):
""" Exit low power mode """
if self.interface is None:
return OBDStatus.NOT_CONNECTED
else:
return self.interface.normal_power()
# not sure how useful this would be
# def ecus(self):
# """ returns a list of ECUs in the vehicle """
# if self.interface is None:
# return []
# else:
# return self.interface.ecus()
def protocol_name(self):
""" returns the name of the protocol being used by the ELM327 """
if self.interface is None:
return ""
else:
return self.interface.protocol_name()
def protocol_id(self):
""" returns the ID of the protocol being used by the ELM327 """
if self.interface is None:
return ""
else:
return self.interface.protocol_id()
def port_name(self):
""" Returns the name of the currently connected port """
if self.interface is not None:
return self.interface.port_name()
else:
return ""
def is_connected(self):
"""
Returns a boolean for whether a connection with the car was made.
Note: this function returns False when:
obd.status = OBDStatus.ELM_CONNECTED
"""
return self.status() == OBDStatus.CAR_CONNECTED
def print_commands(self):
"""
Utility function meant for working in interactive mode.
Prints all commands supported by the car.
"""
for c in self.supported_commands:
print(str(c))
def supports(self, cmd):
"""
Returns a boolean for whether the given command
is supported by the car
"""
return cmd in self.supported_commands
def test_cmd(self, cmd, warn=True):
"""
Returns a boolean for whether a command will
be sent without using force=True.
"""
# test if the command is supported
if not self.supports(cmd):
if warn:
logger.warning("'%s' is not supported" % str(cmd))
return False
# mode 06 is only implemented for the CAN protocols
if cmd.mode == 6 and self.interface.protocol_id() not in ["6", "7", "8", "9"]:
if warn:
logger.warning("Mode 06 commands are only supported over CAN protocols")
return False
return True
def query(self, cmd, force=False):
"""
primary API function. Sends commands to the car, and
protects against sending unsupported commands.
"""
if self.status() == OBDStatus.NOT_CONNECTED:
logger.warning("Query failed, no connection available")
return OBDResponse()
# if the user forces, skip all checks
if not force and not self.test_cmd(cmd):
return OBDResponse()
self.__set_header(cmd.header)
logger.info("Sending command: %s" % str(cmd))
cmd_string = self.__build_command_string(cmd)
messages = self.interface.send_and_parse(cmd_string)
# if we're sending a new command, note it
# first check that the current command WASN'T sent as an empty CR
# (CR is added by the ELM327 class)
if cmd_string:
self.__last_command = cmd_string
# if we don't already know how many frames this command returns,
# log it, so we can specify it next time
if cmd not in self.__frame_counts:
self.__frame_counts[cmd] = sum([len(m.frames) for m in messages])
if not messages:
logger.info("No valid OBD Messages returned")
return OBDResponse()
return cmd(messages) # compute a response object
def __build_command_string(self, cmd):
""" assembles the appropriate command string """
cmd_string = cmd.command
# if we know the number of frames that this command returns,
# only wait for exactly that number. This avoids some harsh
# timeouts from the ELM, thus speeding up queries.
if self.fast and cmd.fast and (cmd in self.__frame_counts):
cmd_string += str(self.__frame_counts[cmd]).encode()
# if we sent this last time, just send a CR
# (CR is added by the ELM327 class)
if self.fast and (cmd_string == self.__last_command):
cmd_string = b""
return cmd_string