Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pull request for Alicat Python Library #15

Merged
merged 19 commits into from
Jan 5, 2022
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,48 @@ You can also set the gas type and flow rate / pressure.

```python
flow_controller.set_gas('N2')
flow_controller.set_gas(8) # Optionally set a gas by it's number; find the full gas table on page 52 of the Alicat manual.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove apostrophe - "its" - and move out from a code comment to the README itself. Something like "gases can be set by either their name or their index number" before this code block.

flow_controller.set_flow_rate(1.0)
flow_controller.set_pressure(20)
```

For firmwave 5v and greater, create and set gas mixes using COMPOSER software loaded into the device. Mixes can contain up to five gases, and are stored in gas indices 236-255.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ware, not wave.

Is there a way to read the firmware version through the serial commands? We could use this on init to set features and raise errors if someone tries to use an unsupported function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, you already do this and it's great.


```python
flow_controller.create_mix(mix_no=236, name="Mix1", gas1="N2", percent1=50, gas2="Ar", percent2=50)
flow_controller.set_gas(236)
flow_controller.delete_mix(236)
```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really cool!!


Additional features include override commands to increase device functionality.

```python
flow_controller.lock() # Lock the front display.
flow_controller.unlock() # Unlock the front display.
flow_controller.hold() # Hold the valve in its current position.
flow_controller.cancel_hold() # Cancel the valve hold.
flow_controller.tare_volumetric() # Tare volumetric hold.
flow_controller.tare_pressure() # Tare pressure.
flow_controller.reset_tot() # Reset totalizer, if totalizer functionality included.
```

For flow controllers, read and write PID loop settings for device tuning.

```python
flow_controller.write_PID_looptype("PD2I")
flow_controller.write_PID_P(4000)
flow_controller.write_PID_D(10)
flow_controller.write_PID_I(4000)
print(flow_controller.read_PID())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also cool! I've been meaning to add PID tweaking to this code since day 1, so it's great to see it here.


>>>{
'loop_type': 'PD2I',
'P': '4000',
'D': '10',
'I': '4000'
}
```

### Alicat Addressing

