Skip to content

Rationale

billyrrr edited this page Aug 30, 2020 · 1 revision

We want to take readable, maintainable, and easily testable Python code and convert it into a form that can run at scale.

This means that we have 3 stages of declarations.

  1. Domain-Driven Models
  • Identifies the relationships toward other models
  • Defines custom business logic that may or may not be reducible to a database primitive
  2. Dynamic Tables
  • Defines database primitives and UDFs
  3. Raw data
  • Defines a row of data
  • Agnostic to operations

How this conversion will be implemented:

  • Accept user definition based on PeeWee grammars
  • Export View definitions in SQL using PeeWee
  • Create tables using Flink Table API in Flink Table environment

Example:

User defines a domain model and a view model.

from flask_boiler import domain_model, attrs
# from . import Location, User, Ticket
class MeetingBase(domain_model.DomainModel):
    """Base domain model for meetings.

    Carries only the ``Meta`` configuration; the fields themselves are
    declared on the ``Meeting`` subclass.
    """

    class Meta:
        # Presumably the name of the backing collection for meeting
        # documents -- TODO confirm against the framework.
        collection_name = "meetings"


class Meeting(MeetingBase):
    """Meeting domain model: relations to a location, users and tickets, plus a status."""

    # Single, non-nested relation to a 'Location' model.
    location = attrs.relation(nested=False, dm_cls='Location')
    # Non-nested relation to 'User' models, collected as a list.
    users = attrs.relation(nested=False, dm_cls='User',
                           collection=list)
    # Non-nested relation to 'Ticket' models, collected as a dict.
    # MeetingSessionMixin.get iterates it as (user_id, ticket reference) pairs.
    tickets = attrs.relation(nested=False, dm_cls='Ticket',
                             collection=dict)
    # MeetingSessionMixin compares this against "not-started", "in-session"
    # and "closed".
    status = attrs.bproperty()


class TicketBase(domain_model.DomainModel):
    """Base domain model for tickets; carries only the ``Meta`` configuration."""

    class Meta:
        # Presumably the name of the backing collection for ticket
        # documents -- TODO confirm against the framework.
        collection_name = "tickets"


class Ticket(TicketBase):
    """Ticket domain model: a role, a related user and an attendance value."""
    role = attrs.bproperty()
    # Single, non-nested relation to a 'User' model.
    user = attrs.relation(nested=False, dm_cls='User')
    # Tested for truthiness by MeetingSessionMixin.attending to decide
    # whether the ticket's user is listed as attending.
    attendance = attrs.bproperty()


class UserBase(domain_model.DomainModel):
    """Base domain model for users; adds nothing of its own to ``DomainModel``."""
    pass


class User(UserBase):
    """User domain model.

    ``display_name`` is declared with ``import_enabled=False`` and is computed
    from ``first_name`` and ``last_name`` by its getter.
    """

    class Meta:
        collection_name = "users"

    first_name = attrs.bproperty()
    last_name = attrs.bproperty()
    organization = attrs.bproperty()
    hearing_aid_requested = attrs.bproperty()
    display_name = attrs.bproperty(import_enabled=False)

    @display_name.getter
    def display_name(self):
        """Return the first and last name separated by a single space."""
        return f"{self.first_name} {self.last_name}"

class LocationBase(domain_model.DomainModel):
    """Base domain model for locations: latitude, longitude and address."""

    class Meta:
        # Presumably the name of the backing collection for location
        # documents -- TODO confirm against the framework.
        collection_name = "locations"

    # These three values are re-exposed on the meeting-session view by the
    # getters of the same names in MeetingSessionMixin.
    latitude = attrs.bproperty()
    longitude = attrs.bproperty()
    address = attrs.bproperty()


class Location(LocationBase):
    """Concrete location model; everything is inherited from ``LocationBase``."""

    pass

class MeetingSessionStore(Store):
    """Store declaring the domain objects a meeting-session view reads from.

    ``MeetingSessionMixin.get`` builds an instance via ``from_struct``.
    """
    # many=True references; MeetingSessionMixin indexes both by user id.
    tickets = reference(dm_cls=Ticket, many=True)
    users = reference(dm_cls=User, many=True)
    # Single references to the meeting itself and to its location.
    meeting = reference(dm_cls=Meeting)
    location = reference(dm_cls=Location)


