Skip to content

Commit

Permalink
Improve agent plugin documentation with running examples and addition…
Browse files Browse the repository at this point in the history
…al notes

Signed-off-by: Jason Parraga <[email protected]>
  • Loading branch information
Jason Parraga committed May 21, 2024
1 parent 16d2b14 commit 8cfeb6f
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 40 deletions.
62 changes: 44 additions & 18 deletions docs/flyte_agents/developing_agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,46 +41,72 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl
- `get`: This method retrieves the job resource (jobID or output literal) associated with the task, such as a BigQuery job ID or Databricks task ID.
- `delete`: Invoking this method will send a request to delete the corresponding job.

Below is a skeleton for an example async agent. Modify it as needed.

```python
# agent.py
from typing import Optional
from dataclasses import dataclass

from flyteidl.core.execution_pb2 import TaskExecution
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate
from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource, ResourceMeta


@dataclass
class BigQueryMetadata(ResourceMeta):
class ExampleMetadata(ResourceMeta):
"""
This is the metadata for the job. For example, the id of the job.
"""

job_id: str

class BigQueryAgent(AsyncAgentBase):

class ExampleAgent(AsyncAgentBase):
def __init__(self):
super().__init__(task_type_name="bigquery", metadata_type=BigQueryMetadata)
super().__init__(task_type_name="example", metadata_type=ExampleMetadata)

def create(
self,
task_template: TaskTemplate,
inputs: Optional[LiteralMap] = None,
**kwargs,
) -> BigQueryMetadata:
job_id = submit_bigquery_job(inputs)
return BigQueryMetadata(job_id=job_id)

def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
phase, outputs = get_job_status(resource_meta.job_id)
return Resource(phase=phase, outputs=outputs)

def delete(self, resource_meta: BigQueryMetadata, **kwargs):
cancel_bigquery_job(resource_meta.job_id)

# To register the bigquery agent
AgentRegistry.register(BigQueryAgent())
) -> ExampleMetadata:
print(f"create called task_template={task_template}")
# pull out plugin specific configuration from the task
custom = task_template.custom
# pull out the environment field set from the task
environment = custom["environment"]

# submit job on external platform
# job_id = submit_job(inputs, environment)
job_id = "temp"

# return metadata which will be used for following get & delete calls
return ExampleMetadata(job_id=job_id)

def get(self, resource_meta: ExampleMetadata, **kwargs) -> Resource:
print(f"get called resource_meta={resource_meta}")
# query external platform for job status
# phase = get_job_status(resource_meta.job_id)
phase = TaskExecution.SUCCEEDED

# phases can be TaskExecution.RUNNING, TaskExecution.SUCCEEDED, TaskExecution.FAILED, etc
return Resource(phase=phase)

def delete(self, resource_meta: ExampleMetadata, **kwargs):
print(f"delete called resource_meta={resource_meta}")
# cancel job on external platform
# cancel_job(resource_meta.job_id)
pass


# To register the example agent
AgentRegistry.register(ExampleAgent())
```

For an example implementation, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43).
For an example implementation of a real agent, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43).

#### Sync agent interface specification

Expand Down Expand Up @@ -193,7 +219,7 @@ By running agents independently, you can thoroughly test and validate your agent
controlled environment before deploying them to the production cluster.

By default, all agent requests will be sent to the default agent service. However,
you can route particular task requests to designated agent services by adjusting the FlytePropeller configuration.
you can route particular task requests to designated agent services by adjusting the FlytePropeller configuration.

```yaml
plugins:
Expand Down
117 changes: 95 additions & 22 deletions docs/flyte_agents/testing_agents_in_a_local_python_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,48 +14,121 @@ You can test agents locally without running the backend server.
To test an agent locally, create a class for the agent task that inherits from `SyncAgentExecutorMixin` or `AsyncAgentExecutorMixin`.
These mixins can handle synchronous and asynchronous tasks, respectively, and allow flytekit to mimic FlytePropeller's behavior in calling the agent.

## BigQuery example
## Example Task

To test the BigQuery agent, copy the following code to a file called `bigquery_task.py`, modifying as needed.
To test the example agent defined in {ref}`developing agents <developing_agents>`, copy the following code to a file called `task.py`, modifying as needed.

```python
# task.py
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Dict, Optional

from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct

from flytekit import PythonFunctionTask
from flytekit.configuration import SerializationSettings
from flytekit.extend import TaskPlugins
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin


