Skip to content

Commit

Permalink
Merge pull request #339 from george0st/change
Browse files Browse the repository at this point in the history
TS602
  • Loading branch information
george0st authored May 18, 2024
2 parents 547a677 + d74eb99 commit 05ca332
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 64 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ The quality gate covers these test scenarios (✅ done, ✔ in-progress, ❌ pla
- ✅ TS502: Get data from on-line feature vector(s)
- **06 - Pipeline**
- ✅ TS601: Simple pipeline(s)
- ✔ TS602: Complex pipeline(s)
- ✅ TS602: Complex pipeline(s)
- ✔ TS602: Complex mass pipeline(s)
- **07 - Build model**
- ✅ TS701: Build CART model
- ❌ TS702: Build XGBoost model
Expand Down
82 changes: 32 additions & 50 deletions qgate_sln_mlrun/ts/ts06_pipeline/ts602.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,68 +26,50 @@ def long_desc(self):

def exec(self):
"""Simple pipeline during ingest"""
self._complex_pipeline(f"*/complex (event)")
self._class_complex(f"*/class_complex (event)")
self._complex(f"*/complex (event)")


@TSBase.handler_testcase
def _complex_pipeline(self, testcase_name):

# definition complex graph
#
# fn = mlrun.new_function("serving", kind="serving", image="mlrun/mlrun")
# graph = serving.set_topology("flow")
# graph.to(name="double", handler="mylib.double") \
# .to(name="add3", handler="mylib.add3") \
# .to(name="echo", handler="mylib.echo").respond()
#
# project.set_function(name="serving", func=fn, with_repo=True)
def _class_complex(self, testcase_name):

func = mlrun.code_to_function(f"ts602_fn",
kind="serving",
filename="./qgate_sln_mlrun/ts/ts06_pipeline/ts602_ext_code.py")
graph_echo = func.set_topology("flow")
graph_echo.to(class_name="TS602Pipeline", full_event=True, name="first") \
.to(handler="second", full_event=True, name="second") \
.to(handler="third", full_event=True, name="third").respond()
(graph_echo.to(class_name="TS602Pipeline", full_event=True, name="step1") \
.to(class_name="TS602Pipeline", full_event=True, name="step2") \
.to(class_name="TS602Pipeline", full_event=True, name="step3")
.to(class_name="TS602Pipeline", full_event=True, name="step4").respond())

# tests
echo_server = func.to_mock_server(current_function="*")
result = echo_server.test("", {"a": 5, "b": 7})
echo_server.wait_for_completion()

# # value check
# if result['calc']!=12:
# raise ValueError("Invalid calculation, expected value 12")

# transaction ingest from parquet to the featureset


## Define and add value mapping
# transaction_set = fs.FeatureSet("transactions",
# entities=[fs.Entity("source")],
# timestamp_key='timestamp',
# description="transactions feature set")
# main_categories = ["es_transportation", "es_health", "es_otherservices",
# "es_food", "es_hotelservices", "es_barsandrestaurants",
# "es_tech", "es_sportsandtoys", "es_wellnessandbeauty",
# "es_hyper", "es_fashion", "es_home", "es_contents",
# "es_travel", "es_leisure"]
#
# # One Hot Encode the newly defined mappings
# one_hot_encoder_mapping = {'category': main_categories,
# 'gender': list(transactions_data.gender.unique())}
#
# # Define the graph steps
# transaction_set.graph\
# .to(DateExtractor(parts = ['hour', 'day_of_week'], timestamp_col = 'timestamp'))\
# .to(MapValues(mapping={'age': {'U': '0'}}, with_original_features=True))\
# .to(OneHotEncoder(mapping=one_hot_encoder_mapping)).respond()
#
#
# # Add aggregations for 2, 12, and 24 hour time windows
# transaction_set.add_aggregation(name='amount',
# column='amount',
# operations=['avg','sum', 'count','max'],
# windows=['2h', '12h', '24h'],
# period='1h')
# value check
if result['calc']!=78177:
raise ValueError("Invalid calculation, expected value 78177")

@TSBase.handler_testcase
def _complex(self, testcase_name):

func = mlrun.code_to_function(f"ts602_fn",
kind="serving",
filename="./qgate_sln_mlrun/ts/ts06_pipeline/ts602_ext_code.py")
graph_echo = func.set_topology("flow")
graph_echo.to(handler="step1", full_event=True, name="step1") \
.to(handler="step2", full_event=True, name="step2") \
.to(handler="step3", full_event=True, name="step3") \
.to(handler="step4", full_event=True, name="step4").respond()

# tests
echo_server = func.to_mock_server(current_function="*")
result = echo_server.test("", {"a": 5, "b": 7})
echo_server.wait_for_completion()

# value check
if result['calc']!=78177:
raise ValueError("Invalid calculation, expected value 78177")


78 changes: 65 additions & 13 deletions qgate_sln_mlrun/ts/ts06_pipeline/ts602_ext_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,85 @@ def __init__(self, context, name=None, **kw):
self.kw = kw

def do(self, event):
if self.name=="first":
self.first(event)
if self.name=="step1":
self.step1(event)
elif self.name=="step2":
self.step2(event)
elif self.name=="step3":
self.step3(event)
elif self.name=="step4":
self.step4(event)
return event

def first(self, event):

def step1(self, event):
if isinstance(event, mlrun.serving.server.MockEvent):
data=event.body
else:
data=event
calc = data['a'] * data['b']
data['calc']=calc
return event

def step2(self, event):
if isinstance(event, mlrun.serving.server.MockEvent):
data=event.body
else:
data=event
calc = data['calc'] + data['a'] + data['b']
data['calc']=calc
return event

# data = {"calc": calc}
def step3(self, event):
if isinstance(event, mlrun.serving.server.MockEvent):
data=event.body
else:
data=event
calc = data['calc'] + min(data['a'], data['b'])
data['calc']=calc
return event

def second(event):
def step4(self, event):
if isinstance(event, mlrun.serving.server.MockEvent):
data=event.body
else:
data=event
calc = data['calc'] + pow(data['a'], data['b'])
data['calc']=calc
return event


def step1(event):
if isinstance(event, mlrun.serving.server.MockEvent):
data=event.body
data = event.body
else:
data=event
calc = data["a"] + data["b"]
# data = {"calc": calc}
data.clear()
data['calc']=calc
data = event
calc = data['a'] * data['b']
data['calc'] = calc
return event

def third(event):
def step2(event):
if isinstance(event, mlrun.serving.server.MockEvent):
data = event.body
else:
data = event
calc = data['calc'] + data['a'] + data['b']
data['calc'] = calc
return event

def step3(event):
if isinstance(event, mlrun.serving.server.MockEvent):
data = event.body
else:
data = event
calc = data['calc'] + min(data['a'], data['b'])
data['calc'] = calc
return event

def step4(event):
if isinstance(event, mlrun.serving.server.MockEvent):
data = event.body
else:
data = event
calc = data['calc'] + pow(data['a'], data['b'])
data['calc'] = calc
return event

0 comments on commit 05ca332

Please sign in to comment.