class MeetingSessionMixin:
    """View-model mixin exposing the session state of a single meeting.

    All values are read from ``self.store``, which ``get`` builds as a
    ``MeetingSessionStore`` holding the meeting, its location, its users and
    its tickets.  The mixin is meant to be combined with a view-model base
    class (see ``MeetingSessionC``) that is expected to supply ``store`` and
    the parent ``get`` implementation.
    """

    class Meta:
        # Presumably keeps 'obj_type' out of the exported view -- TODO confirm.
        exclude = ('obj_type',)

    latitude = attrs.bproperty(import_enabled=False)
    longitude = attrs.bproperty(import_enabled=False)
    address = attrs.bproperty(import_enabled=False)
    attending = attrs.bproperty(import_enabled=False)
    in_session = attrs.bproperty(import_enabled=False)
    num_hearing_aid_requested = attrs.bproperty(import_enabled=False)

    @property
    def meeting_id(self):
        """Return the ``doc_id`` of the meeting held in the store."""
        return self.store.meeting.doc_id

    @property
    def _view_refs(self):
        """Yield one document reference per user in the store.

        Each reference is ``user.doc_ref / <class name> / <meeting doc_id>``.
        """
        for _, user in self.store.users.items():
            yield user.doc_ref / self.__class__.__name__ / self.store.meeting.doc_id

    @in_session.getter
    def in_session(self):
        """Return True when the meeting status is ``"in-session"``."""
        return self.store.meeting.status == "in-session"

    @in_session.setter
    def in_session(self, in_session):
        """Toggle the meeting between ``"in-session"`` and ``"closed"``.

        :param in_session: truthy to reopen a closed meeting, falsy to close
            a meeting that is in session
        :raises ValueError: for any other combination of current status and
            requested value (including requesting the state the meeting is
            already in)
        """
        cur_status = self.store.meeting.status
        if cur_status == "in-session" and not in_session:
            self.store.meeting.status = "closed"
        elif cur_status == "closed" and in_session:
            self.store.meeting.status = "in-session"
        else:
            raise ValueError(
                f"cannot set in_session={in_session!r} "
                f"while meeting status is {cur_status!r}"
            )

    @latitude.getter
    def latitude(self):
        """Return the latitude of the meeting's location."""
        return self.store.location.latitude

    @longitude.getter
    def longitude(self):
        """Return the longitude of the meeting's location."""
        return self.store.location.longitude

    @address.getter
    def address(self):
        """Return the address of the meeting's location."""
        return self.store.location.address

    @attending.getter
    def attending(self):
        """Return the attending users as a list of dicts, ordered by user id.

        The list is empty while the meeting status is ``"not-started"``.
        Otherwise a user is included when the ticket stored under the same
        user id has a truthy ``attendance``.

        :return: list of dicts with keys ``"name"``, ``"organization"`` and
            ``"hearing_aid_requested"``
        """
        if self.store.meeting.status == "not-started":
            return []

        attendees = []
        # Sorted so the output order does not depend on store ordering.
        for user_id in sorted(self.store.users.keys()):
            ticket = self.store.tickets[user_id]
            if not ticket.attendance:
                continue
            user = self.store.users[user_id]
            attendees.append({
                "name": user.display_name,
                "organization": user.organization,
                "hearing_aid_requested": user.hearing_aid_requested
            })

        return attendees

    @num_hearing_aid_requested.getter
    def num_hearing_aid_requested(self):
        """Return how many attending users requested a hearing aid."""
        return sum(1 for d in self.attending if d["hearing_aid_requested"])

    @classmethod
    def get(cls, doc_id=None, once=True, meeting=None, **kwargs):
        """Build the store for one meeting and return the view model for it.

        :param doc_id: id used to fetch the meeting when ``meeting`` is None
        :param once: forwarded unchanged to the parent ``get``
        :param meeting: an already loaded ``Meeting``; skips the fetch
        :param kwargs: forwarded unchanged to the parent ``get``
        :return: whatever the parent class's ``get`` returns
        """
        if meeting is None:
            m: Meeting = Meeting.get(doc_id=doc_id)
        else:
            m = meeting

        # Each entry is a (model class, document id) pair; users and tickets
        # are both keyed by user id so they can be matched up later.
        struct = {
            "meeting": (Meeting, m.doc_id),
            "users": {
                user_ref.id: (User, user_ref.id) for user_ref in m.users
            },
            "tickets": {
                user_id: (Ticket, ticket_ref.id)
                for user_id, ticket_ref in m.tickets.items()
            },
            "location": (Location, m.location.id),
        }

        store = MeetingSessionStore.from_struct(struct)

        obj = super().get(store=store, once=once,
                          **kwargs)  # TODO: fix super() behavior
        # time.sleep(2)  # TODO: delete after implementing sync

        return obj

    @classmethod
    def get_many_from_query(cls, query_d=None, once=False):
        """ Note that once kwarg apply to the snapshot but not the query.

        :param query_d: keyword filters passed to ``Meeting.where``; ``None``
            (the default) is treated as no filters
        :param once: attaches a listener to individual snapshots
        :return: list with one view model per matching meeting
        """
        # ``**None`` raises TypeError, so fall back to an empty filter dict.
        filters = query_d or {}
        return [
            cls.get(doc_id=obj.doc_id, once=once)
            for obj in Meeting.where(**filters)]

    def propagate_change(self):
        """Delegate to ``self.store.propagate_back()``."""
        self.store.propagate_back()

