Skip to content

Commit

Permalink
Fix onTimeout, update tests and doc
Browse files Browse the repository at this point in the history
  • Loading branch information
matj-sag committed Dec 12, 2022
1 parent 3dd0680 commit 720c6b7
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 36 deletions.
58 changes: 43 additions & 15 deletions Functional.mon
Original file line number Diff line number Diff line change
Expand Up @@ -1097,17 +1097,27 @@ event Fn
}

static action isEmpty(listener l) returns boolean { return l.empty(); }
static action quit(listener l) { l.quit(); }
static action quit(listener l) returns listener { l.quit(); return l; }

/** TODO */
/**
Takes a sequence of listeners, waits for a given timeout and then if any of the listeners are still valid quits them and calls an action.

Equivalent to writing (plus handling for listeners having been quit first):

<tt>on wait (timeout) { for l in listeners { l.quit(); } onTimeout(); }</tt>

@param ls The listeners to quite upon the timeout.
@param timeout The number of seconds to wait before quitting the listeners.
@param onTimeout The function to call if the listeners didn't all terminate before the timeout.
*/
static action onTimeout(sequence<listener> ls, float timeout, any onTimeout)
{
action<sequence<any>> returns any failure := Util.getGenericAction(onTimeout);
if ls.size() > 0 then {
on all wait(timeout) {
if Fn.reduce(Fn.map(ls, isEmpty), Fn._any) {
any _ := Fn.map(Fn.filter(ls, isEmpty, quit));
failure(new sequence<any>);
on wait(timeout) {
if Fn._any(Fn.map(ls, _not(isEmpty))) {
any _ := Fn.map(Fn.filter(ls, _not(isEmpty)), quit);
_ := failure(new sequence<any>);
}
}
}
Expand All @@ -1124,14 +1134,16 @@ event Fn
@param typeName The type of event to wait for.
@param fieldName The field which will contain the values.
@param onCompleted an action<> to be called if all the events arrive.
@returns A sequence of listeners suitable for passing to onTimeout.
@see onTimeout
*/
static action waitForAllCompleted(any seq, string typeName, string fieldName, any onCompleted) returns sequence<listener>
{
action<sequence<any>> returns any success := Util.getGenericAction(onCompleted);
sequence<any> values := Util.getGenericSequence(seq);
if values.size() = 0 {
any _ := success(new sequence<any>);
return new sequence<listener>
return new sequence<listener>;
} else {
sequence<any> valuesRemaining := values.clone();
listener l := on all any(typeName=typeName) as f {
Expand Down Expand Up @@ -1164,6 +1176,8 @@ event Fn
@param endEventName The event which signals all of the valueEventNames have been received.
@param eventEventFields The fields in endEventName to filter on.
@param onCompleted an action<sequence<any>> to be called with all of the valueEventNames once endEventName has been received.
@returns A sequence of listeners suitable for passing to onTimeout.
@see onTimeout
*/
static action getAllEvents(string valueEventName, dictionary<string, any> valueEventFields, string endEventName, dictionary<string, any> endEventFields, any onComplete) returns sequence<listener>
{
Expand Down Expand Up @@ -1406,14 +1420,14 @@ event Functional

@param typeName The type of event to wait for.
@param fieldName The field which will contain the values.
@param timeout The timeout to wait for all the values to arrive.
@param onCompleted an action<> to be called if all the events arrive within the timeout.
@param onTimeout an action<sequence<any>> to be called if some of the events don't arrive within the timeout. The argument will be all the values which did not arrive.
@returns A Functional containing sequence of listeners suitable for calling onTimeout on.
@see onTimeout
@see Fn.waitForAllCompleted
*/
action waitForAllCompleted(string typeName, string fieldName, float timeout, any onCompleted, any onTimeout)
action waitForAllCompleted(string typeName, string fieldName, any onCompleted) returns Functional
{
Fn.waitForAllCompleted(container, typeName, fieldName, timeout, onCompleted, onTimeout);
return Functional(Fn.waitForAllCompleted(container, typeName, fieldName, onCompleted));
}

/**
Expand Down Expand Up @@ -1445,14 +1459,28 @@ event Functional
@param valueEventFields The fields in valueEventName to filter on.
@param endEventName The event which signals all of the valueEventNames have been received.
@param eventEventFields The fields in endEventName to filter on.
@param timeout The maximum time to wait for endEventName.
@param onCompleted an action<sequence<any>> to be called with all of the valueEventNames once endEventName has been received.
@param onTimeout an action<sequence<any>> to be called with all of the valueEventNames if the timeout occurs before endEventName has been received.
@returns A Functional containing sequence of listeners suitable for calling onTimeout on.
@see onTimeout
@see Fn.getAllEvents
*/
static action getAllEvents(string valueEventName, dictionary<string, any> valueEventFields, string endEventName, dictionary<string, any> endEventFields, float timeout, any onComplete, any onTimeout)
static action getAllEvents(string valueEventName, dictionary<string, any> valueEventFields, string endEventName, dictionary<string, any> endEventFields, any onComplete) returns Functional
{
Fn.getAllEvents(valueEventName, valueEventFields, endEventName, endEventFields, timeout, onComplete, onTimeout);
return Functional(Fn.getAllEvents(valueEventName, valueEventFields, endEventName, endEventFields, onComplete));
}
/**
Takes a sequence of listeners, waits for a given timeout and then if any of the listeners are still valid quits them and calls an action.

Equivalent to writing (plus handling for listeners having been quit first):

<tt>on wait (timeout) { for l in listeners { l.quit(); } onTimeout(); }</tt>

@param timeout The number of seconds to wait before quitting the listeners.
@param onTimeout The function to call if the listeners didn't all terminate before the timeout.
@see Fn.onTimeout
*/
action onTimeout(float timeout, any onTimeout) {
Fn.onTimeout(<sequence<listener>> container, timeout, onTimeout);
}


Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Another common pattern is having an asynchronous process with a completed event.

as:

Fn.waitForAllCompleted(sequenceIDs, "Completed", "id", TIMEOUTSECS, onCompleted, onTimeout);
Functional(sequencIDs).waitForAllCompleted("Completed", "id", onCompleted).onTimeout(TIMEOUTSECS, onTimeout);

Lastly we have wanting to receive all events up until a termination condition and then processing them as a collection. Rather than accumulating them all in a container manually with multiple listeners like this:

Expand All @@ -106,15 +106,16 @@ Lastly we have wanting to receive all events up until a termination condition an

Instead you can write:

Fn.getAllEvents("ValueEventName", {...}, "EndEventName", {...}, TIMEOUT, onComplete, onTimeout);
Functional.getAllEvents("ValueEventName", {...}, "EndEventName", {...}, onComplete).onTimeout(TIMEOUTSECS, onTimeout);

Here is a list of the event/listener actions on `Fn`.

| Action | Arguments | Return | Description |
| ------ | --------- | ------ | ----------- |
| listenForAnyOf | Sequence of values<br/>Event type and field name<br/>Additional fields<br/>`action<Eventtype>` | `sequence<listener>` | Create multiple listeners one for each value in the sequence. Call the given action for each matching event which arrives |
| waitForAllCompleted | Sequence of values<br/>Event type and field name<br/>timeout<br/>`action<>` on success and `action<sequence<any>>` on timeout | nothing | Take a list of values, wait for an event with each value to be received within a timeout. Call a success or timeout action |
| getAllEvents | Event type name<br/>Dictionary of event fields<br/>Event type name<br/>Dictionary of event fields<br/>timeout<br/>`action<sequence<any>>` on success and `action<sequence<any>>` on timeout| nothing| Wait for all events of the first type and arguments until receiving an event of the second type and arguments, then call a method with a sequence of all received events. |
| waitForAllCompleted | Sequence of values<br/>Event type and field name<br/>timeout<br/>`action<>` on success | `sequence<listener` | Take a list of values, wait for an event with each value to be received and call a method. |
| getAllEvents | Event type name<br/>Dictionary of event fields<br/>Event type name<br/>Dictionary of event fields<br/>`action<sequence<any>>` on success | `sequence<listener>` | Wait for all events of the first type and arguments until receiving an event of the second type and arguments, then call a method with a sequence of all received events. |
| onTimeout | `sequence<listener>`<br/>`action<>` on timeout | nothing | Wait for a timeout and if any of the listeners remain active quit them and call the action. |

## Generators

Expand Down
8 changes: 4 additions & 4 deletions tests/Fluent/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,18 @@ monitor TestFn {
Asserts.assertEquals("waitCompleted "+expected.toString(), expected, true);
}

action waitTimeout(boolean expected, sequence<any> missing)
action waitTimeout(boolean expected)
{
Asserts.assertEquals("waitTimeout "+expected.toString(), expected, true);
Asserts.assertEquals("waitTimeout missing", missing, [<any>40]);
}

action testwaitforall()
{
sequence<boolean> b;
sequence<integer> ids := [10, 20, 30];
sequence<integer> idsTimeout := [10, 20, 30, 40];
Functional(ids).waitForAllCompleted("com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, true), Fn.partial(waitTimeout, false));
Functional(idsTimeout).waitForAllCompleted("com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, false), Fn.partial(waitTimeout, true));
Functional(ids).waitForAllCompleted("com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, true)).onTimeout(5., Fn.partial(waitTimeout, false));
Functional(idsTimeout).waitForAllCompleted("com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, false)).onTimeout(5., Fn.partial(waitTimeout, true));
route TestEvent(10, "", "");
route TestEvent(20, "", "");
route TestEvent(30, "", "");
Expand Down
19 changes: 6 additions & 13 deletions tests/Functional/Input/test.mon
Original file line number Diff line number Diff line change
Expand Up @@ -358,24 +358,18 @@ monitor TestFn {
Asserts.assertEquals("waitCompleted "+expected.toString(), expected, true);
}

action waitTimeout(boolean expected, sequence<any> missing)
action waitTimeout(boolean expected)
{
Asserts.assertEquals("waitTimeout "+expected.toString(), expected, true);
Asserts.assertEquals("waitTimeout missing", missing, [<any>40]);
}

action waitThrows() {
throw com.apama.exceptions.Exception("Exception from waitThrows", "testException");
}

action testwaitforall()
{
sequence<integer> ids := [10, 20, 30];
sequence<integer> idsTimeout := [10, 20, 30, 40];
Fn.waitForAllCompleted(ids, "com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, true), Fn.partial(waitTimeout, false));
Fn.waitForAllCompleted(idsTimeout, "com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, false), Fn.partial(waitTimeout, true));
Fn.waitForAllCompleted(ids, "com.apamax.test.TestEvent", "id", 5., waitThrows, waitThrows);
Fn.waitForAllCompleted(idsTimeout, "com.apamax.test.TestEvent", "id", 5., waitThrows, waitThrows);
Fn.onTimeout(Fn.waitForAllCompleted(ids, "com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, true)), 5., Fn.partial(waitTimeout, false));
Fn.onTimeout(Fn.waitForAllCompleted(idsTimeout, "com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, false)), 5., Fn.partial(waitTimeout, true));
route TestEvent(10, "", "");
route TestEvent(20, "", "");
route TestEvent(30, "", "");
Expand All @@ -387,17 +381,16 @@ monitor TestFn {
Asserts.assertEquals("getAllCompleted received", expected, actual);
}

action getAllTimeout(sequence<any> expected, boolean shouldBeCalled, sequence<any> actual)
action getAllTimeout(sequence<any> expected, boolean shouldBeCalled)
{
Asserts.assertEquals("getAllTimeout "+expected.toString(), shouldBeCalled, true);
Asserts.assertEquals("getAllTimeout received", expected, actual);
}

action testgetallevents()
{
sequence<any> events := [<any>TestEvent(1000, "a", "a"), TestEvent(1000, "b", "b"), TestEvent(1000, "c", "c")];
Fn.getAllEvents("com.apamax.test.TestEvent", {"id":<any>1000}, "com.apamax.test.TestEvent", {"id":<any>2000}, 5., Fn.partial(getAllCompleted, [<any>events, true]), Fn.partial(getAllTimeout, [<any>events, false]));
Fn.getAllEvents("com.apamax.test.TestEvent", {"id":<any>1000}, "com.apamax.test.TestEvent", {"id":<any>3000}, 5., Fn.partial(getAllCompleted, [<any>events, false]), Fn.partial(getAllTimeout, [<any>events, true]));
Fn.onTimeout(Fn.getAllEvents("com.apamax.test.TestEvent", {"id":<any>1000}, "com.apamax.test.TestEvent", {"id":<any>2000}, Fn.partial(getAllCompleted, [<any>events, true])), 5., Fn.partial(getAllTimeout, [<any>events, false]));
Fn.onTimeout(Fn.getAllEvents("com.apamax.test.TestEvent", {"id":<any>1000}, "com.apamax.test.TestEvent", {"id":<any>3000}, Fn.partial(getAllCompleted, [<any>events, false])), 5., Fn.partial(getAllTimeout, [<any>events, true]));
any t;
for t in events {
route <TestEvent> t;
Expand Down

0 comments on commit 720c6b7

Please sign in to comment.