diff --git a/src/HttpHandler/Builder.fs b/src/Builder.fs similarity index 88% rename from src/HttpHandler/Builder.fs rename to src/Builder.fs index 7d741a0..fc5f7b7 100644 --- a/src/HttpHandler/Builder.fs +++ b/src/Builder.fs @@ -3,7 +3,6 @@ namespace Oryx -open Oryx.Pipeline type RequestBuilder() = member _.Zero() : HttpHandler = httpRequest @@ -12,14 +11,14 @@ type RequestBuilder() = member _.Return(content: 'TResult) : HttpHandler<'TResult> = singleton content member _.ReturnFrom(req: HttpHandler<'TResult>) : HttpHandler<'TResult> = req member _.Delay(fn) = fn () - member _.Combine(source, other) = source |> Core.bind (fun _ -> other) + member _.Combine(source, other) = source |> bind (fun _ -> other) member _.For(source: 'TSource seq, func: 'TSource -> HttpHandler<'TResult>) : HttpHandler<'TResult list> = source |> Seq.map func |> sequential /// Binds value of 'TValue for let! All handlers runs in same context within the builder. member _.Bind(source: HttpHandler<'TSource>, fn: 'TSource -> HttpHandler<'TResult>) : HttpHandler<'TResult> = - source |> Core.bind fn + source |> bind fn [] module Builder = diff --git a/src/Core.fs b/src/Core.fs new file mode 100644 index 0000000..baeb485 --- /dev/null +++ b/src/Core.fs @@ -0,0 +1,19 @@ +// Copyright 2020 Cognite AS +// SPDX-License-Identifier: Apache-2.0 + +namespace Oryx + +open System.Threading.Tasks + +type IHttpNext<'TSource> = + abstract member OnSuccessAsync: ctx: HttpContext * content: 'TSource -> Task + abstract member OnErrorAsync: ctx: HttpContext * error: exn -> Task + abstract member OnCancelAsync: ctx: HttpContext -> Task + +type HttpHandler<'TResult> = IHttpNext<'TResult> -> Task + +exception HttpException of (HttpContext * exn) with + override this.ToString() = + match this :> exn with + | HttpException(_, err) -> err.ToString() + | _ -> failwith "This should not never happen." diff --git a/src/Pipeline/Error.fs b/src/Error.fs similarity index 77% rename from src/Pipeline/Error.fs rename to src/Error.fs index 6a97b55..390dfe9 100644 --- a/src/Pipeline/Error.fs +++ b/src/Error.fs @@ -1,7 +1,6 @@ // Copyright 2020 Cognite AS // SPDX-License-Identifier: Apache-2.0 - -namespace Oryx.Pipeline +namespace Oryx open System @@ -9,12 +8,12 @@ open FSharp.Control.TaskBuilder open Oryx module Error = - /// Handler for protecting the pipeline from exceptions and protocol violations. - let protect<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TSource> = + /// Handler for protecting the HttpHandler from exceptions and protocol violations. + let protect<'TSource> (source: HttpHandler<'TSource>) : HttpHandler<'TSource> = fun next -> let mutable stopped = false - { new IAsyncNext<'TContext, 'TSource> with + { new IHttpNext<'TSource> with member _.OnSuccessAsync(ctx, content) = task { match stopped with @@ -47,12 +46,12 @@ module Error = |> source /// Handler for catching errors and then delegating to the error handler on what to do. - let catch<'TContext, 'TSource> - (errorHandler: 'TContext -> exn -> Pipeline<'TContext, 'TSource>) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TSource> = + let catch<'TSource> + (errorHandler: HttpContext -> exn -> HttpHandler<'TSource>) + (source: HttpHandler<'TSource>) + : HttpHandler<'TSource> = fun next -> - { new IAsyncNext<'TContext, 'TSource> with + { new IHttpNext<'TSource> with member _.OnSuccessAsync(ctx, content) = task { try @@ -66,7 +65,6 @@ module Error = match err with | PanicException error -> return! next.OnErrorAsync(ctx, error) | _ -> do! (errorHandler ctx err) next - } member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } @@ -79,22 +77,22 @@ module Error = | Error | Panic - /// Choose from a list of pipelines to use. The first middleware that succeeds will be used. Handlers will be + /// Choose from a list of HttpHandlers to use. The first middleware that succeeds will be used. Handlers will be /// tried until one does not produce any error, or a `PanicException`. - let choose<'TContext, 'TSource, 'TResult> - (handlers: (Pipeline<'TContext, 'TSource> -> Pipeline<'TContext, 'TResult>) list) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TResult> = + let choose<'TSource, 'TResult> + (handlers: (HttpHandler<'TSource> -> HttpHandler<'TResult>) list) + (source: HttpHandler<'TSource>) + : HttpHandler<'TResult> = fun next -> let exns: ResizeArray = ResizeArray() - { new IAsyncNext<'TContext, 'TSource> with + { new IHttpNext<'TSource> with member _.OnSuccessAsync(ctx, content) = let mutable state = ChooseState.Error task { let obv = - { new IAsyncNext<'TContext, 'TResult> with + { new IHttpNext<'TResult> with member _.OnSuccessAsync(ctx, content) = task { exns.Clear() // Clear to avoid buildup of exceptions in streaming scenarios. @@ -106,10 +104,10 @@ module Error = member _.OnErrorAsync(_, error) = task { match error, state with - | PanicException(_), st when st <> ChooseState.Panic -> + | PanicException _, st when st <> ChooseState.Panic -> state <- ChooseState.Panic return! next.OnErrorAsync(ctx, error) - | SkipException(_), st when st = ChooseState.NoError -> + | SkipException _, st when st = ChooseState.NoError -> // Flag error, but do not record skips. state <- ChooseState.Error | _, ChooseState.Panic -> @@ -122,7 +120,7 @@ module Error = member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - /// Proces handlers until `NoError` or `Panic`. + // Process handlers until `NoError` or `Panic`. for handler in handlers do if state = ChooseState.Error then state <- ChooseState.NoError @@ -134,9 +132,9 @@ module Error = () | ChooseState.Error, exns when exns.Count > 1 -> return! next.OnErrorAsync(ctx, AggregateException(exns)) - | ChooseState.Error, exns when exns.Count = 1 -> return! next.OnErrorAsync(ctx, exns.[0]) + | ChooseState.Error, exns when exns.Count = 1 -> return! next.OnErrorAsync(ctx, exns[0]) | ChooseState.Error, _ -> - return! next.OnErrorAsync(ctx, SkipException "Choose: No hander matched") + return! next.OnErrorAsync(ctx, SkipException "Choose: No handler matched") | ChooseState.NoError, _ -> () } @@ -151,12 +149,9 @@ module Error = |> source /// Error handler for forcing error. Use with e.g `req` computational expression if you need to "return" an error. - let fail<'TContext, 'TSource, 'TResult> - (err: Exception) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TResult> = + let fail<'TSource, 'TResult> (err: Exception) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> = fun next -> - { new IAsyncNext<'TContext, 'TSource> with + { new IHttpNext<'TSource> with member _.OnSuccessAsync(ctx, content) = next.OnErrorAsync(ctx, err) member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } @@ -164,12 +159,9 @@ module Error = /// Error handler for forcing a panic error. Use with e.g `req` computational expression if you need break out of /// the any error handling e.g `choose` or `catch`•. - let panic<'TContext, 'TSource, 'TResult> - (err: Exception) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TResult> = + let panic<'TSource, 'TResult> (err: Exception) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> = fun next -> - { new IAsyncNext<'TContext, 'TSource> with + { new IHttpNext<'TSource> with member _.OnSuccessAsync(ctx, content) = next.OnErrorAsync(ctx, PanicException(err)) diff --git a/src/Pipeline/Exception.fs b/src/Exception.fs similarity index 100% rename from src/Pipeline/Exception.fs rename to src/Exception.fs diff --git a/src/HttpHandler/Fetch.fs b/src/Fetch.fs similarity index 99% rename from src/HttpHandler/Fetch.fs rename to src/Fetch.fs index fb98f6e..2075157 100644 --- a/src/HttpHandler/Fetch.fs +++ b/src/Fetch.fs @@ -11,7 +11,6 @@ open System.Net.Http.Headers open System.Web open FSharp.Control.TaskBuilder -open Oryx.Pipeline [] module Fetch = @@ -101,7 +100,6 @@ module Fetch = response.Content ) - response.Dispose() return result with ex when not (ex :? HttpException) -> diff --git a/src/HttpHandler/HttpContext.fs b/src/HttpContext.fs similarity index 100% rename from src/HttpHandler/HttpContext.fs rename to src/HttpContext.fs diff --git a/src/HttpHandler/HttpError.fs b/src/HttpError.fs similarity index 100% rename from src/HttpHandler/HttpError.fs rename to src/HttpError.fs diff --git a/src/HttpHandler/HttpHandler.fs b/src/HttpHandler.fs similarity index 59% rename from src/HttpHandler/HttpHandler.fs rename to src/HttpHandler.fs index a3e5602..71a14d5 100644 --- a/src/HttpHandler/HttpHandler.fs +++ b/src/HttpHandler.fs @@ -1,3 +1,5 @@ +// Copyright 2020 Cognite AS +// SPDX-License-Identifier: Apache-2.0 namespace Oryx open System.IO @@ -6,42 +8,237 @@ open System.Threading open System.Threading.Tasks open FSharp.Control.TaskBuilder -open Oryx -open Oryx.Pipeline - -type IHttpNext<'TResult> = IAsyncNext -type HttpHandler<'TResult> = Pipeline +open FsToolkit.ErrorHandling -exception HttpException of (HttpContext * exn) with - override this.ToString() = - match this :> exn with - | HttpException(_, err) -> err.ToString() - | _ -> failwith "This should not never happen." +open Oryx [] module HttpHandler = - /// Run the HTTP handler in the given context. Returns content as result type. - let runAsync<'TResult> (handler: HttpHandler<'TResult>) = - Core.runAsync handler + /// Swap first with last arg so we can pipe onSuccess + let swapArgs fn = fun a b c -> fn c a b + + /// A next continuation for observing the final result. + let finish<'TResult> (tcs: TaskCompletionSource<'TResult>) : IHttpNext<'TResult> = + { new IHttpNext<'TResult> with + member x.OnSuccessAsync(_, response) = task { tcs.SetResult response } + member x.OnErrorAsync(ctx, error) = task { tcs.SetException error } + member x.OnCancelAsync(ctx) = task { tcs.SetCanceled() } } /// Run the HTTP handler in the given context. Returns content and throws exception if any error occured. - let runUnsafeAsync<'TResult> (handler: HttpHandler<'TResult>) = - Core.runUnsafeAsync handler + let runUnsafeAsync<'TResult> (handler: HttpHandler<'TResult>) : Task<'TResult> = + let tcs = TaskCompletionSource<'TResult>() + + task { + do! finish tcs |> handler + return! tcs.Task + } - /// Produce a single value using the default context. - let singleton<'TSource> = - Core.singleton HttpContext.defaultContext + /// Run the HTTP handler in the given context. Returns content as result type. + let runAsync<'TResult> (handler: HttpHandler<'TResult>) : Task> = + task { + try + let! value = runUnsafeAsync handler + return Ok value + with error -> + return Error error + } + + /// Produce the given content. + let singleton<'TSource> (content: 'TSource) : HttpHandler<'TSource> = + fun next -> next.OnSuccessAsync(HttpContext.defaultContext, content) /// Map the content of the HTTP handler. - let map<'TSource, 'TResult> = Core.map + let map<'TSource, 'TResult> (mapper: 'TSource -> 'TResult) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> = + fun next -> + //fun ctx content -> success ctx (mapper content) + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, content) = + try + next.OnSuccessAsync(ctx, mapper content) + with error -> + next.OnErrorAsync(ctx, error) + + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source + + /// Bind the content of the HTTP handler.. + let bind<'TSource, 'TResult> + (fn: 'TSource -> HttpHandler<'TResult>) + (source: HttpHandler<'TSource>) + : HttpHandler<'TResult> = + fun next -> + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, content) = + task { + let handler = fn content + return! handler next + } + + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source + + /// Run list of HTTP handlers concurrently. + let concurrent<'TSource, 'TResult> (handlers: seq>) : HttpHandler<'TResult list> = + fun next -> + task { + let res: Result array = + Array.zeroCreate (Seq.length handlers) + + let obv n = + { new IHttpNext<'TResult> with + member _.OnSuccessAsync(ctx, content) = task { res[n] <- Ok(ctx, content) } + member _.OnErrorAsync(ctx, err) = task { res[n] <- Error(ctx, err) } + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + + let tasks = handlers |> Seq.mapi (fun n handler -> handler (obv n)) + + let! _ = Task.WhenAll(tasks) + + let result = res |> List.ofSeq |> List.sequenceResultM + + match result with + | Ok results -> + let results, contents = results |> List.unzip + let bs = HttpContext.merge results + return! next.OnSuccessAsync(bs, contents) + | Error(_, err) -> raise err + } + + /// Run list of HTTP handlers sequentially. + let sequential<'TSource, 'TResult> (handlers: seq>) : HttpHandler<'TResult list> = + fun next -> + task { + let res = ResizeArray>() + + let obv = + { new IHttpNext<'TResult> with + member _.OnSuccessAsync(ctx, content) = task { Ok(ctx, content) |> res.Add } + member _.OnErrorAsync(ctx, err) = task { res.Add(Error(ctx, err)) } + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + + for handler in handlers do + do! handler obv + + let result = res |> List.ofSeq |> List.sequenceResultM + + match result with + | Ok results -> + let results, contents = results |> List.unzip + let bs = HttpContext.merge results + return! next.OnSuccessAsync(bs, contents) + | Error(_, err) -> raise err + } + + /// Chunks a sequence of HTTP handlers into sequential and concurrent batches. + let chunk<'TSource, 'TResult> + (chunkSize: int) + (maxConcurrency: int) + (handler: seq<'TSource> -> HttpHandler>) + (items: seq<'TSource>) + : HttpHandler> = + items + |> Seq.chunkBySize chunkSize + |> Seq.chunkBySize maxConcurrency + |> Seq.map (Seq.map handler >> concurrent) + |> sequential + // Collect results + |> map (Seq.ofList >> Seq.collect (Seq.collect id)) + + /// Handler that skips (ignores) the content and outputs unit. + let ignoreContent<'TSource> (source: HttpHandler<'TSource>) : HttpHandler = + fun next -> + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, content) = next.OnSuccessAsync(ctx, ()) + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source - /// Update (map) the context. + /// Caches the last content value and context. + let cache<'TSource> (source: HttpHandler<'TSource>) : HttpHandler<'TSource> = + let mutable cache: (HttpContext * 'TSource) option = None + + fun next -> + task { + match cache with + | Some(ctx, content) -> return! next.OnSuccessAsync(ctx, content) + | _ -> + return! + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, content) = + task { + cache <- Some(ctx, content) + return! next.OnSuccessAsync(ctx, content) + } + + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source + } + + /// Never produces a result. + let never _ = task { () } + + /// Completes the current request. + let empty (ctx: HttpContext) : HttpHandler = + fun next -> next.OnSuccessAsync(ctx, ()) + + /// Filter content using a predicate function. + let filter<'TSource> (predicate: 'TSource -> bool) (source: HttpHandler<'TSource>) : HttpHandler<'TSource> = + fun next -> + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, value) = + task { + if predicate value then + return! next.OnSuccessAsync(ctx, value) + } + + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source + + /// Validate content using a predicate function. Same as filter ut produces an error if validation fails. + let validate<'TSource> (predicate: 'TSource -> bool) (source: HttpHandler<'TSource>) : HttpHandler<'TSource> = + fun next -> + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, value) = + if predicate value then + next.OnSuccessAsync(ctx, value) + else + next.OnErrorAsync(ctx, SkipException "Validation failed") + + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source + + /// Retrieves the content. + let await<'TSource> () (source: HttpHandler<'TSource>) : HttpHandler<'TSource> = + source |> map<'TSource, 'TSource> id + + /// Asks for the given HTTP context and produces a content value using the context. + let ask<'TSource> (source: HttpHandler<'TSource>) : HttpHandler = + fun next -> + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, _) = next.OnSuccessAsync(ctx, ctx) + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source + + /// Update (asks) the context. let update<'TSource> (update: HttpContext -> HttpContext) (source: HttpHandler<'TSource>) : HttpHandler<'TSource> = - Core.update update source + fun next -> + { new IHttpNext<'TSource> with + member _.OnSuccessAsync(ctx, content) = + next.OnSuccessAsync(update ctx, content) - /// Bind the content. - let bind<'TSource, 'TResult> fn source = - Core.bind fn source + member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) + member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } + |> source + + /// Replaces the value with a constant. + let replace<'TSource, 'TResult> (value: 'TResult) (source: HttpHandler<'TSource>) : HttpHandler<'TResult> = + map (fun _ -> value) source /// Add HTTP header to context. let withHeader (header: string * string) (source: HttpHandler<'TSource>) : HttpHandler<'TSource> = @@ -123,41 +320,20 @@ module HttpHandler = member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } |> source - /// Chunks a sequence of HTTP handlers into sequential and concurrent batches. - let chunk<'TSource, 'TResult> = - Core.chunk HttpContext.merge - - /// Run list of HTTP handlers sequentially. - let sequential<'TSource, 'TResult> = - Core.sequential HttpContext.merge - - /// Run list of HTTP handlers concurrently. - let concurrent<'TSource, 'TResult> = - Core.concurrent HttpContext.merge - /// Catch handler for catching errors and then delegating to the error handler on what to do. - let catch<'TSource> = Error.catch + let catch<'TSource> = Error.catch<'TSource> /// Choose from a list of handlers to use. The first handler that succeeds will be used. - let choose<'TSource, 'TResult> = Error.choose + let choose<'TSource, 'TResult> = Error.choose<'TSource, 'TResult> /// Error handler for forcing error. Use with e.g `req` computational expression if you need to "return" an error. let fail<'TSource, 'TResult> error source = - Error.fail error source + Error.fail<'TSource, 'TResult> error source /// Error handler for forcing a panic error. Use with e.g `req` computational expression if you need break out of /// the any error handling e.g `choose` or `catch`•. let panic<'TSource, 'TResult> error source = - Error.panic error source - - /// Validate content using a predicate function. - let validate<'TSource> = Core.validate - - /// Handler that skips (ignores) the content and outputs unit. - let ignoreContent<'TSource> (source: HttpHandler<'TSource>) : HttpHandler = - source |> Core.ignoreContent - - let replace<'TSource, 'TResult> = Core.replace + Error.panic<'TSource, 'TResult> error source /// Parse response stream to a user specified type synchronously. let parse<'TResult> (parser: Stream -> 'TResult) (source: HttpHandler) : HttpHandler<'TResult> = @@ -348,12 +524,4 @@ module HttpHandler = |> source /// Starts a pipeline using an empty request with the default context. - let httpRequest: HttpHandler = - Core.empty HttpContext.defaultContext - - /// Caches the last content value and context. - let cache<'TSource> = Core.cache - - /// Asks for the given HTTP context and produces a content value using the context. - let ask<'TSource> (source: HttpHandler<'TSource>) : HttpHandler = - Core.ask source + let httpRequest: HttpHandler = empty HttpContext.defaultContext diff --git a/src/HttpHandler/Logging.fs b/src/Logging.fs similarity index 96% rename from src/HttpHandler/Logging.fs rename to src/Logging.fs index da1439f..0a3e153 100644 --- a/src/HttpHandler/Logging.fs +++ b/src/Logging.fs @@ -26,10 +26,10 @@ module Logging = let private getHeaderValue (headers: Map>) (key: string) : string = match headers.TryGetValue(key) with - | (true, v) -> + | true, v -> match Seq.tryHead v with | first -> if first.IsSome then first.Value else String.Empty - | (false, _) -> String.Empty + | false, _ -> String.Empty let private log' logLevel ctx content = match ctx.Request.Logger with @@ -45,7 +45,7 @@ module Logging = matches |> Seq.cast |> Seq.map (fun (match': Match) -> - match match'.Groups.[1].Value with + match match'.Groups[1].Value with | PlaceHolder.HttpMethod -> box request.Method | PlaceHolder.RequestContent -> ctx.Request.ContentBuilder @@ -56,7 +56,7 @@ module Logging = | PlaceHolder.ResponseHeader -> // GroupCollection returns empty string values for indexes beyond what was captured, therefore // we don't cause an exception here if the optional second group was not captured - getHeaderValue (ctx.Response.Headers) (match'.Groups.[3].Value) :> _ + getHeaderValue ctx.Response.Headers match'.Groups[3].Value :> _ | key -> // Look for the key in the extra info. This also enables custom HTTP handlers to add custom // placeholders to the format string. diff --git a/src/HttpHandler/Metrics.fs b/src/Metrics.fs similarity index 100% rename from src/HttpHandler/Metrics.fs rename to src/Metrics.fs diff --git a/src/Oryx.fsproj b/src/Oryx.fsproj index b0a867f..5a0547a 100644 --- a/src/Oryx.fsproj +++ b/src/Oryx.fsproj @@ -13,16 +13,15 @@ - - - - - - - - - - + + + + + + + + + diff --git a/src/Pipeline/Core.fs b/src/Pipeline/Core.fs deleted file mode 100644 index 8052c06..0000000 --- a/src/Pipeline/Core.fs +++ /dev/null @@ -1,268 +0,0 @@ -// Copyright 2020 Cognite AS -// SPDX-License-Identifier: Apache-2.0 - -namespace Oryx.Pipeline - -open System.Threading.Tasks - -open FSharp.Control.TaskBuilder -open FsToolkit.ErrorHandling -open Oryx - -type IAsyncNext<'TContext, 'TSource> = - abstract member OnSuccessAsync: ctx: 'TContext * content: 'TSource -> Task - abstract member OnErrorAsync: ctx: 'TContext * error: exn -> Task - abstract member OnCancelAsync: ctx: 'TContext -> Task - -type Pipeline<'TContext, 'TSource> = IAsyncNext<'TContext, 'TSource> -> Task - -module Core = - /// Swap first with last arg so we can pipe onSuccess - let swapArgs fn = fun a b c -> fn c a b - - /// A next continuation for observing the final result. - let finish<'TContext, 'TResult> (tcs: TaskCompletionSource<'TResult>) : IAsyncNext<'TContext, 'TResult> = - - { new IAsyncNext<'TContext, 'TResult> with - member x.OnSuccessAsync(_, response) = task { tcs.SetResult response } - member x.OnErrorAsync(ctx, error) = task { tcs.SetException error } - member x.OnCancelAsync(ctx) = task { tcs.SetCanceled() } } - - /// Run the HTTP handler in the given context. Returns content and throws exception if any error occured. - let runUnsafeAsync<'TContext, 'TResult> (handler: Pipeline<'TContext, 'TResult>) : Task<'TResult> = - let tcs = TaskCompletionSource<'TResult>() - - task { - do! finish tcs |> handler - return! tcs.Task - } - - /// Run the HTTP handler in the given context. Returns content as result type. - let runAsync<'TContext, 'TResult> (handler: Pipeline<'TContext, 'TResult>) : Task> = - task { - try - let! value = runUnsafeAsync handler - return Ok value - with error -> - return Error error - } - - /// Produce the given content. - let singleton<'TContext, 'TSource> (ctx: 'TContext) (content: 'TSource) : Pipeline<'TContext, 'TSource> = - fun next -> next.OnSuccessAsync(ctx, content) - - /// Map the content of the middleware. - let map<'TContext, 'TSource, 'TResult> - (mapper: 'TSource -> 'TResult) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TResult> = - - fun next -> - //fun ctx content -> succes ctx (mapper content) - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, content) = - try - next.OnSuccessAsync(ctx, mapper content) - with error -> - next.OnErrorAsync(ctx, error) - - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - - /// Bind the content of the middleware. - let bind<'TContext, 'TSource, 'TResult> - (fn: 'TSource -> Pipeline<'TContext, 'TResult>) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TResult> = - fun next -> - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, content) = - task { - let handler = fn content - return! handler next - } - - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - - let concurrent<'TContext, 'TSource, 'TResult> - (merge: 'TContext list -> 'TContext) - (handlers: seq>) - : Pipeline<'TContext, 'TResult list> = - fun next -> - task { - let res: Result<'TContext * 'TResult, 'TContext * exn> array = - Array.zeroCreate (Seq.length handlers) - - let obv n ctx content = task { res.[n] <- Ok(ctx, content) } - - let obv n = - { new IAsyncNext<'TContext, 'TResult> with - member _.OnSuccessAsync(ctx, content) = task { res.[n] <- Ok(ctx, content) } - member _.OnErrorAsync(ctx, err) = task { res.[n] <- Error(ctx, err) } - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - - let tasks = handlers |> Seq.mapi (fun n handler -> handler (obv n)) - - let! _ = Task.WhenAll(tasks) - - let result = res |> List.ofSeq |> List.sequenceResultM - - match result with - | Ok results -> - let results, contents = results |> List.unzip - let bs = merge results - return! next.OnSuccessAsync(bs, contents) - | Error(_, err) -> raise err - } - - /// Run list pipelines sequentially. - let sequential<'TContext, 'TSource, 'TResult> - (merge: 'TContext list -> 'TContext) - (handlers: seq>) - : Pipeline<'TContext, 'TResult list> = - fun next -> - task { - let res = ResizeArray>() - - let obv = - { new IAsyncNext<'TContext, 'TResult> with - member _.OnSuccessAsync(ctx, content) = task { Ok(ctx, content) |> res.Add } - member _.OnErrorAsync(ctx, err) = task { res.Add(Error(ctx, err)) } - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - - for handler in handlers do - do! handler obv - - let result = res |> List.ofSeq |> List.sequenceResultM - - match result with - | Ok results -> - let results, contents = results |> List.unzip - let bs = merge results - return! next.OnSuccessAsync(bs, contents) - | Error(_, err) -> raise err - } - - /// Chunks a sequence of Middlewares into a combination of sequential and concurrent batches. - let chunk<'TContext, 'TSource, 'TResult> - (merge: 'TContext list -> 'TContext) - (chunkSize: int) - (maxConcurrency: int) - (handler: seq<'TSource> -> Pipeline<'TContext, seq<'TResult>>) - (items: seq<'TSource>) - : Pipeline<'TContext, seq<'TResult>> = - items - |> Seq.chunkBySize chunkSize - |> Seq.chunkBySize maxConcurrency - |> Seq.map (Seq.map handler >> concurrent merge) - |> sequential merge - // Collect results - |> map (Seq.ofList >> Seq.collect (Seq.collect id)) - - /// Handler that skips (ignores) the content and outputs unit. - let ignoreContent<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, unit> = - fun next -> - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, content) = next.OnSuccessAsync(ctx, ()) - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - - let cache<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TSource> = - let mutable cache: ('TContext * 'TSource) option = None - - fun next -> - task { - match cache with - | Some(ctx, content) -> return! next.OnSuccessAsync(ctx, content) - | _ -> - return! - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, content) = - task { - cache <- Some(ctx, content) - return! next.OnSuccessAsync(ctx, content) - } - - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - } - - /// Never produces a result. - let never _ = task { () } - - /// Completes the current request. - let empty<'TContext> (ctx: 'TContext) : Pipeline<'TContext, unit> = - fun next -> next.OnSuccessAsync(ctx, ()) - - /// Filter content using a predicate function. - let filter<'TContext, 'TSource> - (predicate: 'TSource -> bool) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TSource> = - fun next -> - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, value) = - task { - if predicate value then - return! next.OnSuccessAsync(ctx, value) - } - - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - - /// Validate content using a predicate function. Same as filter ut produces an error if validation fails. - let validate<'TContext, 'TSource> - (predicate: 'TSource -> bool) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TSource> = - fun next -> - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, value) = - if predicate value then - next.OnSuccessAsync(ctx, value) - else - next.OnErrorAsync(ctx, SkipException "Validation failed") - - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - - /// Retrieves the content. - let await<'TContext, 'TSource> () (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TSource> = - source |> map<'TContext, 'TSource, 'TSource> id - - /// Returns the current environment. - let ask<'TContext, 'TSource> (source: Pipeline<'TContext, 'TSource>) : Pipeline<'TContext, 'TContext> = - fun next -> - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, _) = next.OnSuccessAsync(ctx, ctx) - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - - /// Update (asks) the context. - let update<'TContext, 'TSource> - (update: 'TContext -> 'TContext) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TSource> = - fun next -> - { new IAsyncNext<'TContext, 'TSource> with - member _.OnSuccessAsync(ctx, content) = - next.OnSuccessAsync(update ctx, content) - - member _.OnErrorAsync(ctx, exn) = next.OnErrorAsync(ctx, exn) - member _.OnCancelAsync(ctx) = next.OnCancelAsync(ctx) } - |> source - - /// Replaces the value with a constant. - let replace<'TContext, 'TSource, 'TResult> - (value: 'TResult) - (source: Pipeline<'TContext, 'TSource>) - : Pipeline<'TContext, 'TResult> = - map (fun _ -> value) source diff --git a/version b/version index b5db170..f4965a3 100644 --- a/version +++ b/version @@ -1 +1 @@ -5.4.3 \ No newline at end of file +6.0.0 \ No newline at end of file