You can have multiple controllers on the same port by using Alicat's `A`-`D` addresses
Expand Down
13 changes: 12 additions & 1 deletion alicat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def command_line():
"to read devices routed through a converter.")
parser.add_argument('--address', '-a', default='A', type=str, help="The "
"device address, A-D. Should only be used if multiple "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was there all along, but might as well fix it now. I believe they go A-Z not A-D

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup it is A-Z! I changed it.

"flow controllers are connected to one port.")
"flow controllers are connected to one port or if"
"device ID is not A.")
parser.add_argument('--set-gas', '-g', default=None, type=str,
help="Sets the gas type. Supported gas types are: "
"'Air', 'Ar', 'CH4', 'CO', 'CO2', 'C2H6', 'H2', "
Expand All @@ -35,6 +36,16 @@ def command_line():
parser.add_argument('--stream', '-s', action='store_true',
help="Sends a constant stream of flow controller "
"data, formatted as a tab-separated table.")
parser.add_argument("--lock", "-l", action="store_true",
help="Locks device display.")
parser.add_argument("--unlock", "-u", action="store_true",
help="Unlocks device display.")
parser.add_argument("--hold", "-hd", action="store_true",
help="Holds the valve at the present value.")
parser.add_argument("--cancel-hold", "-c", action="store_true",
help="Cancel valve hold.")
parser.add_argument("--reset-totalizer", "-r", action="store_true",
help="Reset current value of totalizer to zero.")
args = parser.parse_args()

if args.port.startswith('tcp://'):
Expand Down
277 changes: 266 additions & 11 deletions alicat/serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,175 @@ def set_gas(self, gas, retries=2):
"""Set the gas type.

Args:
gas: The gas type, as a string. Supported gas types are:
gas: The gas type, as a string or integer. Supported gas types by string are:
'Air', 'Ar', 'CH4', 'CO', 'CO2', 'C2H6', 'H2', 'He', 'N2',
'N2O', 'Ne', 'O2', 'C3H8', 'n-C4H10', 'C2H2', 'C2H4',
'i-C2H10', 'Kr', 'Xe', 'SF6', 'C-25', 'C-10', 'C-8', 'C-2',
'C-75', 'A-75', 'A-25', 'A1025', 'Star29', 'P-5'

For the full gas table, please see page 52 of the controller manual here:
https://documents.alicat.com/manuals/DOC-MANUAL-MC.pdf

Gas mixes may only be called by their mix number.
"""
patrickfuller marked this conversation as resolved.
Show resolved Hide resolved
self._test_controller_open()

if gas not in self.gases:
raise ValueError("{} not supported!".format(gas))
command = '{addr}$${gas}\r'.format(addr=self.address,
gas=self.gases.index(gas))
if isinstance(gas, str) is False:
num = True
else:
num = False
Copy link
Contributor

@patrickfuller patrickfuller May 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace with

num = isinstance(gas, int)

In the future, avoid using is this way. It doesn't quite mean what you think it means and it'll get you into trouble!

Also, in the future, determining if something is a string in python is harder than you'd expect. Python 2/3 handle it differently (str vs basestring) so it's better to check if number vs. not.

Copy link
Contributor

@patrickfuller patrickfuller May 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, looking at this more, I'd recommend breaking up into some private methods. Something like:

def set_gas(self, gas)
    # if gas not in gases, etc at the front. Whatever's shared.
    if isinstance(gas, int):
        return self._set_gas_number(gas)
    else:
        return self._set_gas_name(gas)

def _set_gas_number(self, number):
    # write, read, check
    pass

def _set_gas_name(self, name):
    # write, read, check
    pass

Let me know if this makes sense to you!


if num is True:
command = '{addr}$${index}\r'.format(addr=self.address,
index=gas)
self._write_and_read(command, retries)
else:
if gas not in self.gases:
raise ValueError("{} not supported!".format(gas))
command = '{addr}$${gas}\r'.format(addr=self.address, gas=self.gases.index(gas))
self._write_and_read(command, retries)

read_reg46 = self._write_and_read('{addr}$$R46\r'.format(addr=self.address), retries)
reg46 = int(read_reg46.split()[-1])
bits = [32768, 16384, 8192, 4096, 2048, 1024, 512]

for u in range(0, 7):
reg46 = reg46 - bits[u]
if reg46 < 0:
reg46 = reg46 + bits[u]
elif reg46 < 255:
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is usually a cleaner one line way of bitmashing. This looks something like

reg46 = int(read_reg46.split()[-1]) & 0b0000000111111111
# or even
reg46 = int(read_reg46.split()[-1]) & 0x1ff

The hex is shorter but the binary shows which bits are being turned off (if that's how you're thinking about this). Choose whichever one you find more readable and clear.


if num is True:
if gas != reg46:
raise IOError("Cannot set gas, gas set to Air.")
else:
if self.gases.index(gas) != reg46:
raise IOError("Cannot set gas.")

def create_mix(self, mix_no, name, gas1, percent1, gas2, percent2, gas3=None, percent3=None, gas4=None, percent4=None, gas5=None, percent5=None, retries=2):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably clean up this function by thinking through some other argument patterns. Python allows some fun interactions here. Let's try a few.

As lists:

create_mix(100, 'air', ['N2', 'O2', 'Ar'], [78, 21, 1])

You end up making the lists below so this cleans up your function. But it's kind of awkward looking to use.

What about a dictionary:

def create_mix(self, mix_no, name, gases):
    pass
create_mix(100, 'air', {'N2': 78, 'O2': 21, 'Ar': 1})

I really like this one. It'll allow you to really shorten your error checking code.

One more for fun would be kwarg unpacking.

def create_mix(self, mix_no, name, **gases):
    pass
create_mix(100, 'air', N2=78, O2=21, Ar=1)

That's nice too and would have the same benefits as passing the dictionary above. The main downside here is that it's a more complex python idiom which can slow new coders down.

"""Create a gas mix.

Gas mixes are made using COMPOSER software located from the front panel and over serial.
COMPOSER mixes can only be made on Alicat devices with firmware 5v or greater.

Args:
mix_no: The mix number. Gas mixes are stored in slots 236-255. A mix number of 0 will create a mix in
the earliest available spot.
name: A name for the gas that will appear on the front panel. Names greater than six letters will be
cut off.
gas#: Name of the gas, as a string. Supported gas types are:
'Air', 'Ar', 'CH4', 'CO', 'CO2', 'C2H6', 'H2', 'He', 'N2',
'N2O', 'Ne', 'O2', 'C3H8', 'n-C4H10', 'C2H2', 'C2H4',
'i-C2H10', 'Kr', 'Xe', 'SF6', 'C-25', 'C-10', 'C-8', 'C-2',
'C-75', 'A-75', 'A-25', 'A1025', 'Star29', 'P-5'
percent#: The percentage of the mix for the corresponding gas."""

self._test_controller_open()

read = '{addr}VE\r'.format(addr=self.address)
firmware = self._write_and_read(read, retries)
if "2v" in firmware or "3v" in firmware or "4v" in firmware or "GP" in firmware:
raise IOError("This unit does not support COMPOSER gas mixes.")

if mix_no < 236 or mix_no > 255:
raise ValueError("Mix number must be between 236-225!")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

255


total_percent = 0
mix_list = [percent1, percent2, percent3, percent4, percent5]
for i in range(0, 5):
if mix_list[i] is not None:
total_percent += mix_list[i]
if total_percent != 100:
raise ValueError("Percentages of gas mix must add to 100%!")

if gas1 is not None and gas1 not in self.gases:
raise ValueError("{} not supported!".format(gas1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With a dictionary inputted into the function, this could be something like:

if any(gas not in self.gases for gas in gases):
    raise ValueError("{} not supported!".format(gas))

(no Nones to handle either)

if gas2 is not None and gas2 not in self.gases:
raise ValueError("{} not supported!".format(gas2))
if gas3 is not None and gas3 not in self.gases:
raise ValueError("{} not supported!".format(gas3))
if gas4 is not None and gas4 not in self.gases:
raise ValueError("{} not supported!".format(gas4))
if gas5 is not None and gas5 not in self.gases:
raise ValueError("{} not supported!".format(gas5))

command = '{addr} GM {shortName} {mixNumber} {p1} {g1}' \
' {p2} {g2} {p3} {g3} {p4} {g4} {p5}' \
' {g5}\r'.format(addr=self.address, shortName=name, mixNumber=mix_no, p1=percent1,
g1=self.gases.index(gas1), p2=percent2, g2=self.gases.index(gas2),
p3=percent3 if gas3 is not None else "", g3=self.gases.index(gas3) if gas3 is not None else "", p4=percent4 if gas4 is not None else "",
g4=gas4 if gas4 is not None else "", p5=percent5 if gas5 is not None else "", g5=self.gases.index(gas5) if gas5 is not None else "")
Copy link
Contributor

@patrickfuller patrickfuller May 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also be cleaner and made to not trail so many spaces.

gas_list = ' '.join(' '.join(self.gases.index(gas), percent) for gas, percent in gases.items())
command = ' '.join(self.address, name, mix_no, gas_list) + '\r'

Double join read a little awkward but it avoids you repeating yourself. Could replace the inner join with a small format string as well to try helping readability.


line = self._write_and_read(command, retries)

# If a gas mix is not successfully created, a ? is returned.
if line == '?':
raise IOError("Unable to create mix.")

def delete_mix(self, mix_no, retries=2):
"""Delete a gas mix."""

self._test_controller_open()
command = "{addr}GD{mixNumber}\r".format(addr=self.address, mixNumber=mix_no)
line = self._write_and_read(command, retries)

if line == '?':
raise IOError("Unable to delete mix.")

def lock(self, retries=2):
"""Lock the display.

Only supported on devices with a display."""

self._test_controller_open()
command = '{addr}$$L\r'.format(addr=self.address)
self._write_and_read(command, retries)

def unlock(self, retries=2):
"""Unlock the display.

Only supported on devices with a display."""

self._test_controller_open()
command = '{addr}$$U\r'.format(addr=self.address)
self._write_and_read(command, retries)

def tare_pressure(self, retries=2):
"""Tare the pressure.

Should only be performed if device at ambient conditions
e.g. no flow, device open to atmosphere"""

self._test_controller_open()

command = '{addr}$$PC\r'.format(addr=self.address)
line = self._write_and_read(command, retries)

if line == '?':
raise IOError("Unable to tare pressure.")

def tare_volumetric(self, retries=2):
"""Tare volumetric flow.

Should only be performed if device at ambient conditions
e.g. no flow, device open to atmosphere"""

self._test_controller_open()
command = '{addr}$$V\r'.format(addr=self.address)
line = self._write_and_read(command, retries)
if line.split()[-1] != gas:
raise IOError("Could not set gas type")

if line == '?':
raise IOError("Unable to tare flow.")


def reset_tot(self, retries=2):
"""Reset the totalizer, only valid for mass flow or liquid Alicats with a totalizer"""

self._test_controller_open()
command = '{addr}T\r'.format(addr=self.address)
self._write_and_read(command, retries)


def flush(self):
"""Read all available information. Use to clear queue."""
Expand Down Expand Up @@ -180,7 +334,7 @@ def _write_and_read(self, command, retries=2):
"""Write a command and reads a response from the flow controller."""
self._test_controller_open()

for _ in range(retries+1):
for _ in range(retries + 1):
self.connection.write(command.encode('ascii'))
line = self._readline()
if line:
Expand Down Expand Up @@ -285,6 +439,99 @@ def set_pressure(self, pressure, retries=2):
self._set_control_point('pressure', retries)
self._set_setpoint(pressure, retries)

def hold(self, retries=2):
"""Override command to issue a valve hold.

For a single valve mass flow/pressure controller, hold the valve at the present value.
For a dual valve mass flow controller, hold the valve at the present value.
For a dual valve pressure controller, close both valves.
"""
self._test_controller_open()
command = '{addr}$$H\r'.format(addr=self.address)
self._write_and_read(command, retries)

def cancel_hold(self, retries=2):
"""Cancel valve hold."""

self._test_controller_open()
command = '{addr}$$C\r'.format(addr=self.address)
self._write_and_read(command, retries)

def read_PID(self, retries=2):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might get yelled at by pycodestyle for caps in the function name. The general rule is code style supersedes acronym capitalization, so it could push for read_pid instead.

Personally, I think it's fine either way.

"""Read the current PID values on the controller.

Values include the loop type, P value, D value, and I value.

Values returned as a dictionary"""

self._test_controller_open()

self.pid_keys = ['loop_type', 'P', 'D', 'I']

command = '{addr}$$r85\r'.format(addr=self.address)
read_loop_type = self._write_and_read(command, retries)
spl = read_loop_type.split()

if spl[3] == '2':
loop_type = 'PD2I'
elif spl[3] == '1' or spl[3] == '0':
loop_type = 'PD/PDF'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick here. I'd write as

loop_type = ['PD', 'PDF', 'PD2I'].index(int(spl[3]))

This way, you'll get better errors when spl[3] is malformed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readability here. Do in two lines even if one would work.

loop_num = int(spl[3])
loop_type = ['PD/PDF', 'PD/PDF', 'PD2I'].index(loop_num)


pid_values = [loop_type]
for register in range(21, 24):
value = self._write_and_read('{}$$r{}\r'.format(self.address, register))
value_spl = value.split()
pid_values.append(value_spl[3])

result = {k: (v if k == self.pid_keys[-1] else str(v))
for k, v in zip(self.pid_keys, pid_values)}

return result

def write_PID_looptype(self, looptype, retries=2):
"""Change the PID loop from PD/PDF to PD2I and vice versa

Done by changing the appropriate bits in register 85."""

self._test_controller_open()

if looptype == 'PD/PDF':
command = '{addr}$$w85=1\r'.format(addr=self.address)
elif looptype == 'PD2I':
command = '{addr}$$w85=2\r'.format(addr=self.address)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

command = '{addr}$$w85={loop_num}\r'.format(
    addr=self.address, 
    loop_num=['', 'PD/PDF', 'PD2I'].index(looptype)
)

else:
raise ValueError('Not a valid loop type.')

self._write_and_read(command, retries)

def write_PID_P(self, p_value, retries=2):
"""Changing P value for PID tuning.

P is the proportional control variable and controlls how fast setpoint can be achieved."""
self._test_controller_open()
value = p_value
command = '{addr}$$w21={v}\r'.format(addr=self.address, v=value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could save a line on this and just do:

command = '{addr}$$w21={v}\r'.format(addr=self.address, v=p_value)

Also applies to below functions.

self._write_and_read(command, retries)

def write_PID_D(self, d_value, retries=2):
"""Changing D value for PID tuning.

D is the derivative term and primarily operates to dampen overshoots and reduce oscillations."""
self._test_controller_open()
value = d_value
command = '{addr}$$w22={v}\r'.format(addr=self.address, v=value)
self._write_and_read(command, retries)

def write_PID_I(self, i_value, retries=2):
"""Changing I value for PID tuning.

I is the integral term and accounts for past behaviour to provide a control response.
Only used in PD2I tuning. It can be changed if loop type is PD/PDF but it will have no effect no control."""
self._test_controller_open()
value = i_value
command = '{addr}$$w23={v}\r'.format(addr=self.address, v=value)
self._write_and_read(command, retries)

def _set_setpoint(self, setpoint, retries=2):
"""Set the target setpoint.

Expand All @@ -297,10 +544,8 @@ def _set_setpoint(self, setpoint, retries=2):
setpoint=setpoint)
line = self._write_and_read(command, retries)

# Some Alicat models don't return the setpoint. This accounts for
# these devices.
try:
current = float(line.split()[-2])
current = float(line.split()[5])
except IndexError:
current = None

Expand Down Expand Up @@ -352,6 +597,16 @@ def command_line(args):
flow_controller.set_flow_rate(args.set_flow_rate)
if args.set_pressure is not None:
flow_controller.set_pressure(args.set_pressure)
if args.lock:
flow_controller.lock()
if args.unlock:
flow_controller.unlock()
if args.hold:
flow_controller.hold()
if args.cancel_hold:
flow_controller.cancel_hold()
if args.reset_totalizer:
flow_controller.reset_tot()
state = flow_controller.get()
if args.stream:
try:
Expand Down
Loading