@dataclass
class ExampleConfig(object):
"""
ExampleConfig should be used to configure an ExampleTask.
"""

environment: str


# Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task.
# This task extends PythonFunctionTask but you can extend different base tasks depending on your needs (ie. SQLTask)
class ExampleTask(AsyncAgentExecutorMixin, PythonFunctionTask[ExampleConfig]):
# This must match the task type defined in the agent
_TASK_TYPE = "example"

def __init__(
self,
task_config: Optional[ExampleConfig],
task_function: Callable,
**kwargs,
) -> None:
outputs = None
super().__init__(
task_config=task_config,
task_function=task_function,
task_type=self._TASK_TYPE,
**kwargs,
)

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
"""
Return plugin-specific data as a serializable dictionary. This is required for your plugin to access task_template.custom.
"""
config = {
"environment": self.task_config.environment,
}
s = Struct()
s.update(config)
return json_format.MessageToDict(s)


# Register the Example Task into the flytekit core plugin system
TaskPlugins.register_pythontask_plugin(ExampleConfig, ExampleTask)
```

```{note}
In some cases, you will need to store credentials in your local environment when testing locally.
For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when running BigQuery tasks to test the BigQuery agent.
The ExampleTask implements `get_custom` which is originally defined in the base Task object. You will need to implement
`_get_custom` if you wish to pass plugin-specific data through the task_template's `custom` field.
```

Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task.
Flytekit will automatically use the agent to run the task in the local execution.
```python
class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]):
...
# example.py
from flytekit.configuration.default_images import DefaultImages
from flytekit import task

class ChatGPTTask(SyncAgentExecutorMixin, PythonTask):
...
# Import agent to trigger agent registration
from .agent import ExampleAgent
# Import task to trigger task registration and pass plugin specific config
from .task import ExampleConfig

```

Flytekit will automatically use the agent to run the task in the local execution.
```python
bigquery_doge_coin = BigQueryTask(
name=f"bigquery.doge_coin",
inputs=kwtypes(version=int),
query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;",
output_structured_dataset_type=StructuredDataset,
task_config=BigQueryConfig(ProjectID="flyte-test-340607")
)
@task(task_config=ExampleConfig(environment="dev"), container_image=DefaultImages.default_image())
def say_hello(name: str) -> str:
print(f"Hello, {name}!")
return f"Hello, {name}!"

```

You can run the above example task locally and test the agent with the following command:
You can run locally and test the agent with the following command:

```bash
pyflyte run bigquery_task.py bigquery_doge_coin --version 10
pyflyte run example.py example --name world
Running Execution on local.
create called task_template=<FlyteLiteral(TaskTemplate) id { resource_type: TASK name: "armada_test_agent.example.say_hello" } type: "example" metadata { runtime { type: FLYTE_SDK version: "1.12.0" flavor: "python" } retries { } } interface { inputs { variables { key: "name" value { type { simple: STRING } description: "name" } } } outputs { variables { key: "o0" value { type { simple: STRING } description: "o0" } } } } custom { fields { key: "environment" value { string_value: "dev" } } } container { image: "cr.flyte.org/flyteorg/flytekit:py3.11-1.12.0" args: "pyflyte-execute" args: "--inputs" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/inputs.pb" args: "--output-prefix" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9" args: "--raw-output-data-prefix" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/raw_output" args: "--checkpoint-path" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/checkpoint_output" args: "--prev-checkpoint" args: "/tmp/flyte-od3iwm33/raw/0991ba2310db3416e9ad85aba218c0d9/prev_checkpoint" args: "--resolver" args: "flytekit.core.python_auto_container.default_task_resolver" args: "--" args: "task-module" args: "armada_test_agent.example" args: "task-name" args: "say_hello" resources { } }>
get called resource_meta=ExampleMetadata(job_id='temp')
```

You can also run a BigQuery task in your Python interpreter to test the agent locally.
If it doesn't appear that your agent is running you can debug what might be going on by passing the `-v` verbose flag to `pyflyte`.

## Existing Flyte Agent Tasks

You can also run a existing Flyte Agent tasks in your Python interpreter to test the agent locally.

![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/bigquery_task.png)


```{note}
In some cases, you will need to store credentials in your local environment when testing locally.
For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when running BigQuery tasks to test the BigQuery agent.
```

## Databricks example
To test the Databricks agent, copy the following code to a file called `databricks_task.py`, modifying as needed.

Expand Down

0 comments on commit 8cfeb6f

Please sign in to comment.