Skip to content

Commit

Permalink
#61 SSE streaming manager
Browse files Browse the repository at this point in the history
  • Loading branch information
bne committed Jan 17, 2024
1 parent 96524b9 commit 59a1505
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 150 deletions.
56 changes: 50 additions & 6 deletions flagsmith/flagsmith.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import typing
from json import JSONDecodeError
import json
from datetime import datetime
import pytz

import requests
from flag_engine import engine
Expand All @@ -15,11 +17,13 @@
from flagsmith.models import DefaultFlag, Flags, Segment
from flagsmith.offline_handlers import BaseOfflineHandler
from flagsmith.polling_manager import EnvironmentDataPollingManager
from flagsmith.streaming_manager import EventStreamManager
from flagsmith.utils.identities import generate_identities_data

logger = logging.getLogger(__name__)

DEFAULT_API_URL = "https://edge.api.flagsmith.com/api/v1/"
DEFAULT_REALTIME_API_URL = "https://realtime.flagsmith.com/"


class Flagsmith:
Expand All @@ -41,6 +45,7 @@ def __init__(
self,
environment_key: str = None,
api_url: str = None,
realtime_api_url: str = None,
custom_headers: typing.Dict[str, typing.Any] = None,
request_timeout_seconds: int = None,
enable_local_evaluation: bool = False,
Expand All @@ -51,11 +56,13 @@ def __init__(
proxies: typing.Dict[str, str] = None,
offline_mode: bool = False,
offline_handler: BaseOfflineHandler = None,
use_stream: bool = False,
):
"""
:param environment_key: The environment key obtained from Flagsmith interface.
Required unless offline_mode is True.
:param api_url: Override the URL of the Flagsmith API to communicate with
:param realtime_api_url: Override the URL of the Flagsmith real-time API
:param custom_headers: Additional headers to add to requests made to the
Flagsmith API
:param request_timeout_seconds: Number of seconds to wait for a request to
Expand All @@ -76,12 +83,14 @@ def __init__(
:param offline_handler: provide a handler for offline logic. Used to get environment
document from another source when in offline_mode. Works in place of
default_flag_handler if offline_mode is not set and using remote evaluation.
:param use_stream: Use real-time functionality via SSE as opposed to polling the API
"""

self.offline_mode = offline_mode
self.enable_local_evaluation = enable_local_evaluation
self.offline_handler = offline_handler
self.default_flag_handler = default_flag_handler
self.use_stream = use_stream
self._analytics_processor = None
self._environment = None

Expand Down Expand Up @@ -110,6 +119,13 @@ def __init__(
api_url = api_url or DEFAULT_API_URL
self.api_url = api_url if api_url.endswith("/") else f"{api_url}/"

realtime_api_url = realtime_api_url or DEFAULT_REALTIME_API_URL
self.realtime_api_url = (
realtime_api_url
if realtime_api_url.endswith("/")
else f"{realtime_api_url}/"
)

self.request_timeout_seconds = request_timeout_seconds
self.session.mount(self.api_url, HTTPAdapter(max_retries=retries))

Expand All @@ -124,20 +140,45 @@ def __init__(
"in the environment settings page."
)

self.environment_data_polling_manager_thread = (
EnvironmentDataPollingManager(
if self.use_stream:
self.update_environment()
stream_url = f"{self.realtime_api_url}sse/environments/{self._environment.api_key}/stream"

self.event_stream_thread = EventStreamManager(
stream_url=stream_url,
on_event=self.handle_stream_event,
daemon=True, # noqa
)

self.event_stream_thread.start()

else:
self.environment_data_polling_manager_thread = EnvironmentDataPollingManager(
main=self,
refresh_interval_seconds=environment_refresh_interval_seconds,
daemon=True, # noqa
)
)
self.environment_data_polling_manager_thread.start()
self.environment_data_polling_manager_thread.start()

if enable_analytics:
self._analytics_processor = AnalyticsProcessor(
environment_key, self.api_url, timeout=self.request_timeout_seconds
)

def handle_stream_event(self, event):
event_data = json.loads(event.data)
stream_updated_at = datetime.fromtimestamp(event_data.get("updated_at"))

if stream_updated_at.tzinfo is None:
stream_updated_at = pytz.utc.localize(stream_updated_at)

environment_updated_at = self._environment.updated_at
if environment_updated_at.tzinfo is None:
environment_updated_at = pytz.utc.localize(environment_updated_at)

if stream_updated_at > environment_updated_at:
self.update_environment()

def get_environment_flags(self) -> Flags:
"""
Get all the default for flags for the current environment.
Expand Down Expand Up @@ -267,7 +308,7 @@ def _get_json_response(self, url: str, method: str, body: dict = None):
response.status_code,
)
return response.json()
except (requests.ConnectionError, JSONDecodeError) as e:
except (requests.ConnectionError, json.JSONDecodeError) as e:
raise FlagsmithAPIError(
"Unable to get valid response from Flagsmith API."
) from e
Expand All @@ -291,3 +332,6 @@ def _build_identity_model(self, identifier: str, **traits):
def __del__(self):
if hasattr(self, "environment_data_polling_manager_thread"):
self.environment_data_polling_manager_thread.stop()

if hasattr(self, "event_stream_thread"):
self.event_stream_thread.stop()
44 changes: 44 additions & 0 deletions flagsmith/streaming_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import threading
import requests
import logging
import sseclient

from flagsmith.exceptions import FlagsmithAPIError

logger = logging.getLogger(__name__)


class EventStreamManager(threading.Thread):
def __init__(
self, *args, stream_url, on_event, request_timeout_seconds=None, **kwargs
):
super().__init__(*args, **kwargs)
self._stop_event = threading.Event()
self.stream_url = stream_url
self.on_event = on_event
self.request_timeout_seconds = request_timeout_seconds

def run(self) -> None:
while not self._stop_event.is_set():
try:
with requests.get(
self.stream_url,
stream=True,
headers={"Accept": "application/json, text/event-stream"},
timeout=self.request_timeout_seconds,
) as response:
sse_client = sseclient.SSEClient(response)
for event in sse_client.events():
self.on_event(event)

except requests.exceptions.ReadTimeout:
pass

except (FlagsmithAPIError, requests.RequestException):
logger.exception("Error handling event stream")

def stop(self) -> None:
self._stop_event.set()

def __del__(self):
self._stop_event.set()
Loading

0 comments on commit 59a1505

Please sign in to comment.