diff --git a/docs/flyte_agents/developing_agents.md b/docs/flyte_agents/developing_agents.md index fe55630248..c226970cc7 100644 --- a/docs/flyte_agents/developing_agents.md +++ b/docs/flyte_agents/developing_agents.md @@ -41,46 +41,72 @@ To create a new async agent, extend the [`AsyncAgentBase`](https://github.com/fl - `get`: This method retrieves the job resource (jobID or output literal) associated with the task, such as a BigQuery job ID or Databricks task ID. - `delete`: Invoking this method will send a request to delete the corresponding job. +Below is a skeleton for an example async agent. Modify it as needed. + ```python +# agent.py from typing import Optional from dataclasses import dataclass + +from flyteidl.core.execution_pb2 import TaskExecution from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate from flytekit.extend.backend.base_agent import AsyncAgentBase, AgentRegistry, Resource, ResourceMeta @dataclass -class BigQueryMetadata(ResourceMeta): +class ExampleMetadata(ResourceMeta): """ This is the metadata for the job. For example, the id of the job. """ + job_id: str -class BigQueryAgent(AsyncAgentBase): + +class ExampleAgent(AsyncAgentBase): def __init__(self): - super().__init__(task_type_name="bigquery", metadata_type=BigQueryMetadata) + super().__init__(task_type_name="example", metadata_type=ExampleMetadata) def create( self, task_template: TaskTemplate, inputs: Optional[LiteralMap] = None, **kwargs, - ) -> BigQueryMetadata: - job_id = submit_bigquery_job(inputs) - return BigQueryMetadata(job_id=job_id) - - def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource: - phase, outputs = get_job_status(resource_meta.job_id) - return Resource(phase=phase, outputs=outputs) - - def delete(self, resource_meta: BigQueryMetadata, **kwargs): - cancel_bigquery_job(resource_meta.job_id) - -# To register the bigquery agent -AgentRegistry.register(BigQueryAgent()) + ) -> ExampleMetadata: + print(f"create called task_template={task_template}") + # pull out plugin specific configuration from the task + custom = task_template.custom + # pull out the environment field set from the task + environment = custom["environment"] + + # submit job on external platform + # job_id = submit_job(inputs, environment) + job_id = "temp" + + # return metadata which will be used for following get & delete calls + return ExampleMetadata(job_id=job_id) + + def get(self, resource_meta: ExampleMetadata, **kwargs) -> Resource: + print(f"get called resource_meta={resource_meta}") + # query external platform for job status + # phase = get_job_status(resource_meta.job_id) + phase = TaskExecution.SUCCEEDED + + # phases can be TaskExecution.RUNNING, TaskExecution.SUCCEEDED, TaskExecution.FAILED, etc + return Resource(phase=phase) + + def delete(self, resource_meta: ExampleMetadata, **kwargs): + print(f"delete called resource_meta={resource_meta}") + # cancel job on external platform + # cancel_job(resource_meta.job_id) + pass + + +# To register the example agent +AgentRegistry.register(ExampleAgent()) ``` -For an example implementation, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43). +For an example implementation of a real agent, see the [BigQuery agent](https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L43). #### Sync agent interface specification @@ -193,7 +219,7 @@ By running agents independently, you can thoroughly test and validate your agent controlled environment before deploying them to the production cluster. By default, all agent requests will be sent to the default agent service. However, -you can route particular task requests to designated agent services by adjusting the FlytePropeller configuration. +you can route particular task requests to designated agent services by adjusting the FlytePropeller configuration. ```yaml plugins: diff --git a/docs/flyte_agents/testing_agents_in_a_local_python_environment.md b/docs/flyte_agents/testing_agents_in_a_local_python_environment.md index ef26642d6b..d3ebf43927 100644 --- a/docs/flyte_agents/testing_agents_in_a_local_python_environment.md +++ b/docs/flyte_agents/testing_agents_in_a_local_python_environment.md @@ -14,48 +14,121 @@ You can test agents locally without running the backend server. To test an agent locally, create a class for the agent task that inherits from `SyncAgentExecutorMixin` or `AsyncAgentExecutorMixin`. These mixins can handle synchronous and asynchronous tasks, respectively, and allow flytekit to mimic FlytePropeller's behavior in calling the agent. -## BigQuery example +## Example Task -To test the BigQuery agent, copy the following code to a file called `bigquery_task.py`, modifying as needed. +To test the example agent defined in {ref}`developing agents `, copy the following code to a file called `task.py`, modifying as needed. + +```python +# task.py +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any, Dict, Optional + +from google.protobuf import json_format +from google.protobuf.struct_pb2 import Struct + +from flytekit import PythonFunctionTask +from flytekit.configuration import SerializationSettings +from flytekit.extend import TaskPlugins +from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin + + +@dataclass +class ExampleConfig(object): + """ + ExampleConfig should be used to configure an ExampleTask. + """ + + environment: str + + +# Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task. +# This task extends PythonFunctionTask but you can extend different base tasks depending on your needs (ie. SQLTask) +class ExampleTask(AsyncAgentExecutorMixin, PythonFunctionTask[ExampleConfig]): + # This must match the task type defined in the agent + _TASK_TYPE = "example" + + def __init__( + self, + task_config: Optional[ExampleConfig], + task_function: Callable, + **kwargs, + ) -> None: + outputs = None + super().__init__( + task_config=task_config, + task_function=task_function, + task_type=self._TASK_TYPE, + **kwargs, + ) + + def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: + """ + Return plugin-specific data as a serializable dictionary. This is required for your plugin to access task_template.custom. + """ + config = { + "environment": self.task_config.environment, + } + s = Struct() + s.update(config) + return json_format.MessageToDict(s) + + +# Register the Example Task into the flytekit core plugin system +TaskPlugins.register_pythontask_plugin(ExampleConfig, ExampleTask) +``` ```{note} -In some cases, you will need to store credentials in your local environment when testing locally. -For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when running BigQuery tasks to test the BigQuery agent. +The ExampleTask implements `get_custom` which is originally defined in the base Task object. You will need to implement +`_get_custom` if you wish to pass plugin-specific data through the task_template's `custom` field. ``` -Add `AsyncAgentExecutorMixin` or `SyncAgentExecutorMixin` to the class to tell flytekit to use the agent to run the task. +Flytekit will automatically use the agent to run the task in the local execution. ```python -class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]): - ... +# example.py +from flytekit.configuration.default_images import DefaultImages +from flytekit import task -class ChatGPTTask(SyncAgentExecutorMixin, PythonTask): - ... +# Import agent to trigger agent registration +from .agent import ExampleAgent +# Import task to trigger task registration and pass plugin specific config +from .task import ExampleConfig -``` -Flytekit will automatically use the agent to run the task in the local execution. -```python -bigquery_doge_coin = BigQueryTask( - name=f"bigquery.doge_coin", - inputs=kwtypes(version=int), - query_template="SELECT * FROM `bigquery-public-data.crypto_dogecoin.transactions` WHERE version = @version LIMIT 10;", - output_structured_dataset_type=StructuredDataset, - task_config=BigQueryConfig(ProjectID="flyte-test-340607") -) +@task(task_config=ExampleConfig(environment="dev"), container_image=DefaultImages.default_image()) +def say_hello(name: str) -> str: + print(f"Hello, {name}!") + return f"Hello, {name}!" + ``` -You can run the above example task locally and test the agent with the following command: +You can run locally and test the agent with the following command: ```bash -pyflyte run bigquery_task.py bigquery_doge_coin --version 10 +pyflyte run example.py example --name world +Running Execution on local. +create called task_template= +get called resource_meta=ExampleMetadata(job_id='temp') ``` -You can also run a BigQuery task in your Python interpreter to test the agent locally. +If it doesn't appear that your agent is running you can debug what might be going on by passing the `-v` verbose flag to `pyflyte`. + +## Existing Flyte Agent Tasks + +You can also run a existing Flyte Agent tasks in your Python interpreter to test the agent locally. ![](https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/agents/bigquery_task.png) + +```{note} + +In some cases, you will need to store credentials in your local environment when testing locally. +For example, you need to set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable when running BigQuery tasks to test the BigQuery agent. + +``` + ## Databricks example To test the Databricks agent, copy the following code to a file called `databricks_task.py`, modifying as needed.