forked from analogdevicesinc/pyadi-iio
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add adistream CLI tool to support data generation and streaming for tx devices like DACs. Update doc to include new tools optional dependency. Signed-off-by: SGudla <[email protected]>
- Loading branch information
1 parent
8bcfac1
commit cc61031
Showing
6 changed files
with
310 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
include LICENSE | ||
include README.md | ||
include adi/* | ||
include adi/tools/* | ||
|
||
exclude setup.cfg | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,293 @@ | ||
import argparse | ||
from time import sleep | ||
|
||
import adi | ||
import numpy as np | ||
from genalyzer import WaveformGen | ||
from pynput import keyboard | ||
|
||
|
||
class ADIStream(object): | ||
def __init__( | ||
self, | ||
classname, | ||
uri, | ||
v_ref_n, | ||
v_ref_p, | ||
npts, | ||
chn_list, | ||
v_req_l, | ||
v_req_u, | ||
out_freq, | ||
code_sel, | ||
wave_list, | ||
device_name, | ||
): | ||
self.duty_cyc = None | ||
self.sampling_freq = None | ||
self.data = [] | ||
self.line = 0 | ||
self.data_stream_abort = False | ||
self.classname = classname | ||
self.stream = eval( | ||
"adi." | ||
+ classname | ||
+ "(uri='" | ||
+ uri | ||
+ "', device_name='" | ||
+ device_name | ||
+ "')" | ||
) | ||
self.stream.tx_cyclic_buffer = True | ||
self.wave_list = wave_list | ||
|
||
self.chan_list = [] | ||
for val in chn_list: | ||
self.chan_list += ["voltage" + str(val)] | ||
self.stream.tx_enabled_channels = self.chan_list | ||
self.chn_cnt = len(chn_list) | ||
|
||
# get resolution | ||
self.resolution = self.stream.output_bits[0] | ||
if self.resolution > 16: | ||
self.stream._tx_data_type = np.int32 | ||
elif self.resolution > 8: | ||
self.stream._tx_data_type = np.int16 | ||
else: | ||
self.stream._tx_data_type = np.int8 | ||
|
||
self.npts = npts | ||
self.v_ref_n = v_ref_n | ||
self.v_ref_p = v_ref_p | ||
self.v_req_l = v_req_l | ||
self.v_req_u = v_req_u | ||
self.out_freq = out_freq | ||
self.code_sel = code_sel | ||
|
||
def write_buffered_data(self): | ||
|
||
# Send data from device | ||
self.stream.tx(self.data) | ||
|
||
if self.line == 0: | ||
print("Data streaming started >>") | ||
|
||
def get_data(self): | ||
|
||
try: | ||
self.sampling_freq = int(self.stream.sampling_frequency) | ||
except AttributeError: | ||
print( | ||
f"The device {self.classname} has no attribute named " | ||
"sampling_frequency" | ||
"" | ||
) | ||
|
||
if self.out_freq is None: | ||
self.out_freq = self.sampling_freq / self.npts | ||
|
||
if self.v_req_l is None: | ||
self.v_req_l = self.v_ref_n | ||
|
||
if self.v_req_u is None: | ||
self.v_req_u = self.v_ref_p | ||
|
||
if self.out_freq > self.sampling_freq / self.npts: | ||
print( | ||
f"The nearest possible output frequency that can be generated " | ||
f"with the current config is {self.sampling_freq / self.npts} Hz" | ||
) | ||
self.out_freq = self.sampling_freq / self.npts | ||
val = input("\nPress y to continue, any other key to abort: ") | ||
if val.lower() != "y": | ||
raise Exception("Recheck the configuration and try again..!") | ||
elif self.out_freq < self.sampling_freq / self.npts: | ||
self.stream.sampling_frequency = self.out_freq * self.npts | ||
print( | ||
"Actual generated output frequency: ", | ||
str(int(self.sampling_freq) // self.npts), | ||
) | ||
|
||
gen_obj = WaveformGen( | ||
self.npts, | ||
self.out_freq, | ||
self.code_sel, | ||
self.resolution, | ||
self.v_ref_n, | ||
self.v_ref_p, | ||
self.v_req_l, | ||
self.v_req_u, | ||
) | ||
|
||
if "pwm" in self.wave_list: | ||
self.duty_cyc = str( | ||
float(input("Enter duty cycle in percent: Eg. 25 for 25% duty-cycle: ")) | ||
/ 100.0 | ||
) | ||
|
||
# get data using the genalyzer apis | ||
for val in range(self.chn_cnt): | ||
if self.chn_cnt == 1: | ||
if self.wave_list[val] == "pwm": | ||
self.data = eval( | ||
"gen_obj.gen_pwm_wave(duty_cycle = " + self.duty_cyc + ")" | ||
) | ||
else: | ||
self.data = eval("gen_obj.gen_" + self.wave_list[val] + "_wave()") | ||
else: | ||
if self.wave_list[val] == "pwm": | ||
self.data.append( | ||
eval("gen_obj.gen_pwm_wave(duty_cycle = " + self.duty_cyc + ")") | ||
) | ||
else: | ||
self.data.append( | ||
eval("gen_obj.gen_" + self.wave_list[val] + "_wave()") | ||
) | ||
|
||
self.data = np.array(self.data) | ||
|
||
def key_press_event(self, key): | ||
if key == keyboard.Key.delete: | ||
self.data_stream_abort = True | ||
|
||
def do_data_streaming(self): | ||
self.get_data() | ||
|
||
listener = keyboard.Listener(on_press=self.key_press_event) | ||
listener.start() | ||
|
||
print("Press " "delete" " key to stop cyclic data streaming..") | ||
sleep(2) | ||
|
||
self.write_buffered_data() | ||
|
||
while not self.data_stream_abort: | ||
# This should halt the control for the cyclic mode to work. | ||
print("." * self.line, end="\r") | ||
|
||
self.line = self.line + 1 | ||
if self.line == 100: | ||
self.line = 1 | ||
print("\n", end="\r") | ||
|
||
self.stream.tx_destroy_buffer() | ||
print("\r\nData streaming finished\r\n") | ||
|
||
|
||
def run_adi_stream(): | ||
parser = argparse.ArgumentParser( | ||
description="ADI data streaming app", | ||
formatter_class=argparse.RawTextHelpFormatter, | ||
) | ||
parser.add_argument( | ||
"class", help="pyadi class name to stream data to", action="store" | ||
) | ||
parser.add_argument("uri", help="URI of target device", action="store") | ||
parser.add_argument( | ||
"neg_voltage_ref", | ||
help="Negative reference voltage of DAC in volts. \nInput 0 for unipolar DACs", | ||
action="store", | ||
type=float, | ||
default=0, | ||
) | ||
parser.add_argument( | ||
"pos_voltage_ref", | ||
help="Positive reference voltage of DAC in volts.", | ||
action="store", | ||
type=float, | ||
) | ||
|
||
parser.add_argument( | ||
"-d", | ||
"--device_name", | ||
help="Part name if the class supports more than 1 generic", | ||
action="store", | ||
default="", | ||
) | ||
parser.add_argument( | ||
"-n", | ||
"--data_points_per_wave", | ||
help="Number of data points required per wave\n" | ||
"Default value is 50 samples per wave", | ||
action="store", | ||
default=50, | ||
type=int, | ||
) | ||
parser.add_argument( | ||
"-cl", | ||
"--chn_list", | ||
help="Channels list to stream data to\nE.g. --chn_list 0 2 3 to stream data to channels 0, 2, 3\n" | ||
"Default is channel 0", | ||
nargs="+", | ||
action="store", | ||
default=[0], | ||
type=int, | ||
) | ||
parser.add_argument( | ||
"-vl", | ||
"--v_lower_req", | ||
help="Lower end Voltage required in volts. Should be in the accepted FSR.", | ||
action="store", | ||
type=float, | ||
) | ||
parser.add_argument( | ||
"-vu", | ||
"--v_upper_req", | ||
help="Upper end Voltage required in volts. Should be in the accepted FSR.", | ||
action="store", | ||
type=float, | ||
) | ||
parser.add_argument( | ||
"-f", | ||
"--output_freq", | ||
help="Output frequency required.\nNote: The effective output frequency per " | ||
"channel will be the output frequency required divided by the number of" | ||
" active channels enabled.\n" | ||
"Default will be the maximum sampling frequency supported by the app.", | ||
action="store", | ||
type=int, | ||
) | ||
parser.add_argument( | ||
"-c", | ||
"--code_sel", | ||
help="code data format to stream data in. \nAccepted:\n\t0 for binary offset" | ||
"\n\t1 for 2s-complement.\nDefault is 0 (binary offset)", | ||
action="store", | ||
default=0, | ||
type=int, | ||
choices=[0, 1], | ||
) | ||
parser.add_argument( | ||
"-w", | ||
"--wave_types", | ||
help="list of waveform type required. " | ||
"\nAccepted: \n\tsine" | ||
"\n\tcosine " | ||
"\n\ttriangular " | ||
"\n\tsquare " | ||
"\n\tpwm" | ||
"\nDefault is sine" | ||
"\nE.g. --wave_types sine square triangular ", | ||
nargs="+", | ||
action="store", | ||
default=["sine"], | ||
choices=["sine", "cosine", "triangular", "square", "pwm"], | ||
) | ||
|
||
args = vars(parser.parse_args()) | ||
|
||
app = ADIStream( | ||
args["class"], | ||
args["uri"], | ||
args["neg_voltage_ref"], | ||
args["pos_voltage_ref"], | ||
args["data_points_per_wave"], | ||
args["chn_list"], | ||
args["v_lower_req"], | ||
args["v_upper_req"], | ||
args["output_freq"], | ||
args["code_sel"], | ||
args["wave_types"], | ||
args["device_name"], | ||
) | ||
app.do_data_streaming() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters