ATP v3 update #105
lgtm
Fix that build error real quick and it should be good to go.
Nice!
Hi @jaredoconnell, I didn't make it all the way through, but here are some thoughts for you.
class ATPServer:
    stdin: io.FileIO
    stdout: io.FileIO
    input_pipe: io.FileIO
    output_pipe: io.FileIO
    stderr: io.FileIO
    step_object: typing.Any
    step_ids: typing.Dict[str, str] = {}  # Run ID to step IDs
    encoder: cbor2.encoder
    decoder: cbor2.decoder
    user_out_buffer: io.StringIO
    encoder_lock = threading.Lock()
    plugin_schema: schema.SchemaType
    running_threads: typing.List[threading.Thread] = []

    def __init__(
        self,
        stdin: io.FileIO,
        stdout: io.FileIO,
        input_pipe: io.FileIO,
        output_pipe: io.FileIO,
        stderr: io.FileIO,
    ) -> None:
        self.stdin = stdin
        self.stdout = stdout
        self.input_pipe = input_pipe
        self.output_pipe = output_pipe
        self.stderr = stderr
I'm not really sure what all these annotations (e.g., lines 78-80, 82) accomplish: they don't actually define attributes, and your initializer doesn't set all of the attributes that you have annotations for, either (which is somewhat odd). I expect that the annotations serve some sort of documentation purpose, but what they really do is obscure the fact that you're setting a few class attributes here. It would be worth separating the attributes' initializers from the other annotations.
Also, are you confident in your understanding of the differences between class attributes and instance attributes in Python? step_ids, encoder_lock, and running_threads will be shared between all instances of ATPServer, so long as they are only modified by the instances and not directly assigned to. This might be exactly what you want for encoder_lock, but, if different instances of ATPServer are supposed to be maintaining independent lists of step IDs and threads, this is going to be a problem. (Also, I'm not sure what kind of synchronization you need to coordinate access to these attributes between threads, but I don't see you using any.)
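To illustrate the class-versus-instance point, here is a minimal sketch. The class names `SharedState` and `PerInstanceState` are made up for the example and are not part of the SDK; only the attribute name `running_threads` comes from the snippet above, and strings stand in for thread objects.

```python
import typing


class SharedState:
    # Class attribute: a single list object shared by every instance.
    running_threads: typing.List[str] = []


class PerInstanceState:
    # Annotation only: documents the attribute but does not create it.
    running_threads: typing.List[str]

    def __init__(self) -> None:
        # Instance attribute: each object gets its own list.
        self.running_threads = []


a, b = SharedState(), SharedState()
a.running_threads.append("step-1")  # mutates the one shared list
shared_seen_by_b = list(b.running_threads)

c, d = PerInstanceState(), PerInstanceState()
c.running_threads.append("step-1")  # only c's list changes
independent_seen_by_d = list(d.running_threads)
```

In the first class, `b` sees the entry that `a` appended; in the second, `d` does not see the entry that `c` appended. Moving the mutable defaults into `__init__` is one way to get the independent behavior, if that is what is intended.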
Will be resolved in the next PR. I learned about the differences between instance and class attributes after I wrote this part, but I forgot to go back and fix it.
# Run the read loop
read_thread = threading.Thread(target=self.run_server_read_loop, args=())
read_thread.start()
read_thread.join()  # Wait for the read thread to finish.
What is the advantage of running a thread and then immediately waiting for it rather than just calling self.run_server_read_loop() in the current thread?
# Don't reset stdout/stderr until after the read and step/signal threads are done.
sys.stdout = original_stdout
sys.stderr = original_stderr
If this is an important invariant to restore before returning, consider placing it in a finally block and placing lines 118-124 in a try block.
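A rough sketch of the try/finally shape being suggested. The function name `run_with_redirected_output` and its `work` callable are placeholders invented for this example, not the server's actual structure.

```python
import io
import sys


def run_with_redirected_output(work) -> str:
    """Run work() with stdout/stderr captured, always restoring the originals."""
    original_stdout, original_stderr = sys.stdout, sys.stderr
    buffer = io.StringIO()
    sys.stdout = buffer
    sys.stderr = buffer
    try:
        work()
    finally:
        # Runs whether work() returns normally or raises.
        sys.stdout = original_stdout
        sys.stderr = original_stderr
    return buffer.getvalue()
```

With this shape, an exception raised inside `work()` still propagates to the caller, but `sys.stdout` and `sys.stderr` are back to their original values by the time it does.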
else:
    self.send_error_message(run_id, False, False, f"Unknown runtime message ID: {msg_id}")
    self.stderr.write(f"Unknown kind of runtime message: {msg_id}")
I find it notable that a message ID value of None is handled differently from any other unrecognized message ID. How would you get a None without triggering a decoding error or other exception?
(As an aside: I recommend using keyword parameters to pass literals like boolean values. The values themselves don't offer any context to the reader when used as positional parameters, and with two or more in a row it's easy to mix them up, whereas prefixing them with a keyword makes it clear what they mean, and their ordering ceases to be important. Also, consider defining the parameters with default values, so that you don't need to specify them in most cases. I think you'll find the result is much more expressive.)
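As a sketch of the keyword-parameter idea: the flag names `step_fatal` and `server_fatal` below are hypothetical, since the PR snippet does not show what the two booleans mean, and the function returns a dict only so the example is self-contained.

```python
import typing


def send_error_message(
    run_id: str,
    message: str,
    *,  # everything after this marker must be passed by keyword
    step_fatal: bool = False,    # hypothetical name for the first boolean
    server_fatal: bool = False,  # hypothetical name for the second boolean
) -> typing.Dict[str, typing.Any]:
    """Build an error payload; the flags default to False."""
    return {
        "run_id": run_id,
        "error": message,
        "step_fatal": step_fatal,
        "server_fatal": server_fatal,
    }


# The common case needs no flags at all.
common_case = send_error_message("run-1", "Unknown runtime message ID: 99")
# The unusual case names the flag it sets.
fatal_case = send_error_message("run-1", "Error while calling step", step_fatal=True)
```

Because of the bare `*`, a call like `send_error_message("run-1", "msg", True, False)` raises `TypeError` instead of silently accepting the booleans in the wrong order.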
Good catch. It should also use the new send_error_message format.
I'll fix that, and add the keyword parameters in the next PR.
)
except Exception as e:
    self.send_error_message(run_id, True, False,
                            f"Error while calling step {run_id}/{work_start_msg.get('id', 'missing')}:"
The caller already verified that work_start_msg includes "id", so you don't need to use get() here.
Consider changing the signature of start_step() to take the ID and the configuration as separate parameters rather than passing work_start_msg: this will make this function more graceful, and it will decouple it from the message object.
def send_client_done(self):
    self.send_runtime_message(MessageType.CLIENT_DONE, {})
    self.send_runtime_message(MessageType.CLIENT_DONE, "", "")
Granted, the type of the third parameter of send_runtime_message is any, but all the previous calls have sent dicts... it seems like settling on a particular type would be good. (If you really want to send nothing and don't like {}, how about None?)

def read_results(self) -> (str, any, str):
def read_single_result(self) -> (str, str, any, str):
Tuples are problematic from a maintenance perspective. Consider using a namedtuple (or NamedTuple) or defining a @dataclass instead. Either one lets you name the members, which frees you from having to get the ordering right and allows you to add context to the values.
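One possible shape for this, using `typing.NamedTuple`. The field names are guesses: the PR only shows the element types `(str, str, any, str)`, so `run_id`, `output_id`, `output_data`, and `debug_logs` are assumptions, and the function body is a stand-in for decoding a real message.

```python
import typing


class StepResult(typing.NamedTuple):
    # Field names are assumptions; only the types appear in the PR.
    run_id: str
    output_id: str
    output_data: typing.Any
    debug_logs: str


def read_single_result() -> StepResult:
    # Stand-in for reading and decoding a message from the pipe.
    return StepResult(
        run_id="run-1",
        output_id="success",
        output_data={"n": 1},
        debug_logs="",
    )


result = read_single_result()
# Existing callers that unpack positionally keep working.
run_id, output_id, output_data, debug_logs = result
```

New callers can write `result.output_id` instead of remembering an index. As a side note, `typing.Any` is the annotation for "any type"; lowercase `any` is the built-in function.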
Changes introduced with this PR
This update adds the changes that correspond to the Go SDK ATP v3 update: arcalot/arcaflow-plugin-sdk-go#57
The error reporting in this version is massively improved. Errors in the ATP and the plugin are now properly reported to the ATP client, and are then reported by the engine.
Some of the suggestions by Webb were also addressed in this branch.
By contributing to this repository, I agree to the contribution guidelines.