Skip to content

Commit

Permalink
Actually add the NibbleRepository
Browse files Browse the repository at this point in the history
  • Loading branch information
originalsouth committed Dec 24, 2024
1 parent 22b025f commit 0cd92b2
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions octopoes/octopoes/repositories/nibble_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from datetime import datetime
from typing import Any

from octopoes.models.transaction import TransactionRecord
from octopoes.repositories.repository import Repository
from octopoes.xtdb.client import XTDBSession


class NibbleRepository(Repository):
def get(self, nibble: str, valid_time: datetime) -> dict[str, Any]:
raise NotImplementedError

def get_all(self, valid_time: datetime) -> list[dict[str, Any]]:
raise NotImplementedError

def put(self, ini: dict[str, Any], valid_time: datetime):
raise NotImplementedError

def put_many(self, inis: list[dict[str, Any]], valid_time: datetime):
raise NotImplementedError

def history(self, nibble: str, with_docs: bool = False) -> list[TransactionRecord]:
raise NotImplementedError


class XTDBNibbleRepository(NibbleRepository):
def __init__(self, session: XTDBSession):
self.session = session

@classmethod
def _xtid(cls, nibble: str) -> str:
return f"NibbleIni|{nibble}"

@classmethod
def _inify(cls, ini: dict[str, Any]) -> dict[str, Any]:
ini["type"] = "NibbleIni"
ini["xt/id"] = cls._xtid(ini["id"])
return ini

@classmethod
def _deinify(cls, ini: dict[str, Any]) -> dict[str, Any]:
ini.pop("type", None)
ini.pop("xt/id", None)
return ini

def get(self, nibble: str, valid_time: datetime) -> dict[str, Any]:
return self._deinify(self.session.client.get_entity(self._xtid(nibble), valid_time))

def get_all(self, valid_time: datetime) -> list[dict[str, Any]]:
result = self.session.client.query(
'{:query {:find [(pull ?var [*])] :where [[?var :type "NibbleIni"]]}}', valid_time
)
return [self._deinify(item[0]) for item in result]

def put(self, ini: dict[str, Any], valid_time: datetime):
self.session.put(self._inify(ini), valid_time)
self.commit()

def put_many(self, inis: list[dict[str, Any]], valid_time: datetime):
for ini in inis:
self.session.put(self._inify(ini), valid_time)
self.commit()

def history(self, nibble: str, with_docs: bool = False) -> list[TransactionRecord]:
return self.session.client.get_entity_history(self._xtid(nibble), with_docs=with_docs)

def commit(self):
self.session.commit()

0 comments on commit 0cd92b2

Please sign in to comment.