Skip to content

Commit

Permalink
Implementation: Storing conversations
Browse files Browse the repository at this point in the history
  • Loading branch information
duogenesis committed Feb 5, 2025
1 parent 9ae1a0f commit c630f32
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 68 deletions.
2 changes: 1 addition & 1 deletion chat.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ WORKDIR /app
COPY service/chat/container/init-db.sh /init-db.sh
COPY service/chat/container/init.sql /init.sql
COPY service/chat/container/jq /bin/jq
COPY service/chat/container/mongooseim.toml /mongooseim.template.toml
COPY service/chat/container/mongooseim.toml /usr/lib/mongooseim/etc/mongooseim.toml

# Proxy
COPY antiabuse/__init__.py /app/antiabuse/__init__.py
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ services:
- ./database/__init__.py:/app/database/__init__.py:ro
- ./database/asyncdatabase/__init__.py:/app/database/asyncdatabase/__init__.py:ro
- ./service/chat:/app/service/chat:ro
- ./service/chat/container/mongooseim.toml:/mongooseim.template.toml:ro
- ./service/chat/container/mongooseim.toml:/usr/lib/mongooseim/etc/mongooseim.toml:ro
- ./service/chat/container/init.sql:/init.sql:ro
- ./service/chat/container/init-db.sh:/init-db.sh:ro
- ./chat.main.sh:/app/chat.main.sh:ro
Expand Down
6 changes: 6 additions & 0 deletions service/chat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,12 @@ async def process_duo_message(
},
)

store_message(
maybe_message_body,
from_username=from_username,
to_username=to_username,
msg_id=id)

set_messaged(from_id=from_id, to_id=to_id)

upsert_conversation(
Expand Down
3 changes: 0 additions & 3 deletions service/chat/container/init-db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,3 @@ PGPASSWORD=$DUO_DB_PASS psql \
-f /init.sql

touch /db-initialized

envsubst < /mongooseim.template.toml > /tmp/out.toml
mv /tmp/out.toml /usr/lib/mongooseim/etc/mongooseim.toml
58 changes: 0 additions & 58 deletions service/chat/container/mongooseim.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,6 @@
max_fsm_queue = 10000
rdbms_server_type = "pgsql"

[outgoing_pools.rdbms.default]
scope = "global"
workers = 10

[outgoing_pools.rdbms.default.connection]
driver = "pgsql"
keepalive_interval = 30

host = "$DUO_DB_HOST"
port = $DUO_DB_PORT
database = "duo_chat"
username = "$DUO_DB_USER"
password = "$DUO_DB_PASS"

[[listen.http]]
ip_address = "0.0.0.0"
port = 5442
Expand All @@ -40,20 +26,12 @@

[modules.mod_offline_stub]

[modules.mod_mam.pm]

[shaper.normal]
max_rate = 1000

[shaper.fast]
max_rate = 50_000

[shaper.mam_shaper]
max_rate = 1

[shaper.mam_global_shaper]
max_rate = 1000

[acl]
local = [{}]

Expand Down Expand Up @@ -100,39 +78,3 @@
register = [
{acl = "all", value = "allow"}
]

mam_set_prefs = [
{acl = "all", value = "default"}
]

mam_get_prefs = [
{acl = "all", value = "default"}
]

mam_lookup_messages = [
{acl = "all", value = "default"}
]

mam_set_prefs_shaper = [
{acl = "all", value = "mam_shaper"}
]

mam_get_prefs_shaper = [
{acl = "all", value = "mam_shaper"}
]

mam_lookup_messages_shaper = [
{acl = "all", value = "mam_shaper"}
]

mam_set_prefs_global_shaper = [
{acl = "all", value = "mam_global_shaper"}
]

mam_get_prefs_global_shaper = [
{acl = "all", value = "mam_global_shaper"}
]

mam_lookup_messages_global_shaper = [
{acl = "all", value = "mam_global_shaper"}
]
96 changes: 91 additions & 5 deletions service/chat/mam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

# TODO: When fetching conversations, be careful about JIDs and bare JIDs

# TODO: Do I need to deal with message read indicators?

Q_INSERT_SERVER_USER = """
INSERT INTO
mam_server_user (server, user_name)
Expand All @@ -32,7 +34,8 @@
"""


# TODO: How to handle `id` collisions if two messages arrived in the same millisecond?
# TODO: How to handle `id` collisions if two messages arrived in the same
# millisecond? Maybe add `ON CONFLICT` with a new random ID
Q_INSERT_MESSAGE = """
INSERT INTO
mam_message (
Expand Down Expand Up @@ -116,7 +119,16 @@ class Query:
before: str | None


def process_query(
@dataclass(frozen=True)
class Message:
message_body: str
timestamp: datetime.datetime
from_username: str
to_username: str
id: str


def _process_query(
parsed_xml: Optional[etree._Element],
from_username: str
) -> Optional[Query]:
Expand Down Expand Up @@ -146,7 +158,7 @@ def process_query(
)


def forwarded_element(
def _forwarded_element(
message: etree.Element,
query: Query,
row_id: int,
Expand Down Expand Up @@ -201,7 +213,7 @@ async def maybe_get_conversation(
if parsed_xml is None:
return []

query = process_query(parsed_xml, from_username=from_username)
query = _process_query(parsed_xml, from_username=from_username)

if not query:
return None
Expand All @@ -213,6 +225,44 @@ async def maybe_get_conversation(
)


def store_message(
message_body: str,
from_username: str,
to_username: str,
msg_id: str
):
message = Message(
message_body=message_body,
timestamp=datetime.datetime(),
from_username=from_username,
to_username=to_username,
id=msg_id,
)

_store_message_batcher.enqueue(message)


def _process_store_message_batch(batch: List[Message]):
params_seq = [
dict(
id=microseconds_to_mam_message_id(message.timestamp * 1_000_000), # TODO
to_username=message.to_username,
from_username=message.from_username,
message=message_to_xml(
message_body=message.message_body,
to_username=message.to_username,
from_username=message.from_username,
id=message.id,
),
search_body=normalize_search_text(message.message_body),
)
for message in batch
]

with database.chat_tx('read committed') as tx:
tx.executemany(Q_INSERT_MESSAGE, params_seq)


async def _get_conversation(
query: Query,
from_username: str,
Expand Down Expand Up @@ -244,7 +294,7 @@ async def _get_conversation(
except:
continue

forwarded_etree = forwarded_element(
forwarded_etree = _forwarded_element(
message=message_etree,
query=query,
row_id=row_id,
Expand Down Expand Up @@ -413,3 +463,39 @@ def normalize_search_text(text: str | None) -> str | None:
re2 = re.sub(r"\s+", ' ', re1, flags=re.UNICODE)

return re2


# TODO
def message_to_xml(
message_body: str,
to_username: str,
from_username: str,
id: str,
) -> str:
message_etree = build_element(
'message',
attrib={
'from': from_username, # TODO: Add @duolicious.app
'to': to_username,
'type': 'chat',
'id': id,
},
ns='jabber:client',
)

body = build_element('body', text=message_body)

message_etree.extend([body])

return messages.append(
etree.tostring(
message_etree, encoding='unicode', pretty_print=False))


_store_message_batcher = Batcher[Message](
process_fn=_process_store_message_batch,
flush_interval=1.0,
min_batch_size=1,
max_batch_size=1000,
retry=False,
)

0 comments on commit c630f32

Please sign in to comment.