Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add streaming event data mode (6.0 rewrite, part 2) #94

Open
wants to merge 4 commits into
base: eb/sc-183031/rewrite-with-pull-model
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/LaunchDarkly.EventSource/Configuration.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using LaunchDarkly.Logging;

namespace LaunchDarkly.EventSource
Expand Down Expand Up @@ -47,6 +48,12 @@ public sealed class Configuration
/// <seealso cref="ConfigurationBuilder.ErrorStrategy(ErrorStrategy)"/>
public ErrorStrategy ErrorStrategy { get; }

/// <summary>
/// A set of field names that are expected to appear before the data field in streaming mode.
/// </summary>
/// <seealso cref="ConfigurationBuilder.ExpectFields(string[])"/>
public HashSet<string> ExpectFields { get; }

/// <summary>
/// The initial amount of time to wait before attempting to reconnect to the EventSource API.
/// </summary>
Expand Down Expand Up @@ -80,6 +87,13 @@ public sealed class Configuration
/// <seealso cref="ConfigurationBuilder.RetryDelayResetThreshold(TimeSpan)"/>
public TimeSpan RetryDelayResetThreshold { get; }

/// <summary>
/// True if EventSource should return not-fully-read events containing a stream for
/// reading the data.
/// </summary>
/// <seealso cref="ConfigurationBuilder.StreamEventData(bool)"/>
public bool StreamEventData { get; }

#endregion

#region Internal Constructor
Expand All @@ -91,11 +105,13 @@ internal Configuration(ConfigurationBuilder builder)

ConnectStrategy = builder._connectStrategy;
ErrorStrategy = builder._errorStrategy ?? ErrorStrategy.AlwaysThrow;
ExpectFields = builder._expectFields;
InitialRetryDelay = builder._initialRetryDelay;
LastEventId = builder._lastEventId;
Logger = logger ?? Logs.None.Logger("");
RetryDelayStrategy = builder._retryDelayStrategy ?? RetryDelayStrategy.Default;
RetryDelayResetThreshold = builder._retryDelayResetThreshold;
StreamEventData = builder._streamEventData;
}

#endregion
Expand Down
102 changes: 102 additions & 0 deletions src/LaunchDarkly.EventSource/ConfigurationBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading;
using LaunchDarkly.EventSource.Events;
using LaunchDarkly.EventSource.Exceptions;
using LaunchDarkly.Logging;

namespace LaunchDarkly.EventSource
Expand Down Expand Up @@ -30,11 +32,13 @@ public class ConfigurationBuilder
internal TimeSpan _initialRetryDelay = Configuration.DefaultInitialRetryDelay;
internal ConnectStrategy _connectStrategy;
internal ErrorStrategy _errorStrategy;
internal HashSet<string> _expectFields;
internal string _lastEventId;
internal ILogAdapter _logAdapter;
internal Logger _logger;
internal RetryDelayStrategy _retryDelayStrategy;
internal TimeSpan _retryDelayResetThreshold = Configuration.DefaultRetryDelayResetThreshold;
internal bool _streamEventData;

#endregion

Expand Down Expand Up @@ -80,6 +84,50 @@ public ConfigurationBuilder ErrorStrategy(ErrorStrategy errorStrategy)
return this;
}

/// <summary>
/// Specifies that the application expects the server to send certain fields in every event.
/// </summary>
/// <remarks>
/// <para>
/// This setting makes no difference unless you have enabled <see cref="StreamEventData(bool)"/>
/// mode. In that case, it causes EventSource to only use the streaming data mode for an event
/// if the specified fields have already been received; otherwise, it will buffer the whole
/// event (as if <see cref="StreamEventData(bool)"/> had not been enabled), to ensure that those
/// fields are not lost if they appear after the <c>data:</c> field.
/// </para>
/// <para>
/// For instance, if you had called <c>ExpectFields("event")</c>, then EventSource would be able
/// to use streaming data mode for the following SSE response--
/// </para>
/// <code>
/// event: hello
/// data: here is some very long streaming data
/// </code>
/// <para>
/// --but it would buffer the full event if the server used the opposite order:
/// </para>
/// <code>
/// data: here is some very long streaming data
/// event: hello
/// </code>
/// <para>
/// Such behavior is not automatic because in some applications, there might never be an
/// <c>event:</c> field, and EventSource has no way to anticipate this.
/// </para>
/// <para>
/// Specifying any field names other than <c>"event"</c> and <c>"id"</c> has no effect, since
/// the only fields defined in SSE are <c>event</c>, <c>id</c>, and <c>data</c> (not counting
/// <c>retry</c>, since that is a directive to the client rather than part of a message).
/// </para>
/// </remarks>
/// <param name="fieldNames">a list of SSE field names (case-sensitive)</param>
/// <returns>the builder</returns>
public ConfigurationBuilder ExpectFields(params string[] fieldNames)
{
_expectFields = fieldNames is null ? null : new HashSet<string>(fieldNames);
return this;
}

