diff --git a/Functional.mon b/Functional.mon index 8c8cc4d..0a4d455 100644 --- a/Functional.mon +++ b/Functional.mon @@ -1097,17 +1097,27 @@ event Fn } static action isEmpty(listener l) returns boolean { return l.empty(); } - static action quit(listener l) { l.quit(); } + static action quit(listener l) returns listener { l.quit(); return l; } - /** TODO */ + /** + Takes a sequence of listeners, waits for a given timeout and then if any of the listeners are still valid quits them and calls an action. + + Equivalent to writing (plus handling for listeners having been quit first): + + on wait (timeout) { for l in listeners { l.quit(); } onTimeout(); } + + @param ls The listeners to quite upon the timeout. + @param timeout The number of seconds to wait before quitting the listeners. + @param onTimeout The function to call if the listeners didn't all terminate before the timeout. + */ static action onTimeout(sequence ls, float timeout, any onTimeout) { action> returns any failure := Util.getGenericAction(onTimeout); if ls.size() > 0 then { - on all wait(timeout) { - if Fn.reduce(Fn.map(ls, isEmpty), Fn._any) { - any _ := Fn.map(Fn.filter(ls, isEmpty, quit)); - failure(new sequence); + on wait(timeout) { + if Fn._any(Fn.map(ls, _not(isEmpty))) { + any _ := Fn.map(Fn.filter(ls, _not(isEmpty)), quit); + _ := failure(new sequence); } } } @@ -1124,6 +1134,8 @@ event Fn @param typeName The type of event to wait for. @param fieldName The field which will contain the values. @param onCompleted an action<> to be called if all the events arrive. + @returns A sequence of listeners suitable for passing to onTimeout. + @see onTimeout */ static action waitForAllCompleted(any seq, string typeName, string fieldName, any onCompleted) returns sequence { @@ -1131,7 +1143,7 @@ event Fn sequence values := Util.getGenericSequence(seq); if values.size() = 0 { any _ := success(new sequence); - return new sequence + return new sequence; } else { sequence valuesRemaining := values.clone(); listener l := on all any(typeName=typeName) as f { @@ -1164,6 +1176,8 @@ event Fn @param endEventName The event which signals all of the valueEventNames have been received. @param eventEventFields The fields in endEventName to filter on. @param onCompleted an action> to be called with all of the valueEventNames once endEventName has been received. + @returns A sequence of listeners suitable for passing to onTimeout. + @see onTimeout */ static action getAllEvents(string valueEventName, dictionary valueEventFields, string endEventName, dictionary endEventFields, any onComplete) returns sequence { @@ -1406,14 +1420,14 @@ event Functional @param typeName The type of event to wait for. @param fieldName The field which will contain the values. - @param timeout The timeout to wait for all the values to arrive. @param onCompleted an action<> to be called if all the events arrive within the timeout. - @param onTimeout an action> to be called if some of the events don't arrive within the timeout. The argument will be all the values which did not arrive. + @returns A Functional containing sequence of listeners suitable for calling onTimeout on. + @see onTimeout @see Fn.waitForAllCompleted */ - action waitForAllCompleted(string typeName, string fieldName, float timeout, any onCompleted, any onTimeout) + action waitForAllCompleted(string typeName, string fieldName, any onCompleted) returns Functional { - Fn.waitForAllCompleted(container, typeName, fieldName, timeout, onCompleted, onTimeout); + return Functional(Fn.waitForAllCompleted(container, typeName, fieldName, onCompleted)); } /** @@ -1445,14 +1459,28 @@ event Functional @param valueEventFields The fields in valueEventName to filter on. @param endEventName The event which signals all of the valueEventNames have been received. @param eventEventFields The fields in endEventName to filter on. - @param timeout The maximum time to wait for endEventName. @param onCompleted an action> to be called with all of the valueEventNames once endEventName has been received. - @param onTimeout an action> to be called with all of the valueEventNames if the timeout occurs before endEventName has been received. + @returns A Functional containing sequence of listeners suitable for calling onTimeout on. + @see onTimeout @see Fn.getAllEvents */ - static action getAllEvents(string valueEventName, dictionary valueEventFields, string endEventName, dictionary endEventFields, float timeout, any onComplete, any onTimeout) + static action getAllEvents(string valueEventName, dictionary valueEventFields, string endEventName, dictionary endEventFields, any onComplete) returns Functional { - Fn.getAllEvents(valueEventName, valueEventFields, endEventName, endEventFields, timeout, onComplete, onTimeout); + return Functional(Fn.getAllEvents(valueEventName, valueEventFields, endEventName, endEventFields, onComplete)); + } + /** + Takes a sequence of listeners, waits for a given timeout and then if any of the listeners are still valid quits them and calls an action. + + Equivalent to writing (plus handling for listeners having been quit first): + + on wait (timeout) { for l in listeners { l.quit(); } onTimeout(); } + + @param timeout The number of seconds to wait before quitting the listeners. + @param onTimeout The function to call if the listeners didn't all terminate before the timeout. + @see Fn.onTimeout + */ + action onTimeout(float timeout, any onTimeout) { + Fn.onTimeout(> container, timeout, onTimeout); } diff --git a/README.md b/README.md index 8c08876..51eca73 100644 --- a/README.md +++ b/README.md @@ -95,7 +95,7 @@ Another common pattern is having an asynchronous process with a completed event. as: - Fn.waitForAllCompleted(sequenceIDs, "Completed", "id", TIMEOUTSECS, onCompleted, onTimeout); + Functional(sequencIDs).waitForAllCompleted("Completed", "id", onCompleted).onTimeout(TIMEOUTSECS, onTimeout); Lastly we have wanting to receive all events up until a termination condition and then processing them as a collection. Rather than accumulating them all in a container manually with multiple listeners like this: @@ -106,15 +106,16 @@ Lastly we have wanting to receive all events up until a termination condition an Instead you can write: - Fn.getAllEvents("ValueEventName", {...}, "EndEventName", {...}, TIMEOUT, onComplete, onTimeout); + Functional.getAllEvents("ValueEventName", {...}, "EndEventName", {...}, onComplete).onTimeout(TIMEOUTSECS, onTimeout); Here is a list of the event/listener actions on `Fn`. | Action | Arguments | Return | Description | | ------ | --------- | ------ | ----------- | | listenForAnyOf | Sequence of values
Event type and field name
Additional fields
`action` | `sequence` | Create multiple listeners one for each value in the sequence. Call the given action for each matching event which arrives | -| waitForAllCompleted | Sequence of values
Event type and field name
timeout
`action<>` on success and `action>` on timeout | nothing | Take a list of values, wait for an event with each value to be received within a timeout. Call a success or timeout action | -| getAllEvents | Event type name
Dictionary of event fields
Event type name
Dictionary of event fields
timeout
`action>` on success and `action>` on timeout| nothing| Wait for all events of the first type and arguments until receiving an event of the second type and arguments, then call a method with a sequence of all received events. | +| waitForAllCompleted | Sequence of values
Event type and field name
timeout
`action<>` on success | `sequenceDictionary of event fields
Event type name
Dictionary of event fields
`action>` on success | `sequence` | Wait for all events of the first type and arguments until receiving an event of the second type and arguments, then call a method with a sequence of all received events. | +| onTimeout | `sequence`
`action<>` on timeout | nothing | Wait for a timeout and if any of the listeners remain active quit them and call the action. | ## Generators diff --git a/tests/Fluent/Input/test.mon b/tests/Fluent/Input/test.mon index a8f9351..7fc978e 100644 --- a/tests/Fluent/Input/test.mon +++ b/tests/Fluent/Input/test.mon @@ -170,18 +170,18 @@ monitor TestFn { Asserts.assertEquals("waitCompleted "+expected.toString(), expected, true); } - action waitTimeout(boolean expected, sequence missing) + action waitTimeout(boolean expected) { Asserts.assertEquals("waitTimeout "+expected.toString(), expected, true); - Asserts.assertEquals("waitTimeout missing", missing, [40]); } action testwaitforall() { + sequence b; sequence ids := [10, 20, 30]; sequence idsTimeout := [10, 20, 30, 40]; - Functional(ids).waitForAllCompleted("com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, true), Fn.partial(waitTimeout, false)); - Functional(idsTimeout).waitForAllCompleted("com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, false), Fn.partial(waitTimeout, true)); + Functional(ids).waitForAllCompleted("com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, true)).onTimeout(5., Fn.partial(waitTimeout, false)); + Functional(idsTimeout).waitForAllCompleted("com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, false)).onTimeout(5., Fn.partial(waitTimeout, true)); route TestEvent(10, "", ""); route TestEvent(20, "", ""); route TestEvent(30, "", ""); diff --git a/tests/Functional/Input/test.mon b/tests/Functional/Input/test.mon index 4879ce9..b1cb3af 100644 --- a/tests/Functional/Input/test.mon +++ b/tests/Functional/Input/test.mon @@ -358,24 +358,18 @@ monitor TestFn { Asserts.assertEquals("waitCompleted "+expected.toString(), expected, true); } - action waitTimeout(boolean expected, sequence missing) + action waitTimeout(boolean expected) { Asserts.assertEquals("waitTimeout "+expected.toString(), expected, true); - Asserts.assertEquals("waitTimeout missing", missing, [40]); } - action waitThrows() { - throw com.apama.exceptions.Exception("Exception from waitThrows", "testException"); - } action testwaitforall() { sequence ids := [10, 20, 30]; sequence idsTimeout := [10, 20, 30, 40]; - Fn.waitForAllCompleted(ids, "com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, true), Fn.partial(waitTimeout, false)); - Fn.waitForAllCompleted(idsTimeout, "com.apamax.test.TestEvent", "id", 5., Fn.partial(waitCompleted, false), Fn.partial(waitTimeout, true)); - Fn.waitForAllCompleted(ids, "com.apamax.test.TestEvent", "id", 5., waitThrows, waitThrows); - Fn.waitForAllCompleted(idsTimeout, "com.apamax.test.TestEvent", "id", 5., waitThrows, waitThrows); + Fn.onTimeout(Fn.waitForAllCompleted(ids, "com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, true)), 5., Fn.partial(waitTimeout, false)); + Fn.onTimeout(Fn.waitForAllCompleted(idsTimeout, "com.apamax.test.TestEvent", "id", Fn.partial(waitCompleted, false)), 5., Fn.partial(waitTimeout, true)); route TestEvent(10, "", ""); route TestEvent(20, "", ""); route TestEvent(30, "", ""); @@ -387,17 +381,16 @@ monitor TestFn { Asserts.assertEquals("getAllCompleted received", expected, actual); } - action getAllTimeout(sequence expected, boolean shouldBeCalled, sequence actual) + action getAllTimeout(sequence expected, boolean shouldBeCalled) { Asserts.assertEquals("getAllTimeout "+expected.toString(), shouldBeCalled, true); - Asserts.assertEquals("getAllTimeout received", expected, actual); } action testgetallevents() { sequence events := [TestEvent(1000, "a", "a"), TestEvent(1000, "b", "b"), TestEvent(1000, "c", "c")]; - Fn.getAllEvents("com.apamax.test.TestEvent", {"id":1000}, "com.apamax.test.TestEvent", {"id":2000}, 5., Fn.partial(getAllCompleted, [events, true]), Fn.partial(getAllTimeout, [events, false])); - Fn.getAllEvents("com.apamax.test.TestEvent", {"id":1000}, "com.apamax.test.TestEvent", {"id":3000}, 5., Fn.partial(getAllCompleted, [events, false]), Fn.partial(getAllTimeout, [events, true])); + Fn.onTimeout(Fn.getAllEvents("com.apamax.test.TestEvent", {"id":1000}, "com.apamax.test.TestEvent", {"id":2000}, Fn.partial(getAllCompleted, [events, true])), 5., Fn.partial(getAllTimeout, [events, false])); + Fn.onTimeout(Fn.getAllEvents("com.apamax.test.TestEvent", {"id":1000}, "com.apamax.test.TestEvent", {"id":3000}, Fn.partial(getAllCompleted, [events, false])), 5., Fn.partial(getAllTimeout, [events, true])); any t; for t in events { route t;