-
Notifications
You must be signed in to change notification settings - Fork 1
Rationale
billyrrr edited this page Aug 30, 2020
·
1 revision
We want to convert readable, maintainable, and easily testable Python code to be runnable at scale.
This means there are three stages of declarations:
- Domain-Driven Models
- Identifies the relationships toward other models
- Defines custom business logic that may or may not be reducible to a database primitive
- Dynamic Tables
- Defines database primitives and user-defined functions (UDFs)
- Raw data
- Defines a row of data
- Agnostic to operations
How this conversion will be implemented:
- Accept user definitions written in peewee's grammar
- Export view definitions as SQL using peewee
- Create tables with the Flink Table API in a Flink table environment
Example:
The user defines domain models and a view model:
from flask_boiler import domain_model, attrs
# from . import Location, User, Ticket
class MeetingBase(domain_model.DomainModel):
    """Domain-model base for meetings, configured with collection ``"meetings"``."""

    class Meta:
        collection_name = "meetings"
class Meeting(MeetingBase):
    """Meeting domain model.

    Relations: a single ``Location``, a list of ``User`` and a dict of
    ``Ticket`` (``MeetingSessionMixin.get`` iterates ``tickets.items()`` as
    user id -> ticket reference).
    """

    location = attrs.relation(nested=False, dm_cls='Location')
    users = attrs.relation(
        nested=False,
        dm_cls='User',
        collection=list,
    )
    tickets = attrs.relation(
        nested=False,
        dm_cls='Ticket',
        collection=dict,
    )
    # Values compared against by MeetingSessionMixin:
    # "not-started", "in-session", "closed".
    status = attrs.bproperty()
class TicketBase(domain_model.DomainModel):
    """Domain-model base for tickets, configured with collection ``"tickets"``."""

    class Meta:
        collection_name = "tickets"
class Ticket(TicketBase):
    """A user's ticket for a meeting."""

    role = attrs.bproperty()
    # The ticket holder.
    user = attrs.relation(nested=False, dm_cls='User')
    # Read as a truthy flag by MeetingSessionMixin.attending.
    attendance = attrs.bproperty()
class UserBase(domain_model.DomainModel):
    """Domain-model base for users; the collection name is set on ``User``."""
class User(UserBase):
    """User domain model, configured with collection ``"users"``."""

    class Meta:
        collection_name = "users"

    first_name = attrs.bproperty()
    last_name = attrs.bproperty()
    organization = attrs.bproperty()
    hearing_aid_requested = attrs.bproperty()

    # Computed from first_name and last_name; declared with import disabled.
    display_name = attrs.bproperty(import_enabled=False)

    @display_name.getter
    def display_name(self):
        """Return ``"<first_name> <last_name>"``."""
        return f"{self.first_name} {self.last_name}"
class LocationBase(domain_model.DomainModel):
    """Domain-model base for locations, configured with collection ``"locations"``."""

    class Meta:
        collection_name = "locations"

    # Geographic coordinates plus a free-form address.
    latitude = attrs.bproperty()
    longitude = attrs.bproperty()
    address = attrs.bproperty()
class Location(LocationBase):
    """Location domain model; all fields are declared on ``LocationBase``."""
class MeetingSessionStore(Store):
    """Store holding the documents a meeting-session view reads.

    ``MeetingSessionMixin.get`` builds it with ``from_struct`` from the
    meeting, its location, and dicts of users and tickets keyed by user id.
    """

    # Collections (many=True), keyed by user id.
    tickets = reference(dm_cls=Ticket, many=True)
    users = reference(dm_cls=User, many=True)
    # Single documents.
    meeting = reference(dm_cls=Meeting)
    location = reference(dm_cls=Location)