/// <summary>
/// Sets the initial amount of time to wait before attempting to reconnect to the EventSource API.
/// </summary>
Expand Down Expand Up @@ -237,6 +285,60 @@ public ConfigurationBuilder RetryDelayResetThreshold(TimeSpan retryDelayResetThr
return this;
}

/// <summary>
/// Specifies whether EventSource should return a <see cref="MessageEvent"/> to the
/// handler as soon as it receives the beginning of the event data, allowing the caller
/// to read the data incrementally.
/// </summary>
/// <remarks>
/// <para>
/// The default for this property is <see langword="false"/>, meaning that EventSource
/// will always read the entire event into memory before returning it.
/// </para>
/// <para>
/// If you set it to <see langword="true"/>, it will instead return the event as soon as
/// it sees a <c>data</c> field-- setting <see cref="MessageEvent.DataStream"/> to a
/// <see cref="System.IO.Stream"/> that reads directly from the data as it arrives. The
/// EventSource will perform any necessary parsing under the covers, so that for instance
/// if there are multiple <c>data:</c> lines in the event, the Stream will emit a newline
/// character between each and will omit the <c>data:</c> field names. The Stream will
/// report "end of stream" as soon as the event is terminated normally by a blank line.
/// If the stream is closed before normal termination of the event, reading the Stream
/// will throw a <see cref="StreamClosedWithIncompleteMessageException"/>.
/// </para>
/// <para>
/// This mode is designed for applications that expect very large data items to be
/// delivered over SSE. Use it with caution, since there are several limitations:
/// </para>
/// <list type="bullet">
/// <item><description>
/// The <see cref="MessageEvent"/> is constructed as soon as a <c>data:</c> field appears,
/// so it will only include fields that appeared before that point. In other words, if
/// the SSE server happens to send <c>data:</c> first and <c>event:</c> second,
/// <see cref="MessageEvent.Name"/> will not contain the value of the <c>event:</c> field
/// but will be <see cref="MessageEvent.DefaultName"/> instead. Therefore, you should only
/// use this mode if the server's behavior is predictable in this regard.
/// </description></item>
/// <item><description>
/// The SSE protocol specifies that an event should be processed only if it is terminated
/// by a blank line, but in this mode the handler will receive the event as soon as a
/// <c>data:</c> field appears-- so, if the stream happens to cut off abnormally without a
/// trailing blank line, technically you will be receiving an incomplete event that should
/// have been ignored. You will know this has happened if reading from the Stream throws
/// a <see cref="StreamClosedWithIncompleteMessageException"/>.
/// </description></item>
/// </list>
/// </remarks>
/// <param name="streamEventData">true if events should be dispatched immediately with
/// asynchronous data rather than read fully first</param>
/// <returns>the builder</returns>
/// <see cref="MessageEvent.DataStream"/>
public ConfigurationBuilder StreamEventData(bool streamEventData)
{
_streamEventData = streamEventData;
return this;
}

#endregion

#region Private methods
Expand Down
2 changes: 2 additions & 0 deletions src/LaunchDarkly.EventSource/EventSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ private async Task<FaultEvent> TryStartAsync(bool canReturnFaultEvent)
ReadBufferSize,
connectResult.ReadTimeout ?? Timeout.InfiniteTimeSpan,
_origin,
_configuration.StreamEventData,
_configuration.ExpectFields,
newCancellationToken,
_logger
);
Expand Down
Loading