Skip to content
This repository has been archived by the owner on Sep 22, 2023. It is now read-only.

Commit

Permalink
Merge pull request #60 from Indicio-tech/feature/notifications
Browse files Browse the repository at this point in the history
Use event bus to send notifications to Admin connections
  • Loading branch information
dbluhm authored Jun 2, 2021
2 parents a07fcbf + e35bb50 commit a0a9eb7
Show file tree
Hide file tree
Showing 70 changed files with 3,046 additions and 745 deletions.
30 changes: 28 additions & 2 deletions acapy_plugin_toolbox/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,29 @@
"""Shortcut to group all."""
"""Shortcut to group all and rexports."""

from .group.all import setup
import os
import logging

from aries_cloudagent.protocols.problem_report.v1_0.message import (
ProblemReport
)
from aries_cloudagent.config.injection_context import InjectionContext

from . import (
basicmessage, connections, credential_definitions, dids, invitations,
issuer, mediator, routing, schemas, static_connections, taa
)
from .holder import v0_1 as holder

MODULES = [
basicmessage, connections, credential_definitions, dids, invitations,
issuer, mediator, routing, schemas, static_connections, taa, holder
]


async def setup(context: InjectionContext):
"""Load Toolbox Plugin."""
log_level = os.environ.get("ACAPY_TOOLBOX_LOG_LEVEL", logging.WARNING)
logging.getLogger("acapy_plugin_toolbox").setLevel(log_level)
print("Setting logging level of acapy_plugin_toolbox to", log_level)
for mod in MODULES:
await mod.setup(context)
187 changes: 55 additions & 132 deletions acapy_plugin_toolbox/basicmessage.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""BasicMessage Plugin."""
# pylint: disable=invalid-name, too-few-public-methods

import json
from datetime import datetime
from typing import Union
import re

from aries_cloudagent.connections.models.conn_record import ConnRecord
from aries_cloudagent.core.profile import ProfileSession
from aries_cloudagent.core.profile import ProfileSession, Profile
from aries_cloudagent.config.injection_context import InjectionContext
from aries_cloudagent.core.protocol_registry import ProtocolRegistry
from aries_cloudagent.protocols.connections.v1_0.manager import ConnectionManager
from aries_cloudagent.core.event_bus import Event, EventBus
from aries_cloudagent.messaging.base_handler import (
BaseHandler, BaseResponder, RequestContext
)
Expand All @@ -18,23 +19,17 @@
BaseRecord, BaseRecordSchema
)
from aries_cloudagent.messaging.valid import INDY_ISO8601_DATETIME
from aries_cloudagent.protocols.connections.v1_0.manager import (
ConnectionManager
)
from aries_cloudagent.protocols.basicmessage.v1_0.messages.basicmessage import BasicMessage
from aries_cloudagent.protocols.problem_report.v1_0.message import (
ProblemReport
)
from aries_cloudagent.storage.base import BaseStorage
from aries_cloudagent.storage.error import StorageNotFoundError
from marshmallow import fields

from .util import (
admin_only, datetime_from_iso, generate_model_schema, timestamp_utc_iso
admin_only, datetime_from_iso, generate_model_schema, send_to_admins
)

PROTOCOL_URI = "did:sov:BzCbsNYhMrjHiqZDTUASHg;spec/basicmessage/1.0"
BASIC_MESSAGE = f"{PROTOCOL_URI}/message"

ADMIN_PROTOCOL_URI = "https://github.com/hyperledger/" \
"aries-toolbox/tree/master/docs/admin-basicmessage/0.1"
GET = f"{ADMIN_PROTOCOL_URI}/get"
Expand All @@ -43,24 +38,54 @@
NEW = f"{ADMIN_PROTOCOL_URI}/new"

MESSAGE_TYPES = {
BASIC_MESSAGE: 'acapy_plugin_toolbox.basicmessage.BasicMessage',
GET: 'acapy_plugin_toolbox.basicmessage.Get',
SEND: 'acapy_plugin_toolbox.basicmessage.Send',
DELETE: 'acapy_plugin_toolbox.basicmessage.Delete'
}

BASIC_MESSAGE_EVENT_PATTERN = re.compile("^acapy::basicmessage::received$")


async def setup(
session: ProfileSession,
context: InjectionContext,
protocol_registry: ProblemReport = None
):
"""Setup the basicmessage plugin."""
if not protocol_registry:
protocol_registry = session.inject(ProtocolRegistry)
protocol_registry = context.inject(ProtocolRegistry)
protocol_registry.register_message_types(
MESSAGE_TYPES
)

event_bus = context.inject(EventBus)
event_bus.subscribe(BASIC_MESSAGE_EVENT_PATTERN, basic_message_event_handler)


async def basic_message_event_handler(profile: Profile, event: Event):
"""
Handle basic message events.
Send a notification to admins when messages are received.
"""

msg: BasicMessageRecord = BasicMessageRecord.deserialize(event.payload)
msg.state = BasicMessageRecord.STATE_RECV

notification = New(
connection_id=event.payload["connection_id"],
message=msg
)

responder = profile.inject(BaseResponder)
async with profile.session() as session:
await msg.save(session, reason="New message")
await send_to_admins(
session,
notification,
responder,
to_session_only=True
)


class BasicMessageRecord(BaseRecord):
"""BasicMessage Record."""
Expand Down Expand Up @@ -152,53 +177,6 @@ class Meta:
content = fields.Str(required=False)


def basic_message_init(
self,
*,
sent_time: Union[str, datetime] = None,
content: str = None,
localization: str = None,
**kwargs,
):
"""
Initialize basic message object.
Args:
sent_time: Time message was sent
content: message content
localization: localization
"""
# pylint: disable=protected-access
super(BasicMessage, self).__init__(**kwargs)
if not sent_time:
sent_time = timestamp_utc_iso()
if localization:
self._decorators["l10n"] = localization
self.sent_time = sent_time
self.content = content


BasicMessage, BasicMessageSchema = generate_model_schema(
name='BasicMessage',
handler='acapy_plugin_toolbox.basicmessage.BasicMessageHandler',
msg_type=BASIC_MESSAGE,
schema={
'sent_time': fields.Str(
required=False,
description="Time message was sent, ISO8601",
**INDY_ISO8601_DATETIME,
),
'content': fields.Str(
required=True,
description="Message content",
example="Hello",
)
},
init=basic_message_init
)


New, NewSchema = generate_model_schema(
name='New',
handler='acapy_plugin_toolbox.util.PassHandler',
Expand All @@ -214,74 +192,6 @@ def basic_message_init(
)


class BasicMessageHandler(BaseHandler):
"""Handler for received Basic Messages."""
# pylint: disable=protected-access

async def handle(self, context: RequestContext, responder: BaseResponder):
"""Handle received basic message."""
session = await context.session()
msg = BasicMessageRecord(
connection_id=context.connection_record.connection_id,
message_id=context.message._id,
sent_time=context.message.sent_time,
content=context.message.content,
state=BasicMessageRecord.STATE_RECV
)
await msg.save(session, reason='New message received.')

await responder.send_webhook(
"basicmessages",
{
"connection_id": context.connection_record.connection_id,
"message_id": context.message._id,
"content": context.message.content,
"state": "received",
},
)

session = await context.session()
connection_mgr = ConnectionManager(session)
storage = session.inject(BaseStorage)
admin_ids = map(
lambda record: record.tags['connection_id'],
filter(
lambda record: json.loads(record.value) == 'admin',
await storage.find_all_records(
ConnRecord.RECORD_TYPE_METADATA, {'key': 'group'}
)
)
)
admins = [
await ConnRecord.retrieve_by_id(session, id)
for id in admin_ids
]

if not admins:
return

admins = filter(lambda admin: admin.state == 'active', admins)
admin_verkeys = [
target.recipient_keys[0]
for admin in admins
for target in await connection_mgr.get_connection_targets(
connection=admin
)
]

notification = New(
connection_id=context.connection_record.connection_id,
message=msg
)

for verkey in admin_verkeys:
await responder.send(
notification,
reply_to_verkey=verkey,
to_session_only=True
)


Get, GetSchema = generate_model_schema(
name='Get',
handler='acapy_plugin_toolbox.basicmessage.GetHandler',
Expand Down Expand Up @@ -399,7 +309,7 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
)
except StorageNotFoundError:
report = ProblemReport(
explain_ltxt='Connection not found.',
description={"en":'Connection not found.'},
who_retries='none'
)
report.assign_thread_from(context.message)
Expand All @@ -411,7 +321,20 @@ async def handle(self, context: RequestContext, responder: BaseResponder):
localization=LocalizationDecorator(locale='en')
)

await responder.send(msg, connection_id=connection.connection_id)
connection_mgr = ConnectionManager(session)
targets = [
target
for target in await connection_mgr.get_connection_targets(
connection=connection
)
]

for target in targets:
await responder.send(
msg,
reply_to_verkey=target.recipient_keys[0],
reply_from_verkey=target.sender_key
)

record = BasicMessageRecord(
connection_id=context.message.connection_id,
Expand Down
Loading

0 comments on commit a0a9eb7

Please sign in to comment.