-
Notifications
You must be signed in to change notification settings - Fork 1
/
readdatagrabber.py
239 lines (218 loc) · 11.3 KB
/
readdatagrabber.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
#! /usr/bin/env python
"""Module to manipulate DataGrabber files.
This module provides functions to open and read DataGrabber files.
Data are stored as DataGrabberPoint objects, which are themselves filled
with DataGrabberChannel objects. This will provide a useful mapping for
conversion of DataGrabber files to another format.
Alan Kastengren, XSD, Argonne National Laboratory
Started: November 12, 2012
Credit to Daniel Duke for his pyDataGrabber.py code, which provided
useful ideas for this code.
Edits:
May 5, 2014: Add check that DataGrabber file exists.
February 24, 2015: Add function to display an excerpt of a channel.
December 13, 2016: Dan Duke added regex to line 87 to correctly parse number-as-string PVs which have leading whitespace.
January 24, 2017: Add the fread_channel_meta_data method to DataGrabberCoordinate
"""
import numpy as np
import os
import matplotlib.pyplot as plt
import re
import logging
# Set up logging
logger = logging.getLogger('readdatagrabber')
logger.addHandler(logging.NullHandler())
#Dictionary for converting datatype to number of bytes
bytes_per_point = {"byte":1, "short":2, "int":4, "float":4, "long":8, "double":8}
def fread_headers(filename,record_length_key="RecordLength",
data_type_key="BinaryDataType",
channel_num_key="NumberOfChannels",
coord_start="FileType=DataGrabberBinary",
chan_start="Channel"):
#Save a new copy of the filename, which will be useful later.
#Instantiate a class to hold the header info
file_data = []
#Throw an IOError if the file doesn't exist
if not os.path.isfile(filename):
logger.error("No such file exists: " + filename)
raise IOError
"""Loop through the DataGrabber file, saving headers."""
with open(filename,'rb') as dg_file:
#Read in a potential header. Trap possible extra \n
line = "a"
while line!="":
line = dg_file.readline().decode('ascii')
print(line)
#If this is a good coordinate header,
if line.startswith(coord_start):
#Instantiate a DataGrabberCoordinate instance for this coordinate.
current_coord = DataGrabberCoordinate(filename)
file_data.append(current_coord)
#Save it as a dictionary. Remove last two characters, which are /r/n.
current_coord.coordinate_header = fparse_header_line(line[:-2])
#Figure out how many channels there are.
num_channels = int(current_coord.coordinate_header[channel_num_key])
#For each channel:
for i in range(num_channels):
#Read in potential header.
line = dg_file.readline().decode('ascii')
while not(line.startswith(chan_start)):
#If we reach the end of the file here, this is a serious problem.
if line=="":
logger.error("Lost track of header for channel", i, ",\
coordinate #", len(file_data))
return file_data
#Otherwise, read another line
line = dg_file.readline().decode('ascii')
#If header is good, instantiate a new DataGrabberChannel object.
current_channel = DataGrabberChannel()
#Save filename for use in getting data in future.
current_channel.filename = filename
current_coord.channels.append(current_channel)
#Parse the header line into a dictionary. Remove last two characters, which are /r/n.
current_channel.channel_header = fparse_header_line(line[:-2])
#Add a file pointer to the dictionary.
current_channel.channel_header["FPointer"] = dg_file.tell()
# print dg_file.tell()
#Figure out how many bytes to skip.
num_bytes_per_point = bytes_per_point[current_channel.channel_header[data_type_key]]
num_points = int(current_channel.channel_header[record_length_key])
#Skip the appropriate number of bytes.
dg_file.seek(num_points * num_bytes_per_point,1)
logger.info("Found ", len(file_data)," positions.")
return file_data
def fparse_header_line(header,pair_delimit=" ",split_delimit="="):
"""Parses a DataGrabber header line into a dictionary"""
split_header = re.sub('= +','=',header).split(pair_delimit)
output = {}
for pair in split_header:
key_value = pair.split(split_delimit)
#If we have an invalid split (e.g. empty string), just skip
if len(key_value) <2: continue
output[key_value[0]] = key_value[1]
return output
def fprint_headers(filename,record_length_key="RecordLength",
data_type_key="BinaryDataType",
channel_num_key="NumberOfChannels",
coord_start="FileType=DataGrabberBinary",
chan_start="Channel"):
'''Prints the headers to the console.
'''
#Throw an IOError if the file doesn't exist
if not os.path.isfile(filename):
logger.error("No such file exists: " + filename)
raise IOError
"""Loop through the DataGrabber file, saving headers."""
with open(filename,'rb') as dg_file:
#Read in a potential header. Trap possible extra \n
line = "a"
while line!="":
line = dg_file.readline().decode('ascii')
#If this is a good coordinate header,
if line.startswith(coord_start):
#Print this coordinate header
print(line)
#Parse header into a dictionary. Remove last two characters, which are /r/n.
coordinate_header = fparse_header_line(line[:-2])
#Figure out how many channels there are.
num_channels = int(coordinate_header[channel_num_key])
#For each channel:
for i in range(num_channels):
#Read in potential header.
line = dg_file.readline().decode('ascii')
while not(line.startswith(chan_start)):
#If we reach the end of the file here, this is a serious problem.
if line=="":
logger.error("Lost track of header for channel", i)
#Otherwise, read another line
line = dg_file.readline().decode('ascii')
#Print the channel header to the console
print("--- " + line)
#Parse the header line into a dictionary. Remove last two characters, which are /r/n.
meta_data = fparse_header_line(line[:-2])
#Figure out how many bytes to skip.
num_bytes_per_point = bytes_per_point[meta_data[data_type_key]]
num_points = int(meta_data[record_length_key])
#Skip the appropriate number of bytes.
dg_file.seek(num_points * num_bytes_per_point,1)
class DataGrabberChannel():
"""Holder for channel headers and actual data from DataGrabber channel"""
def __init__(self):
self.channel_header = {}
self.data = None
self.filename = None
self.dtype_dict = {"byte":"b", "short":"h", "int":"i", "float":"f", "long":"l", "double":"d"}
def fread_data(self,normalize=False,endian="big",data_type_key="BinaryDataType",record_length_key="RecordLength"):
"""Read in the binary data from a DataGrabber file channel."""
#Come up with the appropriate dtype for the np.fromfile option
data_dtype = np.dtype('byte')
if endian=="little":
dtype_string = "<"+self.dtype_dict[self.channel_header[data_type_key]]
#print dtype_string
data_dtype = np.dtype(dtype_string)
else:
data_dtype = np.dtype(">"+self.dtype_dict[self.channel_header[data_type_key]])
#Use the np.fromfile function to read in data
with open(self.filename,'rb') as dg_file:
dg_file.seek(int(self.channel_header["FPointer"]),0)
self.data = np.fromfile(dg_file,dtype=data_dtype,count=int(self.channel_header[record_length_key]))
def fread_data_volts(self,normalize=False,endian="big",data_type_key="BinaryDataType",record_length_key="RecordLength",
scale_key='Scale',offset_key='Offset'):
'''Read in the binary data from a DataGrabber file channel, but in volts.
'''
#Use previous method to read in the data array
self.fread_data(normalize,endian,data_type_key,record_length_key)
scale_value = float(self.channel_header[scale_key])
offset_value = float(self.channel_header[offset_key])
self.data = scale_value * self.data + offset_value
class DataGrabberCoordinate():
"""Holder for header and channels from DataGrabber coordinate (point)"""
def __init__(self,filename):
self.coordinate_header = {}
self.channels = []
self.filename = filename
def fread_coord_data(self):
"""Read in the data for all channels of this position."""
for channel in self.channels:
channel.fread_data()
def fread_channel_meta_data(self,descriptor,key):
for channel in self.channels:
if channel.channel_header['UserDescription'] == descriptor:
return channel.channel_header[key]
def fchannel_by_name(self,descriptor):
'''Returns the channel with the desired UserDescription.
Inputs:
descriptor: text to match to value of UserDescription key.
Outputs:
readdatagrabber.DataGrabberChannel object.
'''
for channel in self.channels:
if channel.channel_header['UserDescription'] == descriptor:
return channel
else:
logger.error("Error in fchannel_by_name: channel name not found: " + descriptor)
return None
def fdisplay_excerpt(self,descriptor="PINDiode",end=False,num_points=1e4):
"""Module to display first part of a channel.
Inputs
descriptor: UserDescription for the desired channel.
end: if True, display last points, otherwise display beginning of trace
num_points: number of points to display
"""
#Pick the first data point, get channel with descriptor in it
for channel in self.channels:
if channel.channel_header["UserDescription"]==descriptor:
channel.fread_data()
#Only plot first 10000 pointscoord_object.channels[descriptor][
if channel.data.size>num_points:
if end:
x = np.arange(len(channel.data))
plt.plot(x[-num_points:],channel.data[-num_points:],'b.-')
else:
plt.plot(channel.data[:num_points],'b.-')
else:
plt.plot(channel.data,'b.-')
plt.show()
#Clear out the data to save memory
channel.data = None
return