diff --git a/CHANGELOG_NEXT.md b/CHANGELOG_NEXT.md index 1b5aaef35c..5c96d0201f 100644 --- a/CHANGELOG_NEXT.md +++ b/CHANGELOG_NEXT.md @@ -265,6 +265,10 @@ This CHANGELOG describes the merged but unreleased changes. Please see [CHANGELO * Refactored `Uninhabited` implementation for `Data.List.Elem`, `Data.List1.Elem`, `Data.SnocList.Elem` and `Data.Vect.Elem` so it can be used for homogeneous (===) and heterogeneous (~=~) equality. +* Added `System.Concurrency.channelGetNonBlocking` for the chez backend. + +* Added `System.Concurrency.channelGetWithTimeout` for the chez backend. + #### Contrib * `Data.List.Lazy` was moved from `contrib` to `base`. @@ -306,7 +310,6 @@ This CHANGELOG describes the merged but unreleased changes. Please see [CHANGELO * Add a missing function parameter (the flag) in the C implementation of `idrnet_recv_bytes` - #### Test * Replaced `Requirement` data type with a new record that can be used to create diff --git a/libs/base/System/Concurrency.idr b/libs/base/System/Concurrency.idr index d6ba41cc33..db600fe03d 100644 --- a/libs/base/System/Concurrency.idr +++ b/libs/base/System/Concurrency.idr @@ -72,19 +72,15 @@ data Condition : Type where [external] %foreign "scheme,racket:blodwen-make-cv" "scheme,chez:blodwen-make-condition" prim__makeCondition : PrimIO Condition - %foreign "scheme,racket:blodwen-cv-wait" "scheme,chez:blodwen-condition-wait" prim__conditionWait : Condition -> Mutex -> PrimIO () - %foreign "scheme,chez:blodwen-condition-wait-timeout" -- "scheme,racket:blodwen-cv-wait-timeout" prim__conditionWaitTimeout : Condition -> Mutex -> Int -> PrimIO () - %foreign "scheme,racket:blodwen-cv-signal" "scheme,chez:blodwen-condition-signal" prim__conditionSignal : Condition -> PrimIO () - %foreign "scheme,racket:blodwen-cv-broadcast" "scheme,chez:blodwen-condition-broadcast" prim__conditionBroadcast : Condition -> PrimIO () @@ -187,6 +183,10 @@ data Channel : Type -> Type where [external] prim__makeChannel : PrimIO (Channel a) %foreign "scheme:blodwen-channel-get" prim__channelGet : Channel a -> PrimIO a +%foreign "scheme,chez:blodwen-channel-get-non-blocking" +prim__channelGetNonBlocking : Channel a -> PrimIO (Maybe a) +%foreign "scheme,chez:blodwen-channel-get-with-timeout" +prim__channelGetWithTimeout : Channel a -> Int -> PrimIO (Maybe a) %foreign "scheme:blodwen-channel-put" prim__channelPut : Channel a -> a -> PrimIO () @@ -208,6 +208,23 @@ export channelGet : HasIO io => (chan : Channel a) -> io a channelGet chan = primIO (prim__channelGet chan) +||| Non-blocking version of channelGet (chez backend). +||| +||| @ chan the channel to receive on +partial +export +channelGetNonBlocking : HasIO io => (chan : Channel a) -> io (Maybe a) +channelGetNonBlocking chan = primIO (prim__channelGetNonBlocking chan) + +||| Timeout version of channelGet (chez backend). +||| +||| @ chan the channel to receive on +||| @ milliseconds how many milliseconds to wait until timeout +partial +export +channelGetWithTimeout : HasIO io => (chan : Channel a) -> (milliseconds : Nat) -> io (Maybe a) +channelGetWithTimeout chan milliseconds = primIO (prim__channelGetWithTimeout chan (cast milliseconds)) + ||| Puts a value on the given channel. ||| ||| @ chan the `Channel` to send the value over diff --git a/support/chez/support.ss b/support/chez/support.ss index 8162585b87..ffb37d4ee1 100644 --- a/support/chez/support.ss +++ b/support/chez/support.ss @@ -439,6 +439,60 @@ (condition-signal read-cv) the-val)) +(define (blodwen-channel-get-non-blocking ty chan) + (if (mutex-acquire (channel-read-mut chan) #f) + (let* ([val-box (channel-val-box chan)] + [read-box (channel-read-box chan)] + [read-cv (channel-read-cv chan)] + [the-val (unbox val-box)] + ) + (if (null? the-val) + (begin + (mutex-release (channel-read-mut chan)) + '()) + (begin + (set-box! val-box '()) + (set-box! read-box #t) + (mutex-release (channel-read-mut chan)) + (condition-signal read-cv) + (box the-val)) + )) + '())) + +(define (blodwen-channel-get-with-timeout ty chan timeout) + (let loop () + (let* ([sec (div timeout 1000)]) + (if (mutex-acquire (channel-read-mut chan) #f) + (let* ([val-box (channel-val-box chan)] + [val-cv (channel-val-cv chan)] + [the-val (unbox val-box)]) + (if (null? the-val) + (begin + ;; Wait for the condition timeout + (condition-wait val-cv (channel-read-mut chan) (make-time 'time-duration 0 sec)) + (let* ([the-val (unbox val-box)]) ; Check again after wait + (if (null? the-val) + (begin + (mutex-release (channel-read-mut chan)) + '()) ; Still empty after timeout + (let* ([read-box (channel-read-box chan)] + [read-cv (channel-read-cv chan)]) + ;; Value now available + (set-box! val-box '()) + (set-box! read-box #t) + (mutex-release (channel-read-mut chan)) + (condition-signal read-cv) + (box the-val))))) + (let* ([read-box (channel-read-box chan)] + [read-cv (channel-read-cv chan)]) + ;; Value available immediately + (set-box! val-box '()) + (set-box! read-box #t) + (mutex-release (channel-read-mut chan)) + (condition-signal read-cv) + (box the-val)))) + loop)))) ; Failed to acquire mutex + ;; Mutex (define (blodwen-make-mutex) @@ -499,7 +553,6 @@ (define (blodwen-clock-second time) (time-second time)) (define (blodwen-clock-nanosecond time) (time-nanosecond time)) - (define (blodwen-arg-count) (length (command-line))) diff --git a/tests/chez/channels007/Main.idr b/tests/chez/channels007/Main.idr new file mode 100644 index 0000000000..4958eda431 --- /dev/null +++ b/tests/chez/channels007/Main.idr @@ -0,0 +1,24 @@ +import System +import System.Concurrency + +-- Test that using channelGetNonBlocking works as expected. +main : IO () +main = do + chan <- makeChannel + threadID <- fork $ do + channelPut chan "Hello" + channelPut chan "Goodbye" + sleep 1 + case !(channelGetNonBlocking chan) of + Nothing => putStrLn "Nothing" + Just val' => putStrLn val' + sleep 1 + case !(channelGetNonBlocking chan) of + Nothing => putStrLn "Nothing" + Just val' => putStrLn val' + sleep 1 + case !(channelGetNonBlocking chan) of + Nothing => putStrLn "Nothing" + Just val' => putStrLn val' + sleep 1 + diff --git a/tests/chez/channels007/expected b/tests/chez/channels007/expected new file mode 100644 index 0000000000..9b5ee8a709 --- /dev/null +++ b/tests/chez/channels007/expected @@ -0,0 +1,3 @@ +Hello +Goodbye +Nothing diff --git a/tests/chez/channels007/run b/tests/chez/channels007/run new file mode 100644 index 0000000000..c6e6f2ab79 --- /dev/null +++ b/tests/chez/channels007/run @@ -0,0 +1,3 @@ +. ../../testutils.sh + +run Main.idr diff --git a/tests/chez/channels008/Main.idr b/tests/chez/channels008/Main.idr new file mode 100644 index 0000000000..a57b18035b --- /dev/null +++ b/tests/chez/channels008/Main.idr @@ -0,0 +1,23 @@ +import Data.Maybe +import System +import System.Concurrency + +-- Simple producing thread. +producer : Channel Nat -> Nat -> IO () +producer c n = ignore $ producer' n + where + producer' : Nat -> IO () + producer' Z = pure () + producer' (S n) = do + channelPut c n + sleep 1 + +-- Test that channelGetWithTimeout works as expected. +main : IO () +main = + do c <- makeChannel + tids <- for [0..11] $ \n => fork $ producer c n + vals <- for [0..11] $ \_ => channelGetWithTimeout c 5000 + ignore $ traverse (\t => threadWait t) tids + putStrLn $ show $ sum $ fromMaybe 0 <$> vals + diff --git a/tests/chez/channels008/expected b/tests/chez/channels008/expected new file mode 100644 index 0000000000..c3f407c095 --- /dev/null +++ b/tests/chez/channels008/expected @@ -0,0 +1 @@ +55 diff --git a/tests/chez/channels008/run b/tests/chez/channels008/run new file mode 100644 index 0000000000..c6e6f2ab79 --- /dev/null +++ b/tests/chez/channels008/run @@ -0,0 +1,3 @@ +. ../../testutils.sh + +run Main.idr