Merge pull request #74 from graphbookai/dev
Release v0.5
rsamf authored Aug 25, 2024
2 parents fe9f666 + 9f71c02 commit e464290
Showing 58 changed files with 2,542 additions and 851 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -37,6 +37,6 @@ RUN poetry install --no-root --no-directory
COPY . .
RUN make web

EXPOSE 8005 8006 8007
EXPOSE 8005 8006

CMD ["python", "graphbook/server.py", "--web", "--web_dir", "web/dist"]
CMD ["python", "graphbook/main.py", "--web", "--web_dir", "web/dist"]
14 changes: 7 additions & 7 deletions README.md
@@ -44,11 +44,11 @@ Graphbook is in a very early stage of development, so expect minor bugs and rapi
- Autosaving and shareable serialized workflow files
- Registers node code changes without needing a restart
- Monitorable CPU and GPU resource usage
- (BETA) Remote subgraphs for scaling workflows on other Graphbook services

### Planned Features
- Graphbook services and remote DAGs for scalable workflows
- A `graphbook run` command to execute workflows in a CLI
- Step/Resource functions to reduce verbosity
- Step/Resource functions with decorators to reduce verbosity
- Human-in-the-loop Steps for manual feedback/control during DAG execution
- All-code workflows, so users never have to leave their IDE
- UI extensibility
@@ -58,29 +58,29 @@ Graphbook is in a very early stage of development, so expect minor bugs and rapi
### Install from PyPI
1. `pip install graphbook`
1. `graphbook`
1. Visit http://localhost:8007
1. Visit http://localhost:8005

### Install with Docker
1. Pull and run the downloaded image
```bash
docker run --rm -p 8005:8005 -p 8006:8006 -p 8007:8007 -v $PWD/workflows:/app/workflows rsamf/graphbook:latest
docker run --rm -p 8005:8005 -v $PWD/workflows:/app/workflows rsamf/graphbook:latest
```
1. Visit http://localhost:8007
1. Visit http://localhost:8005

Visit the [docs](https://docs.graphbook.ai) to learn more on how to create custom nodes and workflows with Graphbook.

## Examples
We continually post examples of workflows and custom nodes in our [examples repo](https://github.com/graphbookai/graphbook-examples).

## Collaboration
Graphbook is in active development and very much welcomes contributors. This is a guide on how to run Graphbook in development mode. If you are simply using Graphbook, view the [Getting Started](#getting-started) section.

### Run Graphbook in Development Mode
You can use any other virtual environment solution, but it is highly advised to use [poetry](https://python-poetry.org/docs/) since our dependencies are specified in poetry's format.
1. Clone the repo and `cd graphbook`
1. `poetry install --with dev`
1. `poetry shell`
1. `python graphbook/server.py`
1. `python graphbook/main.py`
1. `cd web`
1. `npm install`
1. `npm run dev`
4 changes: 4 additions & 0 deletions docs/_static/concepts/graphbookworkers.svg
4 changes: 4 additions & 0 deletions docs/_static/concepts/graphbookworkersgraph.svg
Binary file added docs/_static/concepts/workers-vis.png
71 changes: 71 additions & 0 deletions docs/concepts.rst
@@ -30,3 +30,74 @@ A Step goes through lifecycle methods upon processing a Note. The below methods
#. ``forward_note``: This is called after the Step processes each Note and is used to route the Note to a certain output slot.
#. ``on_end``: This is called at the end of each graph execution.
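
The lifecycle order above can be sketched with a small driver (an illustrative skeleton only — the method names mirror those listed in this document, but the driver and ``process`` method are hypothetical, not graphbook's actual base class):

```python
from typing import Dict, List

class Step:
    """Illustrative skeleton only -- not graphbook's actual base class."""

    def process(self, note: dict) -> None:
        pass  # the Step's main work on a Note (hypothetical method name)

    def forward_note(self, note: dict) -> str:
        return "out"  # choose an output slot for the Note

    def on_end(self) -> None:
        pass  # cleanup at the end of graph execution

def run_step(step: Step, notes: List[dict]) -> Dict[str, List[dict]]:
    # Drive one Step through the lifecycle order described above.
    slots: Dict[str, List[dict]] = {}
    for note in notes:
        step.process(note)
        slot = step.forward_note(note)  # called after each Note is processed
        slots.setdefault(slot, []).append(note)
    step.on_end()  # called once, at the end of graph execution
    return slots

routed = run_step(Step(), [{"id": 1}, {"id": 2}])
print(len(routed["out"]))  # 2
```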

Workers
********

In order to maximize the utilization of the GPU during graph execution, we parallelize the preparation of inputs and outputs
for each BatchStep (an extension of Step) across a number of workers.
A BatchStep can require inputs to be prepared and outputs to be saved by workers.
Each worker is a separate process that can run in parallel with others.
Each worker has its own work queue and result queue for incoming work and outgoing results, respectively.
A worker is dedicated to either preparing inputs or saving outputs, but not both. In either case, the worker logic is largely the same.
The Graphbook worker implementation also accounts for graph changes.
The Graphbook worker implementation also accounts for graph changes.
In between graph executions, nodes can be added, removed, or modified, so the workers must adapt to these changes.
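
The work-queue/result-queue discipline described above can be sketched as follows. This is a simplified illustration: graphbook runs each worker as a separate process, but threads are used here for brevity, and the doubling function stands in for the real load/dump logic.

```python
import queue
import threading

def worker_loop(work_q: queue.Queue, result_q: queue.Queue) -> None:
    # A worker is dedicated to one kind of task (preparing inputs or
    # saving outputs); either way, the loop shape is the same.
    while True:
        item = work_q.get()
        if item is None:  # sentinel: shut down
            break
        result_q.put(item * 2)  # stand-in for the real load/dump function

work_q: queue.Queue = queue.Queue()
result_q: queue.Queue = queue.Queue()
workers = [threading.Thread(target=worker_loop, args=(work_q, result_q))
           for _ in range(2)]
for w in workers:
    w.start()
for i in range(4):
    work_q.put(i)
for _ in workers:
    work_q.put(None)  # one sentinel per worker
for w in workers:
    w.join()
results = sorted(result_q.get() for _ in range(4))
print(results)  # [0, 2, 4, 6]
```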

Logic Details
=============

.. image:: _static/concepts/graphbookworkersgraph.svg
:alt: Example Graph
:align: center

The above graph is used to explain the worker logic.

.. image:: _static/concepts/graphbookworkers.svg
:alt: Graphbook Worker Concepts Illustration
:align: center


The logic behind the workers is detailed in the following steps (1-6):

#.
A BatchStep prepares the item's parameter inputs along with the IDs of the Note and Step it belongs to. The IDs are Python's unique object identifiers, obtained with ``id()``.
The workers are supplied with this information so that, once an item is processed, it can be matched back to the correct Note and Step.
The actual function, implemented by the BatchStep, is stored inside of a shared dictionary that the workers can access later.
#.
A BatchStep enqueues the item in one of the load and dump queues, so that the workers can access them.
#.
The workers dequeue work from their work queues and, if the BatchStep still exists, execute the corresponding BatchStep's function (``load_fn()`` or ``dump_fn()``) on the item. Before doing so, each worker checks the size of its result queue.
If the result queue is full, the worker blocks until space is available.
#.
After a worker has finished processing an item, it enqueues the result in its result queue. The workers cannot deliver results directly to the consumer queues, because the consumer queues are provisioned
dynamically by the main process; the graph can contain different nodes between executions.
#.
The main process then dequeues results from the result queues and delivers each one to its corresponding consumer queue, provided that the consumer node still exists and the combined size of all consumer queues is below a set limit.
#.
The consumer nodes then dequeue the results from their consumer queues and process them in the appropriate lifecycle method.
Completed load items are delivered to ``on_item_batch(results: List[any], items: List[any], notes: List[Note])``, where results, items, and notes are aligned by index; i.e. ``results[i]`` corresponds to input ``items[i]`` and belongs to note ``notes[i]``.
The lengths of the results, items, and notes lists equal the batch size (or less if the batch size is not met).
Completed dump items are not delivered to any lifecycle method.
However, the BatchStep still tracks completed dump items and the Note each belongs to.
Only once all dump items from a Note are completed is the Note considered finished and delivered to the next Step for processing.
Otherwise, a following Step that depends on a particular item from that Note having been saved could execute too soon.
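
Steps 4–5 (the main process matching results back to consumer queues by Step ID) can be condensed into a small dispatch loop. This is an illustrative sketch, not graphbook's actual code: the queue limit, tuple layout, and names are assumptions.

```python
import queue
from typing import Dict

MAX_CONSUMER_TOTAL = 32  # illustrative cap on the summed consumer-queue sizes

def dispatch(result_q: queue.Queue,
             consumer_qs: Dict[int, queue.Queue]) -> None:
    """Deliver worker results to the consumer queue of the matching Step.

    Each result carries (step_id, note_id, value); the IDs are the id()
    values recorded when the work item was first enqueued.
    """
    while not result_q.empty():
        if sum(q.qsize() for q in consumer_qs.values()) >= MAX_CONSUMER_TOTAL:
            break  # back off until the consumers drain their queues
        step_id, note_id, value = result_q.get()
        consumer = consumer_qs.get(step_id)
        if consumer is None:
            continue  # node was removed between executions; drop the result
        consumer.put((note_id, value))

# Illustrative run: one of the two steps was removed from the graph.
result_q: queue.Queue = queue.Queue()
step_a, step_b = 1, 2  # stand-ins for id(step) values
consumer_qs = {step_a: queue.Queue()}
result_q.put((step_a, 10, "mask0"))
result_q.put((step_b, 10, "mask1"))  # step_b no longer exists
dispatch(result_q, consumer_qs)
print(consumer_qs[step_a].qsize())  # 1
```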

Worker Performance Visualization
=================================================

Sometimes, we do not know in advance exactly how many workers are needed. For this reason, Graphbook will eventually offer an auto-scaling feature that automatically adjusts the number of workers based on the workload.
For now, Graphbook offers a visualization of worker performance that indicates when there are too many or too few workers, so that users can adjust the number manually.
See the example below:


.. image:: _static/concepts/workers-vis.png
:alt: Graphbook Worker Performance Visualization
:align: center


The visualization is in the form of a centered bar chart that shows the number of items that are enqueued in the work queues as red bars and the number of items that are in the result and consumer queues as green bars. Refer to the following when reading this chart:

#. If the red bars are consistently longer than the green bars and there's hardly any green, it indicates that there are too few workers.
#. If the red bars are consistently longer than the green bars but there is some green, the graph execution on the main process is too slow to consume all of the results, which in turn creates congestion in the workers' work queues. This is because the result queues have a maximum size; when they are full, the workers block until space is available while the work queues keep filling. A maximum size per result queue is enforced to help prevent memory overloading.
#. If the green bars are consistently longer than the red bars, it indicates there may be enough or too many workers, depending on your system constraints.
#. If there are no visible bars, it indicates that the workers are not being utilized.
4 changes: 2 additions & 2 deletions docs/guides.rst
@@ -503,8 +503,8 @@ Then, create a new BatchStep class that uses the RMBG-1.4 model to remove the ba
masks = []
masks.append({"value": path, "type": "image"})
note["masks"] = masks
return {"removed_bg": removed_bg}
return removed_bg
This node will generate masks of the foreground using the RMBG-1.4 model and output the resulting mask as images by saving them to disk.
Note that there is one notable difference in RemoveBackground compared to PokemonClassifier.
6 changes: 3 additions & 3 deletions docs/installing.rst
@@ -10,7 +10,7 @@ Install from PyPI

#. ``pip install graphbook``
#. ``graphbook``
#. Visit http://localhost:8007
#. Visit http://localhost:8005

Install with Docker
===================
@@ -19,9 +19,9 @@ Install with Docker

.. code-block:: bash
docker run --rm -p 8005:8005 -p 8006:8006 -p 8007:8007 -v $PWD/workflows:/app/workflows rsamf/graphbook:latest
docker run --rm -p 8005:8005 -v $PWD/workflows:/app/workflows rsamf/graphbook:latest
#. Visit http://localhost:8007
#. Visit http://localhost:8005


Install from Source
32 changes: 32 additions & 0 deletions graphbook/doc2md.py
@@ -0,0 +1,32 @@
from textwrap import dedent


def convert_to_md(docstring: str) -> str:
# docstring = docstring.strip()
docstring = dedent(docstring)
lines = docstring.split("\n")
md = ""
bold_key = False
for line in lines:
if line.strip() == "Args:":
md += "**Parameters**"
bold_key = True
elif line.strip() == "Returns:":
md += "**Returns**"
bold_key = False
elif line.strip() == "Raises:":
md += "**Raises**"
bold_key = False
else:
if bold_key:
entry = line.split(":")
if len(entry) > 1:
key = entry[0].strip()
value = entry[1].strip()
md += f"- **{key}**: {value}"
else:
md += line
md += "\n"
md = md.strip()

return md
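
To see what ``convert_to_md`` produces, here is a condensed, self-contained re-implementation of the same conversion logic with a sample docstring (the function name and sample are illustrative; behavior mirrors the code above):

```python
from textwrap import dedent

def docstring_to_md(docstring: str) -> str:
    # Condensed re-implementation of convert_to_md, for illustration.
    md, bold_key = "", False
    for line in dedent(docstring).split("\n"):
        stripped = line.strip()
        if stripped == "Args:":
            md += "**Parameters**"
            bold_key = True
        elif stripped in ("Returns:", "Raises:"):
            md += f"**{stripped[:-1]}**"
            bold_key = False
        elif bold_key and ":" in line:
            key, _, value = line.partition(":")
            md += f"- **{key.strip()}**: {value.strip()}"
        else:
            md += line
        md += "\n"
    return md.strip()

doc = """My step.

Args:
    threshold (float): minimum score to keep
"""
print(docstring_to_md(doc))
# My step.
#
# **Parameters**
# - **threshold (float)**: minimum score to keep
```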
17 changes: 17 additions & 0 deletions graphbook/exports.py
@@ -1,6 +1,7 @@
import graphbook.steps as steps
import graphbook.resources.base as rbase
import graphbook.custom_nodes as custom_nodes
from graphbook.doc2md import convert_to_md
from aiohttp import web

default_exported_steps = {
@@ -55,6 +56,22 @@ def get_resources(self):
def get_all(self):
return {"steps": self.get_steps(), "resources": self.get_resources()}

def get_step_docstring(self, name):
if name in self.exported_steps:
docstring = self.exported_steps[name].__doc__
if docstring is not None:
docstring = convert_to_md(docstring)
return docstring
return None

def get_resource_docstring(self, name):
if name in self.exported_resources:
docstring = self.exported_resources[name].__doc__
if docstring is not None:
docstring = convert_to_md(docstring)
return docstring
return None

def get_exported_nodes(self):
# Create directory structure for nodes based on their category
def create_dir_structure(nodes):
73 changes: 73 additions & 0 deletions graphbook/main.py
@@ -0,0 +1,73 @@
#!/usr/bin/env python
import argparse
from graphbook.web import start_web
import os.path as osp

DESCRIPTION = """
Graphbook | ML Workflow Framework
"""

workflow_dir = "./workflow"
nodes_dir = "custom_nodes"
docs_dir = "docs"


def get_args():
parser = argparse.ArgumentParser(
description=DESCRIPTION, formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument("--num_workers", type=int, default=1)
parser.add_argument("--continue_on_failure", action="store_true")
parser.add_argument("--copy_outputs", action="store_true")

# Web subcommand
parser.add_argument("--media_dir", type=str, default="/")
parser.add_argument("--web_dir", type=str)
parser.add_argument("--host", type=str, default="0.0.0.0")
parser.add_argument("--port", type=int, default=8005)
parser.add_argument("--start_media_server", action="store_true")
parser.add_argument("--media_port", type=int, default=8006)
parser.add_argument(
"--root_dir",
type=str,
help="If setting this directory, workflow_dir, nodes_dir, and docs_dir will be ignored",
)
parser.add_argument(
"--workflow_dir",
type=str,
default=workflow_dir,
help="Path to the workflow directory",
)
parser.add_argument(
"--nodes_dir",
type=str,
default=osp.join(workflow_dir, nodes_dir),
help="Path to the custom nodes directory",
)
parser.add_argument(
"--docs_dir",
type=str,
default=osp.join(workflow_dir, docs_dir),
help="Path to the docs directory",
)

return parser.parse_args()


def resolve_paths(args):
if args.root_dir:
args.workflow_dir = args.root_dir
args.nodes_dir = osp.join(args.root_dir, nodes_dir)
args.docs_dir = osp.join(args.root_dir, docs_dir)
return args


def main():
args = get_args()
args = resolve_paths(args)

start_web(args)


if __name__ == "__main__":
main()
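
The ``--root_dir`` precedence in ``resolve_paths`` can be exercised directly; this sketch reproduces the function with a hand-built ``Namespace`` standing in for parsed CLI arguments (the paths are illustrative):

```python
import argparse
import os.path as osp

nodes_dir = "custom_nodes"
docs_dir = "docs"

def resolve_paths(args: argparse.Namespace) -> argparse.Namespace:
    # --root_dir, when set, overrides workflow_dir, nodes_dir, and docs_dir.
    if args.root_dir:
        args.workflow_dir = args.root_dir
        args.nodes_dir = osp.join(args.root_dir, nodes_dir)
        args.docs_dir = osp.join(args.root_dir, docs_dir)
    return args

args = argparse.Namespace(root_dir="/data/project",
                          workflow_dir="./workflow",
                          nodes_dir="./workflow/custom_nodes",
                          docs_dir="./workflow/docs")
args = resolve_paths(args)
print(args.nodes_dir)  # /data/project/custom_nodes
```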
30 changes: 16 additions & 14 deletions graphbook/media.py
@@ -2,6 +2,7 @@
from aiohttp import web
import os.path as osp


@web.middleware
async def cors_middleware(request: web.Request, handler):
if request.method == "OPTIONS":
@@ -18,53 +19,54 @@ async def cors_middleware(request: web.Request, handler):
response.headers["Access-Control-Allow-Credentials"] = "true"
return response


class MediaServer:
def __init__(
self,
address="0.0.0.0",
host="0.0.0.0",
port=8006,
root_path="./workflow",
):
self.address = address
self.host = host
self.port = port
self.root_path = root_path
routes = web.RouteTableDef()
self.routes = routes
middlewares = [cors_middleware]
self.app = web.Application(
middlewares=middlewares
)

self.app = web.Application(middlewares=middlewares)

@routes.put("/set")
async def set_var_handler(request: web.Request):
data = await request.json()
root_path = data.get("root_path")
if root_path:
self.root_path = root_path
return web.json_response({"root_path": self.root_path})

@routes.get(r"/{path:.*}")
async def handle(request: web.Request) -> web.Response:
path = request.match_info["path"]
full_path = osp.join(self.root_path, path)
if not osp.exists(full_path):
return web.HTTPNotFound()
with open(full_path, "rb") as f:
content = f.read()
return web.Response(body=content)


return web.FileResponse(full_path)

async def _async_start(self):
runner = web.AppRunner(self.app)
await runner.setup()
site = web.TCPSite(runner, self.address, self.port, shutdown_timeout=0.5)
site = web.TCPSite(runner, self.host, self.port, shutdown_timeout=0.5)
await site.start()
await asyncio.Event().wait()

def start(self):
self.app.router.add_routes(self.routes)
print(f"Starting media server at {self.address}:{self.port}")
print(f"Starting media server at {self.host}:{self.port}")
try:
asyncio.run(self._async_start())
except KeyboardInterrupt:
print("Exiting media server")


def create_media_server(args):
server = MediaServer(host=args.host, port=args.media_port, root_path=args.media_dir)
server.start()
