Skip to content

Commit

Permalink
Add k8 worker integration. Added k8 example
Browse files Browse the repository at this point in the history
  • Loading branch information
franalgaba committed Dec 5, 2023
1 parent 23f1641 commit d666c61
Show file tree
Hide file tree
Showing 11 changed files with 502 additions and 434 deletions.
4 changes: 4 additions & 0 deletions examples/kubernetes_simple/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM prefecthq/prefect:2-python3.11

COPY requirements.txt .
RUN pip install -r requirements.txt --trusted-host pypi.python.org --no-cache-dir
33 changes: 33 additions & 0 deletions examples/kubernetes_simple/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Kubernetes Deployment Example

This guide will walk you through the process of creating a Docker image and deploying an Action using Kubernetes on Google Cloud Platform (GCP).

## Prerequisites

- Google Cloud SDK installed and authenticated
- Docker installed
- Prefect installed

## Steps

1. Authenticate with Google Cloud:
```
gcloud auth login
```

2. Create a Docker repository in Artifact Registry:
```
gcloud artifacts repositories create prefect-test --repository-format=docker --location=us-west2 --description="Docker repository for testing Prefect"
```

3. Build the Docker image and push it to the Artifact Registry:
```
gcloud builds submit --region=us-west2 --tag us-west2-docker.pkg.dev/giza-platform/prefect-test/prefect-flow-test:v0
```

4. Deploy the Action:
```
python deployment.py
```

After following these steps, your Action should be successfully deployed on Kubernetes.
10 changes: 10 additions & 0 deletions examples/kubernetes_simple/build_deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

# Authenticate with Google Cloud
gcloud auth login

# Build the Docker image and push it to the Artifact Registry
gcloud builds submit --region=us-west2 --tag us-west2-docker.pkg.dev/giza-platform/prefect-test/prefect-flow-test:v0

# Deploy the Action
python deployment.py
31 changes: 31 additions & 0 deletions examples/kubernetes_simple/deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from prefect.deployments import Deployment

# from prefect import flow
# from prefect import task

from giza.action import action
from giza.task import task

@task
def preprocess():
print(f"Preprocessing...")


@task
def transform():
print(f"Transforming...")


# @model(id=1, version=1)
# @flow
@action
def inference():
# Load ONNX model for Action inference
preprocess()
transform()
print("Hello world!")

if __name__ == '__main__':

inference.deploy(name="inference-action-deployment-k8")
# inference.serve(name="inference-action-deployment")
57 changes: 43 additions & 14 deletions giza/action.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,69 @@
import os
from functools import wraps
from typing import Callable

from prefect import Flow
from prefect import flow as _flow
from prefect.flows import load_flow_from_entrypoint
from prefect.settings import PREFECT_API_URL, update_current_profile

LOCAL_SERVER = "http://localhost:4200"
REMOTE_SERVER = os.environ.get("REMOTE_SERVER")


class Action:
def __init__(self, flow: Flow, name: str):
self.name = name
self._flow = flow
self._set_settings()

def _set_settings(self):
# Remote server: http://34.128.165.144
update_current_profile(settings={PREFECT_API_URL: f"{REMOTE_SERVER}/api"})

def _update_api_url(self, api_url: str):
update_current_profile(settings={PREFECT_API_URL: api_url})

def get_flow(self):
return self._flow

def serve(self, name: str):
def deploy(self, name: str):
# Deploy the flow to the platform
self._flow.serve(name=name)
self._update_api_url(api_url=REMOTE_SERVER)
self._flow.deploy(
name=name,
# entrypoint="flows/deployment.py:inference",
work_pool_name="k8-pool",
image="europe-west1-docker.pkg.dev/giza-platform/prefect-test/prefect-flow-test:v0",
build=False,
push=False,
print_next_steps=False,
)

def serve(self, name: str):
# Deploy the flow locally
self._update_api_url(api_url=f"{LOCAL_SERVER}/api")
self._flow.serve(name=name, print_starting_message=True)

def execute(self):
# Implement the execution logic here
load_flow_from_entrypoint(self._flow)

def apply(self, inference_id: int):
# Implement the apply logic here
pass


def action(*args, **kwargs) -> Callable:
def decorator(function):
@_flow(name=function.__name__, *args, **kwargs)
@wraps(function)
def _inner(*args, **kwargs):
return function(*args, **kwargs)
if args and callable(args[0]):
function = args[0]
args = args[1:]
return action(*args, **kwargs)(function)
else:

def decorator(function):
@_flow(name=function.__name__, *args, **kwargs)
@wraps(function)
def _inner(*args, **kwargs):
return function(*args, **kwargs)

action_instance = Action(flow=_inner, name=function.__name__)
return action_instance.get_flow()
action_instance = Action(flow=_inner, name=function.__name__)
return action_instance

return decorator
return decorator
178 changes: 0 additions & 178 deletions giza/cairo/data_converter.py

This file was deleted.

Loading

0 comments on commit d666c61

Please sign in to comment.