Merge pull request #84 from graphbookai/dev
v0.6.0
rsamf authored Sep 12, 2024
2 parents f3c5e45 + b0e5ebd commit 2b4b265
Showing 55 changed files with 5,172 additions and 1,153 deletions.
26 changes: 24 additions & 2 deletions README.md
@@ -25,6 +25,13 @@
## Overview
Graphbook is a framework for building efficient, visual DAG-structured ML workflows composed of nodes written in Python. Graphbook provides common ML processing features such as multiprocessing IO and automatic batching, and it features a web-based UI to assemble, monitor, and execute data processing workflows. It can be used to prepare training data for custom ML models, to experiment with custom-trained or off-the-shelf models, and to build ML-based ETL applications. Custom nodes can be built in Python, and Graphbook will behave like a framework and call lifecycle methods on those nodes.

<p align="center">
<a href="https://graphbook.ai">
<img src="https://media.githubusercontent.com/media/rsamf/public/main/docs/overview/huggingface-pipeline-demo.gif" alt="Huggingface Pipeline Demo" width="512">
</a>
<div align="center">Build, run, monitor!</div>
</p>

## Status
Graphbook is in a very early stage of development, so expect minor bugs and rapid design changes through the coming releases. If you would like to [report a bug](https://github.com/graphbookai/graphbook/issues/new?template=bug_report.md&labels=bug) or [request a feature](https://github.com/graphbookai/graphbook/issues/new?template=feature_request.md&labels=enhancement), please feel free to do so. We aim to make Graphbook serve our users in the best way possible.

@@ -44,15 +51,27 @@
- Autosaving and shareable serialized workflow files
- Registers node code changes without needing a restart
- Monitorable CPU and GPU resource usage
- (BETA) Remote subgraphs for scaling workflows on other Graphbook services
- (BETA) Third Party Plugins *

\* We plan to add documentation for the community to build plugins, but for now, examples can be seen at
[example_plugin](example_plugin) and
[graphbook-huggingface](https://github.com/graphbookai/graphbook-huggingface)

### Planned Features
- A `graphbook run` command to execute workflows in a CLI
- Step/Resource functions with decorators to reduce verbosity
- Human-in-the-loop Steps for manual feedback/control during DAG execution
- All-code workflows, so users never have to leave their IDE
- UI extensibility
- And many optimizations for large data processing workloads
- Remote subgraphs for scaling workflows on other Graphbook services

### Supported OS
The following operating systems are supported in order of most to least recommended:
- Linux
- Mac
- Windows (not recommended) *

\* There may be issues with running Graphbook on Windows. With limited resources, we can only focus testing and development on Linux.

## Getting Started
### Install from PyPI
@@ -67,6 +86,9 @@
```
1. Visit http://localhost:8005

### Recommended Plugins
* [Huggingface](https://github.com/graphbookai/graphbook-huggingface)

Visit the [docs](https://docs.graphbook.ai) to learn more on how to create custom nodes and workflows with Graphbook.

## Examples
3 changes: 3 additions & 0 deletions config.yaml
@@ -0,0 +1,3 @@

# Add list of installed plugins to the list below
plugins: ["example_plugin"]
2 changes: 1 addition & 1 deletion docs/_static/concepts/graphbookworkers.svg
20 changes: 9 additions & 11 deletions docs/concepts.rst
@@ -22,6 +23,7 @@ Step Lifecycle
A Step goes through lifecycle methods upon processing a Note. The below methods are open for extension and are subject to change in future versions of Graphbook.

#. ``__init__``: The constructor of the Step. This is where you can set up the Step and its parameters. This will not be re-called if a node's code or its parameter values have not changed.
#. ``on_clear``: Called when the step is cleared. This is where you can delete any state that the Step has stored.
#. ``on_start``: This is called at the start of each graph execution.
#. ``on_before_items``: This is called before the Step processes each Note.
#. ``on_item``: This is called for each item.
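
As a rough illustration of these methods, a minimal custom Step might look like the sketch below. The class name ``CountingStep`` and its counter are made up for illustration; ``on_item(self, item, note)``, ``self.log``, and the class attributes follow the example plugin and guides shown further down this diff, while the signatures of ``on_clear``, ``on_start``, and ``on_before_items`` are assumed from the descriptions above and may differ from the actual API.

```python
import graphbook.steps as steps

class CountingStep(steps.Step):
    RequiresInput = True
    Parameters = {}
    Outputs = ["out"]
    Category = "Custom"

    def __init__(self):
        # Constructor: set up the Step and any state it needs.
        super().__init__()
        self.count = 0

    def on_clear(self):
        # Drop any stored state when the Step is cleared.
        self.count = 0

    def on_start(self):
        # Called once at the start of each graph execution.
        self.log("starting execution")

    def on_before_items(self, note):
        # Called before the Step processes each Note (assumed signature).
        self.log(f"seen {self.count} items so far")

    def on_item(self, item, note):
        # Called for each item.
        self.count += 1
        self.log(item)
```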
@@ -37,11 +38,10 @@ In order to maximize the utilization of the GPU during graph execution, we parallelize the preparation of inputs and saving of outputs
for each BatchStep (an extension of Step) across a number of workers.
A BatchStep can require inputs to be prepared and outputs to be saved by workers.
Each worker is a separate process that can run in parallel with others.
Each worker has its own work queue and result queue for incoming work and outgoing results, respectively.
A worker is dedicated to either preparing inputs or saving outputs, but not both. In either case, the worker logic
is essentially the same.
The Graphbook worker implementation also accounts for graph changes.
In between graph executions, nodes can be added, removed, or modified, so the workers must adapt to these changes.
In between graph executions, the workers are restarted to register any code changes, and the queues along with their elements are retained and given to the new workers.

Logic Details
=============
@@ -60,19 +60,15 @@ The above graph is used to explain the worker logic.
The logic behind the workers is detailed in the following steps (1-6):

#.
A BatchStep prepares the item's parameter inputs and its belonging Note ID and Step ID. The IDs are Python's unique identifiers for each object taken with ``id()``.
We supply the workers with this information, so that when they are done processing the item, they can be matched later with the correct Note and Step.
A BatchStep prepares the item's parameter inputs.
The actual function, implemented by the BatchStep, is stored inside of a shared dictionary that the workers can access later.
#.
A BatchStep enqueues the item in one of the load and dump queues, so that the workers can access them.
A BatchStep enqueues the item in one of the load and dump queues, so that the workers can access them. The item is stored in its respective queue based on the ``id()`` of the BatchStep.
#.
The workers will then dequeue the work from their work queues and execute the corresponding BatchStep's function (``load_fn()`` and ``dump_fn()``) on the item if the BatchStep still exists, but before they do that, they need to check the size of the result queue.
If the result queue is full, the worker will block until space is available.
If the result queue is full, the worker will block until space is available. The workers will rotate between queues in a round-robin fashion.
#.
After the worker has finished processing the item, it will enqueue the result in the result queue. The workers cannot deliver the results directly to the consumer queues because they are provisioned
dynamically by the main process since the graph can have different nodes in between different executions.
#.
The main process will then dequeue the results from the result queues and deliver them to the correct corresponding consumer queues as long as the sum of the sizes of all consumer queues is less than a certain limit and if the consumer node still exists.
After the worker has finished processing the item, it will enqueue the result in its respective result queue.
#.
The consumer nodes will then dequeue the results from their consumer queues and process them in their correct lifecycle method.
Completed load items will be delivered to ``on_item_batch(results: List[any], items: List[any], notes: List[Note])`` where results, items, and notes are in order; i.e. ``results[i]`` corresponds to input ``items[i]`` and belongs to note ``notes[i]``.
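
To tie steps 1-6 together, the following is a simplified, single-process sketch of the worker loop. The real workers run as separate processes; the names ``work_queues``, ``result_queues``, ``fns``, and ``worker_loop`` are illustrative rather than Graphbook's actual internals, and the bound of 32 echoes the result-queue max size mentioned in the visualization section below.

```python
import queue
from itertools import cycle

MAX_RESULT_QUEUE_SIZE = 32   # result queues are bounded to avoid memory overload

# Illustrative structures, keyed by id(batch_step) as described in steps 1-2.
work_queues = {}    # id(step) -> queue.Queue of items to load or dump
result_queues = {}  # id(step) -> queue.Queue(maxsize=MAX_RESULT_QUEUE_SIZE)
fns = {}            # id(step) -> that BatchStep's load_fn() or dump_fn()

def worker_loop():
    # Rotate between the per-BatchStep work queues in round-robin fashion (step 3).
    for step_id in cycle(list(work_queues)):
        results = result_queues[step_id]
        if results.full():
            continue  # the real worker blocks here until space is available
        try:
            item = work_queues[step_id].get_nowait()
        except queue.Empty:
            continue
        result = fns[step_id](item)   # run the step's load_fn()/dump_fn() (step 3)
        results.put(result)           # enqueue for the consumer node (steps 4-5)
```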
@@ -95,7 +91,9 @@ See example below:
:align: center


The visualization is in the form of a centered bar chart that shows the number of items that are enqueued in the work queues as red bars and the number of items that are in the result and consumer queues as green bars. Refer to the following when reading this chart:
The visualization is in the form of a centered bar chart that shows the number of items that are enqueued in the work queues as red bars and the number of items that are in the result and consumer queues as green bars.
Because the result queue has a max size of 32, each half of the chart is clipped at 32 to show a relative comparison between the two queue types.
Refer to the following when reading this chart:

#. If the red bars are consistently longer than the green bars and there's hardly any green, it indicates that there are too few workers.
#. If the red bars are consistently longer than the green bars but there is some green, then it indicates that the graph execution on the main process is just too slow to consume all of the results, which, in turn, creates congestion in the workers' work queues. This is because the result queues have a max size, and if they are full, the workers will be blocked until space is available while the work queues are being loaded. A max size per result queue is enforced to help prevent memory overloading issues.
24 changes: 12 additions & 12 deletions docs/guides.rst
@@ -40,12 +40,12 @@ Inside this directory, create a new Python file called `my_first_nodes.py`, and
}
Outputs = ["A", "B"]
Category = "Custom"
def __init__(self, id, logger, prob):
super().__init__(id, logger)
def __init__(self, prob):
super().__init__()
self.prob = prob
def on_after_items(self, note: Note):
self.logger.log(note['message'])
self.log(note['message'])
def forward_note(self, note: Note) -> str:
if random.random() < self.prob:
@@ -79,8 +79,8 @@ Let's create a Source Step that generates fake data.
}
Outputs = ["message"]
Category = "Custom"
def __init__(self, id, logger, message):
super().__init__(id, logger)
def __init__(self, message):
super().__init__()
self.message = message
def load(self):
@@ -140,8 +140,8 @@ Create a new Source Step that loads the images and their labels:
}
}
def __init__(self, id, logger, image_dir: str):
super().__init__(id, logger)
def __init__(self, image_dir: str):
super().__init__()
self.image_dir = image_dir
def load(self):
@@ -195,7 +195,7 @@ Let's create the below BatchStep class that uses this model to classify the images
batch_size,
item_key
):
super().__init__(id, logger, batch_size, item_key)
super().__init__(batch_size, item_key)
model_name = "imjeffhi/pokemon_classifier"
self.model = ViTForImageClassification.from_pretrained(model_name)
self.model = self.model.to('cuda') # If you do not have an Nvidia GPU, you can remove this line
@@ -224,12 +224,12 @@ Let's create the below BatchStep class that uses this model to classify the images
predicted_id = self.model(**extracted).logits.argmax(-1)
for t, item, note in zip(predicted_id, items, notes):
item["prediction"] = self.model.config.id2label[t.item()]
self.logger.log(f"Predicted {item['value']} as {item['prediction']}")
self.log(f"Predicted {item['value']} as {item['prediction']}")
if item["prediction"] == note["name"]:
self.tp += 1
self.num_samples += 1
if self.num_samples > 0:
self.logger.log(f"Accuracy: {self.tp/self.num_samples:.2f}")
self.log(f"Accuracy: {self.tp/self.num_samples:.2f}")
.. _PyTorch: https://pytorch.org/
@@ -305,7 +305,7 @@ The top of your PokemonClassifier node should look like this:
model: ViTForImageClassification,
image_processor: ViTImageProcessor,
):
super().__init__(id, logger, batch_size, item_key)
super().__init__(batch_size, item_key)
self.model = model
self.image_processor = image_processor
self.tp = 0
@@ -433,7 +433,7 @@ Then, create a new BatchStep class that uses the RMBG-1.4 model to remove the background
output_dir,
model: AutoModelForImageSegmentation,
):
super().__init__(id, logger, batch_size, item_key)
super().__init__(batch_size, item_key)
self.model = model
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
10 changes: 10 additions & 0 deletions example_plugin/example_plugin/__init__.py
@@ -0,0 +1,10 @@
from .nodes import SimpleStep
from .nodes import SimpleResource
import os.path as osp
from graphbook.plugins import export, web


export("SimpleStep", SimpleStep)
export("SimpleResource", SimpleResource)

web(osp.realpath(osp.join(osp.dirname(__file__), "../web/dist/bundle.js")))
19 changes: 19 additions & 0 deletions example_plugin/example_plugin/nodes.py
@@ -0,0 +1,19 @@
import graphbook.steps as steps
import graphbook.resources as resources

class SimpleStep(steps.Step):
RequiresInput = True
Parameters = {}
Outputs = ["out"]
Category = "Simple"
def __init__(self):
super().__init__()

def on_item(self, item, note):
self.log(item)

class SimpleResource(resources.Resource):
Parameters = {}
Category = "Simple"
def __init__(self):
super().__init__("Hello, World!")
14 changes: 14 additions & 0 deletions example_plugin/pyproject.toml
@@ -0,0 +1,14 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "example_plugin"
version = "0.0.0"
dependencies = [
]
requires-python = ">=3.10"
authors = [
]

description = ""
