Skip to content

Commit

Permalink
Implement redis broadcasting for BroadcastWebsocketManager (#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mark90 authored Mar 27, 2024
1 parent 69439c4 commit b284960
Show file tree
Hide file tree
Showing 43 changed files with 99 additions and 1,188 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 2.1.2rc3
current_version = 2.1.2rc4
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?
Expand Down
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ Websockets can also be turned off with:
export ENABLE_WEBSOCKETS=False
```

More broadcaster info [here](https://pypi.org/project/broadcaster/)

If you want to use pickle for CACHE serialization you will need to set the `CACHE_HMAC_SECRET`:
```shell
export CACHE_HMAC_SECRET="SOMESECRET"
Expand Down
7 changes: 1 addition & 6 deletions orchestrator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@

"""This is the orchestrator workflow engine."""

__version__ = "2.1.2rc3"

import pathlib
import site

site.addsitedir(str(pathlib.Path(__file__).resolve().parent / "vendor/broadcaster"))
__version__ = "2.1.2rc4"

from orchestrator.app import OrchestratorCore
from orchestrator.settings import app_settings, oauth2_settings
Expand Down
56 changes: 55 additions & 1 deletion orchestrator/utils/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from os import getenv
from typing import Any
from uuid import UUID

from redis import Redis
from redis.asyncio import Redis as AIORedis
from redis.asyncio.client import Pipeline, PubSub
from structlog import get_logger

from orchestrator.services.subscriptions import _generate_etag
Expand Down Expand Up @@ -87,3 +89,55 @@ async def fetch(_cursor: int = 0) -> tuple[int, list[bytes]]:

logger.debug("Deleted keys matching pattern", pattern=pattern, deleted=deleted)
return deleted


class RedisBroadcast:
"""Small wrapper around redis.asyncio.Redis used by websocket broadcasting.
Note:
redis.asyncio.Redis.from_url() returns a client which maintains a ConnectionPool.
This instance is thread-safe and does not create connections until needed.
However, you cannot instantiate this in one asyncio event loop and use it in
another event loop, as the created connections in the pool will then only
be usable by the loop they were created in.
"""

client: AIORedis

def __init__(self, redis_url: str):
self.client = AIORedis.from_url(redis_url)
self.redis_url = redis_url

@asynccontextmanager
async def pipeline(self) -> AsyncGenerator[Pipeline, None]:
"""Context to prepare a pipeline object for issueing multiple commands, such as .publish().
Automatically executes the pipeline afterwards (unless there was an exception).
"""
async with self.client.pipeline() as pipe:
yield pipe
await pipe.execute()

@asynccontextmanager
async def subscriber(self, *channels: str) -> AsyncGenerator[PubSub, None]:
"""Context to subscribe to one or more channels.
Automatically unsubscribes and releases the connection afterwards.
"""
pubsub = self.client.pubsub(ignore_subscribe_messages=True)
try:
await pubsub.subscribe(*channels)
yield pubsub
finally:
await pubsub.unsubscribe(*channels)
await pubsub.aclose() # type: ignore[attr-defined]

async def connect(self) -> None:
# Execute a simple command to ensure we can establish a connection
result = await self.client.ping()
logger.debug("RedisBroadcast can connect to redis", ping_result=result)

async def disconnect(self) -> None:
logger.debug("Closing redis client")
await self.client.aclose() # type: ignore[attr-defined]
logger.debug("Closed redis client")
1 change: 0 additions & 1 deletion orchestrator/vendor/README.md

This file was deleted.

Empty file removed orchestrator/vendor/__init__.py
Empty file.
1 change: 0 additions & 1 deletion orchestrator/vendor/broadcaster/.github/FUNDING.yml

This file was deleted.

29 changes: 0 additions & 29 deletions orchestrator/vendor/broadcaster/.github/workflows/publish.yml

This file was deleted.

69 changes: 0 additions & 69 deletions orchestrator/vendor/broadcaster/.github/workflows/test-suite.yml

This file was deleted.

9 changes: 0 additions & 9 deletions orchestrator/vendor/broadcaster/.gitignore

This file was deleted.

27 changes: 0 additions & 27 deletions orchestrator/vendor/broadcaster/LICENSE.md

This file was deleted.

118 changes: 0 additions & 118 deletions orchestrator/vendor/broadcaster/README.md

This file was deleted.

Empty file.
4 changes: 0 additions & 4 deletions orchestrator/vendor/broadcaster/broadcaster/__init__.py

This file was deleted.

Empty file.
26 changes: 0 additions & 26 deletions orchestrator/vendor/broadcaster/broadcaster/_backends/base.py

This file was deleted.

Loading

0 comments on commit b284960

Please sign in to comment.