class MeetingSessionMixin:
    """View-model behaviour for a meeting session.

    Every field is computed from ``self.store`` (a ``MeetingSessionStore``
    when the object is built through :meth:`get`): the meeting, its
    location, and dicts of users and tickets keyed by user id.
    """

    class Meta:
        exclude = ('obj_type',)

    latitude = attrs.bproperty(import_enabled=False)
    longitude = attrs.bproperty(import_enabled=False)
    address = attrs.bproperty(import_enabled=False)
    attending = attrs.bproperty(import_enabled=False)
    in_session = attrs.bproperty(import_enabled=False)
    num_hearing_aid_requested = attrs.bproperty(import_enabled=False)

    @property
    def meeting_id(self):
        """Document id of the meeting this session is built from."""
        return self.store.meeting.doc_id

    @property
    def _view_refs(self):
        """Yield one reference per user: ``<user doc_ref>/<class name>/<meeting id>``."""
        for _user_id, user in self.store.users.items():
            yield user.doc_ref / self.__class__.__name__ / self.store.meeting.doc_id

    @in_session.getter
    def in_session(self):
        """True when the meeting status is ``"in-session"``."""
        return self.store.meeting.status == "in-session"

    @in_session.setter
    def in_session(self, in_session):
        """Open or close the meeting by updating ``store.meeting.status``.

        Only two transitions are allowed: ``"in-session"`` -> ``"closed"``
        (assigning a falsy value) and ``"closed"`` -> ``"in-session"``
        (assigning a truthy value).

        :raises ValueError: for any other combination of current status and
            requested value, including re-assigning the current state.
        """
        cur_status = self.store.meeting.status
        if cur_status == "in-session" and not in_session:
            self.store.meeting.status = "closed"
        elif cur_status == "closed" and in_session:
            self.store.meeting.status = "in-session"
        else:
            raise ValueError(
                f"cannot set in_session={in_session!r} "
                f"when meeting status is {cur_status!r}"
            )

    @latitude.getter
    def latitude(self):
        return self.store.location.latitude

    @longitude.getter
    def longitude(self):
        return self.store.location.longitude

    @address.getter
    def address(self):
        return self.store.location.address

    @attending.getter
    def attending(self):
        """List the attendees of a started meeting, sorted by user id.

        Each entry is a dict with ``"name"`` (the user's ``display_name``),
        ``"organization"`` and ``"hearing_aid_requested"``. Returns an empty
        list while the meeting status is ``"not-started"``. Users with no
        ticket in the store, or whose ticket ``attendance`` is falsy, are
        left out.
        """
        if self.store.meeting.status == "not-started":
            return []
        attendees = []
        for user_id in sorted(self.store.users.keys()):
            try:
                ticket = self.store.tickets[user_id]
            except KeyError:
                # Users and tickets come from separate meeting fields, so a
                # user may lack a ticket; without one there is no attendance
                # record, so skip instead of failing the whole view.
                continue
            if not ticket.attendance:
                continue
            user = self.store.users[user_id]
            attendees.append({
                "name": user.display_name,
                "organization": user.organization,
                "hearing_aid_requested": user.hearing_aid_requested,
            })
        return attendees

    @num_hearing_aid_requested.getter
    def num_hearing_aid_requested(self):
        """Number of entries in ``attending`` with a truthy hearing-aid request."""
        return sum(1 for d in self.attending if d["hearing_aid_requested"])

    @classmethod
    def get(cls, doc_id=None, once=True, meeting=None, **kwargs):
        """Build a session view for one meeting.

        :param doc_id: id of the meeting to load; ignored when ``meeting``
            is given.
        :param once: forwarded to the parent class's ``get``.
        :param meeting: an already-loaded ``Meeting`` to use instead of
            fetching ``doc_id``.
        :param kwargs: forwarded to the parent class's ``get``.
        :return: whatever the parent class's ``get`` returns.
        """
        m = Meeting.get(doc_id=doc_id) if meeting is None else meeting
        # (model class, document id) pairs handed to
        # MeetingSessionStore.from_struct; users and tickets are keyed by
        # user id.
        struct = {
            "meeting": (Meeting, m.doc_id),
            "users": {
                user_ref.id: (User, user_ref.id) for user_ref in m.users
            },
            "tickets": {
                user_id: (Ticket, ticket_ref.id)
                for user_id, ticket_ref in m.tickets.items()
            },
            "location": (Location, m.location.id),
        }
        store = MeetingSessionStore.from_struct(struct)
        # TODO: fix super() behavior
        return super().get(store=store, once=once, **kwargs)

    @classmethod
    def get_many_from_query(cls, query_d=None, once=False):
        """Build one session view per meeting matching a query.

        :param query_d: keyword conditions passed to ``Meeting.where``; when
            omitted, ``Meeting.where`` is called with no conditions.
        :param once: forwarded to :meth:`get` for each meeting; it does not
            affect the query itself.
        :return: list of session views, in the order ``Meeting.where`` yields.
        """
        if query_d is None:
            # ``**None`` would raise TypeError, so the declared default
            # could never be used.
            query_d = {}
        return [
            cls.get(doc_id=obj.doc_id, once=once)
            for obj in Meeting.where(**query_d)
        ]

    def propagate_change(self):
        """Push changes held in the store back via ``store.propagate_back()``."""
        self.store.propagate_back()