class MeetingSessionC(MeetingSessionMixin, view_model.ViewModelR):
    """Concrete meeting-session view model: the mixin's logic on top of ``ViewModelR``."""

    class Meta:
        # Presumably the collection name used for this view model -- TODO confirm.
        collection_name = 'meeting_sessions'

We want to create these elements in Flink:

(This is a draft and currently contains more errors than information; it will be corrected soon.)

# Draft sketch of the Flink side.  Connector options are elided as
# ``WITH (...)`` and column lists as ``(...)``; fill them in before running.

# NOTE(review): ``env`` must be an existing StreamExecutionEnvironment.
# The settings are passed by keyword because in some PyFlink releases the
# second positional parameter of ``create`` is ``table_config``.
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Register the source tables.
table_env.execute_sql(
    "CREATE TABLE Meeting (meetingId STRING, status STRING) WITH (...)")
table_env.execute_sql(
    "CREATE TABLE Ticket (meetingId STRING, locationId STRING, "
    "userId STRING, status STRING) WITH (...)")
# USER is a reserved keyword in Flink SQL, so the table name is quoted with
# backticks everywhere it appears.
table_env.execute_sql(
    "CREATE TABLE `User` (userId STRING, firstName STRING, lastName STRING, "
    "organization STRING, hearingAidRequested BOOLEAN, displayName STRING) "
    "WITH (...)")
table_env.execute_sql(
    "CREATE TABLE Location (locationId STRING, latitude FLOAT, "
    "longitude FLOAT, address STRING) WITH (...)")

# Run a SQL query on the tables and retrieve the result as a new Table:
# one row per ticket, carrying the hearing-aid value of the ticket's user.
MeetingSessionStore_User = table_env.sql_query(
"""
    SELECT Ticket.meetingId AS meetingId,
           `User`.hearingAidRequested AS hearingAidRequested
    FROM Ticket LEFT JOIN `User` ON Ticket.userId = `User`.userId
""")
# Register the result under a name so the next query can select from it.
table_env.create_temporary_view(
    "MeetingSessionStore_User", MeetingSessionStore_User)

# Number of hearing-aid requests per meeting.
# NOTE(review): the Python view model only counts users whose ticket has a
# truthy ``attendance`` and reports nothing while the meeting is
# "not-started"; the draft Ticket schema has no attendance column, so that
# filter is not expressed here yet.
MeetingSession_numHearingAidRequested = table_env.sql_query(
"""
    SELECT meetingId,
           SUM(CASE WHEN hearingAidRequested THEN 1 ELSE 0 END)
               AS numHearingAidRequested
    FROM MeetingSessionStore_User
    GROUP BY meetingId
""")


# Execute an INSERT SQL with a registered table
# register a TableSink
table_env.execute_sql("CREATE TABLE MeetingSession(...) WITH (...)")
# run an INSERT SQL on the Table and emit the result to the TableSink
table_env.execute_sql("INSERT INTO MeetingSession ... ")
Clone this wiki locally