-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcsvlib.py
247 lines (209 loc) · 6.42 KB
/
csvlib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
""" This file contains common routines for handling
Comma Separated Values (CSV) files.
Functions:
1. CSV write/load:
write_csv(filename, data, headers, append=False)
load_csv(filename, load_headers=[], nsamples=None, skip=0L)
2. MISC utilities
get_headers_index(headers, load_headers):
get_csv_headers(filename):
parse_value(astring)
3. Internal routines
_get_fp(filename, mode='r')
_close_fp(fp, filename)
"""
import sys, copy
import numpy
import netlist_parser
SEPARATOR = "\t"
def write_csv(filename, data, headers, append=False):
"""Writes data in CVS format to filename.
The headers have to be ordered according to the data order.
filename: a string, the path to the file to be written.
use 'stdout' to write to stdout
data: a numpy.array instance,
variables are swept across ROWS
time samples are swept along COLUMNS
equivalently: data[variable_index, sample_number]
headers: a list of strings, the signal names
headers[i] corresponds to data[i, :]
append: boolean value. If False, the file (if it exists)
will be rewritten, otherwise it will be appended to.
Returns: None
"""
if not append:
#sys.stdout.write("Writing data in CSV format to "+filename+"... ")
#sys.stdout.flush()
fp = _get_fp(filename, mode="w")
headers = copy.copy(headers)
if not headers[0][0] == '#':
headers[0] = '#'+headers[0]
for hi in range(len(headers)):
fp.write(headers[hi])
if hi < len(headers) - 1:
fp.write(SEPARATOR)
else:
fp.write("\n")
else:
#sys.stdout.write("Appending data in CSV format to "+filename+"... ")
#sys.stdout.flush()
fp = _get_fp(filename, mode="a")
# No headers to be written!
if not data.shape[0] == len(headers):
print "(W): write_csv(): data and headers don't match. Continuing anyway."
print "DATA: " + str(data.shape) + " headers length: "+str(len(headers))
for j in range(data.shape[1]):
for i in range(len(headers)):
fp.write("{0:g}".format(data[i, j]))
if i < len(headers) - 1:
fp.write(SEPARATOR)
fp.write("\n")
_close_fp(fp, filename)
#print "done."
def write_headers(filename, headers):
"""Writes headers in CVS format to filename."""
fp = _get_fp(filename, mode="w")
headers = copy.copy(headers)
if not headers[0][0] == '#':
headers[0] = '#'+headers[0]
for hi in range(len(headers)):
fp.write(headers[hi])
if hi < len(headers) - 1:
fp.write(SEPARATOR)
else:
fp.write("\n")
def _get_fp(filename, mode="r"):
if filename == 'stdout':
if mode == 'w' or mode == 'a':
fp = sys.stdout
else:
print "(EE) Mode %s is not supported for stdout." % (mode,)
fp = None
else:
fp = open(filename, mode)
return fp
def _close_fp(fp, filename):
try:
fp.flush()
except IOError:
pass
if filename == 'stdout':
pass
else:
fp.close()
def get_headers_index(headers, load_headers):
"""Creates a list of integers. Each element in the list is the COLUMN index
of the signal according to the supplied headers.
headers: list of strings, the signal names, as returned by get_csv_headers()
Returns a list of int."""
his = []
lowcase_headers = map(str.lower, headers)
for lh in load_headers:
try:
his = his + [lowcase_headers.index(lh.lower())]
except ValueError:
print "(W): header "+lh+" not found. Skipping."
except AttributeError:
print "(W): got spurious header "+str(lh)+" (not a string). Skipping."
return his
def get_csv_headers(filename):
"""Reads the file (filename) and returns a list of the signals inside.
The order of the signals in the list corresponds to the order of the signals in the file.
filname: the path to the file to be open
Returns a list of strings.
"""
fp = _get_fp(filename, mode="r")
headers = None
line = ""
while line == "":
line = fp.readline()
line = line.strip()
if line[0] == '#':
line = line[1:]
headers = line.split(SEPARATOR)
return headers
def load_csv(filename, load_headers=[], nsamples=None, skip=0L):
"""Reads data in CVS format from filename.
Supports:
- selective signal loading,
- loading up to a certain number of samples,
- skipping to a certain line.
to allow incremental reading of big files.
filename: a string, the path to the file to be read
load_headers: a list of strings, each one being a signal
to be loaded. Empty string -> read all signals
nsamples: int/long, number of samples to be read for each
signal. If None, read all available samples.
skip: index of the first sample to be read. Default: 0
Returns:
data: numpy.array containing the data, ordered according to
the order of load_headers (or the order on file),
headers: the names of the signals read from file,
pos: long, position of the last sample read +1, referred to the
sample #0 in the file.
EOF: boolean, True if the EOF was reached."""
if filename == 'stdout':
print "Can't load data from stdout."
return None, None, None, None
fp = _get_fp(filename, mode="r")
headers = None
data = None
sample_index = 0L
line_index = 0
EOF = False
for line in fp:#.readlines():
line = line.strip()
if line == '':
continue
if line[0] == '#' and headers is None:
line = line[1:]
if line[0] == '#' and headers is not None:
continue #comment
if headers is None:
headers = line.split(SEPARATOR)
if len(load_headers):
his = get_headers_index(headers, load_headers)
else:
his = range(len(headers))
else:
line_index = line_index + 1
if line_index < skip:
continue
if data is None:
data = numpy.zeros((len(his), 1))
else:
data = numpy.concatenate((data, numpy.zeros((len(his), 1))), axis=1)
data_values = line.split(SEPARATOR)
for i in range(len(data_values)):
if his.count(i) > 0:
data[his.index(i),-1] = parse_value(data_values[i])
else:
pass
sample_index = sample_index + 1
if nsamples is not None and sample_index == nsamples:
break
_close_fp(fp, filename)
if data is None or line == '' or nsamples > sample_index:
EOF = True
else:
EOF = False
pos = skip + sample_index
headers = map(headers.__getitem__, his)
return numpy.mat(data), headers, pos, EOF
def parse_value(astring):
try:
afloat = float(astring)
success = True
except ValueError:
success = False
if not success:
try:
afloat = netlist_parser.convert_units(astring)
success = True
print "(W): SPICE formatted value found."
except ValueError:
success = False
if not success:
print "(E): Unknown data format: "+astring
raise ValueError
return afloat