class MeetingSessionC(MeetingSessionMixin, view_model.ViewModelR):
    """Concrete meeting-session view model, configured with collection
    ``"meeting_sessions"``.

    Behaviour comes from ``MeetingSessionMixin``. In this class's MRO the
    mixin's ``super().get(...)`` resolves past the mixin to
    ``view_model.ViewModelR``.
    """

    class Meta:
        collection_name = 'meeting_sessions'
We want to create these elements in Flink:
(This is a draft that currently contains more errors than information; it will be corrected soon.)
# Draft PyFlink pipeline that derives MeetingSession fields with Flink SQL.
# NOTE(review): `env` is assumed to be a StreamExecutionEnvironment created
# beforehand (e.g. StreamExecutionEnvironment.get_execution_environment()).
# Every `WITH (...)` below is still a placeholder for connector options and
# must be filled in before this runs.
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
# Pass settings by keyword: in older PyFlink releases the second positional
# parameter of StreamTableEnvironment.create is `table_config`.
table_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Source tables, one per domain model. Meeting carries the location reference
# (Meeting.location); Ticket carries role and attendance (Ticket model).
table_env.execute_sql(
    "CREATE TABLE Meeting ("
    "meetingId STRING, locationId STRING, status STRING"
    ") WITH (...)"
)
table_env.execute_sql(
    "CREATE TABLE Ticket ("
    "meetingId STRING, userId STRING, role STRING, attendance BOOLEAN"
    ") WITH (...)"
)
# USER is a reserved keyword in Flink SQL, so the table name is quoted.
table_env.execute_sql(
    "CREATE TABLE `User` ("
    "userId STRING, firstName STRING, lastName STRING, organization STRING, "
    "hearingAidRequested BOOLEAN, displayName STRING"
    ") WITH (...)"
)
# DOUBLE rather than FLOAT: single precision loses detail in coordinates.
table_env.execute_sql(
    "CREATE TABLE Location ("
    "locationId STRING, latitude DOUBLE, longitude DOUBLE, address STRING"
    ") WITH (...)"
)

# One row per attending ticket of a started meeting, mirroring
# MeetingSessionMixin.attending (status "not-started" yields no attendees).
MeetingSessionStore_User = table_env.sql_query(
    """
    SELECT Ticket.meetingId AS meetingId,
           `User`.hearingAidRequested AS hearingAidRequested
    FROM Ticket
    JOIN Meeting ON Ticket.meetingId = Meeting.meetingId
    LEFT JOIN `User` ON Ticket.userId = `User`.userId
    WHERE Ticket.attendance AND Meeting.status <> 'not-started'
    """
)
# Register the result so later SQL can refer to it by name.
table_env.create_temporary_view(
    "MeetingSessionStore_User", MeetingSessionStore_User
)

# Mirrors MeetingSessionMixin.num_hearing_aid_requested. A NULL
# hearingAidRequested (no matching User row) counts as 0. Meetings with no
# attending tickets produce no row at all.
MeetingSession_numHearingAidRequested = table_env.sql_query(
    """
    SELECT meetingId,
           SUM(CASE WHEN hearingAidRequested THEN 1 ELSE 0 END)
               AS numHearingAidRequested
    FROM MeetingSessionStore_User
    GROUP BY meetingId
    """
)
table_env.create_temporary_view(
    "MeetingSession_numHearingAidRequested",
    MeetingSession_numHearingAidRequested,
)

# Sink table. Only numHearingAidRequested is derived so far. The GROUP BY
# above emits updates, so the sink declares a key for upsert writes.
table_env.execute_sql(
    "CREATE TABLE MeetingSession ("
    "meetingId STRING, numHearingAidRequested INT, "
    "PRIMARY KEY (meetingId) NOT ENFORCED"
    ") WITH (...)"
)

# Emit the derived rows into the sink.
table_env.execute_sql(
    "INSERT INTO MeetingSession "
    "SELECT meetingId, numHearingAidRequested "
    "FROM MeetingSession_numHearingAidRequested"
)