Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor Work Items fixes #1116

Merged
merged 4 commits into from
Oct 26, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 45 additions & 29 deletions packages/main/src/RPA/Robocorp/WorkItems.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ def adapter(self):
return self._adapter

@property
def current(self):
def current(self) -> WorkItem:
if self._current is None:
raise RuntimeError("No active work item")

Expand All @@ -981,6 +981,14 @@ def current(self, value):

self._current = value

@property
def active_input(self) -> Optional[WorkItem]:
if self._current and self._current.parent_id is None: # input set as current
return self._current
if self.inputs: # other current item set, and taking the last input
return self.inputs[-1]
return None

def _load_adapter(self, default) -> Type[BaseAdapter]:
"""Load adapter by name, using env or given default."""
adapter = required_env("RPA_WORKITEMS_ADAPTER", default)
Expand Down Expand Up @@ -1089,7 +1097,7 @@ def _start_suite(self, *_):
return

try:
self.get_input_work_item()
self.get_input_work_item(_internal_call=True)
# pylint: disable=broad-except
except Exception as exc:
logging.warning("Failed to load input work item: %s", exc)
Expand All @@ -1109,17 +1117,15 @@ def _release_on_failure(self, attributes):
state=State.FAILED,
exception_type=Error.APPLICATION,
message=message,
_auto_release=True,
_internal_release=True,
Copy link
Contributor Author

@cmin764 cmin764 Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@osrjv Like you thought, wondering if now it makes sense to just release all the non-released items as FAIL during the failed Step Run. (iterating through all the inputs and checking if there's one that wasn't released and for each applying this kind of release)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, we decided to pull back the auto_release switch.

)

def _end_suite(self, _, attributes):
"""Robot Framework listener method, called when the suite ends."""
# pylint: disable=unused-argument
for item in self.inputs + self.outputs:
if item.is_dirty:
logging.warning(
"%s has unsaved changes that will be discarded", self.current
)
logging.warning("%s has unsaved changes that will be discarded", item)

self._release_on_failure(attributes)

Expand Down Expand Up @@ -1165,7 +1171,9 @@ def set_current_work_item(self, item: WorkItem):
self.current = item

@keyword
def get_input_work_item(self, _internal_call: bool = False):
def get_input_work_item(
self, auto_release: bool = True, _internal_call: bool = False
) -> WorkItem:
"""Load the next work item from the input queue, and set it as the active work
item.

Expand All @@ -1174,13 +1182,17 @@ def get_input_work_item(self, _internal_call: bool = False):
If the library import argument ``autoload`` is truthy (default),
this is called automatically when the Robot Framework suite
starts.

:param auto_release: Automatically release the lastly retrieved (or currently
set) input item when this is `True`.
"""
if not _internal_call:
self._raise_under_iteration("Get Input Work Item")

# Automatically release (with success) the lastly retrieved input work item
# when asking for the next one.
self.release_input_work_item(State.DONE, _auto_release=True)
# when asking for the next one. (or the currently set input if such set)
if auto_release:
self.release_input_work_item(State.DONE, _internal_release=True)

item_id = self.adapter.reserve_input()
item = WorkItem(item_id=item_id, parent_id=None, adapter=self.adapter)
Expand Down Expand Up @@ -1261,7 +1273,7 @@ def create_output_work_item(
"call `Get Input Work Item` first"
)

parent = self.inputs[-1]
parent = self.active_input
if parent.state is not None:
raise RuntimeError(
"Can't create any more output work items since the last input was "
Expand Down Expand Up @@ -1314,7 +1326,7 @@ def clear_work_item(self):
wi.clear_work_item()
wi.save_work_item()
"""
self.current.payload = {}
self.current.payload.clear()
self.remove_work_item_files("*")

@keyword
Expand Down Expand Up @@ -1711,11 +1723,11 @@ def _raise_under_iteration(self, action: str) -> None:
raise RuntimeError(f"Can't {action} while iterating input work items")

def _ensure_input_for_iteration(self) -> bool:
last_input = self.inputs[-1] if self.inputs else None
last_state = last_input.state if last_input else None
if not last_input or last_state:
# There are no inputs loaded yet or the last retrieved input work
# item is already processed. Time for trying to load a new one.
active_input = self.active_input
active_state = active_input.state if active_input else None
if not active_input or active_state:
# There are no inputs loaded yet or the lastly retrieved input work
# item is already processed. Time for trying to load a new one.
try:
self.get_input_work_item(_internal_call=True)
except EmptyQueue:
Expand All @@ -1730,6 +1742,7 @@ def for_each_input_work_item(
*args,
items_limit: int = 0,
return_results: bool = True,
auto_release: bool = True,
**kwargs,
) -> List[Any]:
"""Run a keyword or function for each work item in the input queue.
Expand All @@ -1746,6 +1759,8 @@ def for_each_input_work_item(
otherwise all the items are retrieved from the queue until depletion
:param return_results: Collect and return a list of results given each
keyword/function call if truthy
:param auto_release: Automatically release the lastly retrieved (or currently
set) input item when this is `True`.

Example:

Expand Down Expand Up @@ -1806,7 +1821,8 @@ def log_payloads():
result = to_call()
if return_results:
results.append(result)
self.release_input_work_item(State.DONE, _auto_release=True)
if auto_release:
self.release_input_work_item(State.DONE, _internal_release=True)

count += 1
if items_limit and count >= items_limit:
Expand All @@ -1823,7 +1839,7 @@ def release_input_work_item(
exception_type: Optional[Union[Error, str]] = None,
code: Optional[str] = None,
message: Optional[str] = None,
_auto_release: bool = False,
_internal_release: bool = False,
):
"""Release the lastly retrieved input work item and set its state.

Expand Down Expand Up @@ -1887,26 +1903,26 @@ def process_and_set_state():

process_and_set_state()
"""
# Note that `_auto_release` here is True when automatically releasing items.
# (internal call)
# Note that `_internal_release` here is True when automatically releasing items
# within our internal library logic.

last_input = self.inputs[-1] if self.inputs else None
if not last_input:
if _auto_release:
active_input = self.active_input
if not active_input:
if _internal_release:
# Have nothing to release and that's normal (reserving for the first
# time).
return
raise RuntimeError(
"Can't release without reserving first an input work item"
)
if last_input.state is not None:
if _auto_release:
if active_input.state is not None:
if _internal_release:
# Item already released and that's normal when reaching an empty queue
# and we ask for another item again. We don't want to set states twice.
return
raise RuntimeError("Input work item already released")
assert last_input.parent_id is None, "set state on output item"
assert last_input.id is not None, "set state on input item with null ID"
assert active_input.parent_id is None, "set state on output item"
assert active_input.id is not None, "set state on input item with null ID"

# RF automatically converts string "DONE" to State.DONE object if only `State`
# type annotation is used in the keyword definition.
Expand All @@ -1933,8 +1949,8 @@ def process_and_set_state():
exc_types = ", ".join(list(Error.__members__))
raise RuntimeError(f"Must specify failure type from: {exc_types}")

self.adapter.release_input(last_input.id, state, exception=exception)
last_input.state = state
self.adapter.release_input(active_input.id, state, exception=exception)
active_input.state = state

@keyword
def get_current_work_item(self) -> WorkItem:
Expand Down
Loading