> ## Documentation Index
> Fetch the complete documentation index at: https://docs.encord.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Agents Reference Task Agents

# Custom Workflow Agents

## models

### TaskAgentReturnStruct Objects

```python theme={"dark"}
@dataclass
class TaskAgentReturnStruct()
```

Return this from your agent and we will handle propagating the updates in batches

#### pathway

The pathway that the task will follow on task completion

#### label\_row

The label to be saved (if present)

#### label\_row\_priority

The priority of the label row to be saved.

### TaskCompletionResult Objects

```python theme={"dark"}
class TaskCompletionResult(BaseModel)
```

Data model to hold information about the completion result of
`encord_agents.tasks.QueueRunner` agents.

## dependencies

#### dep\_client

```python theme={"dark"}
def dep_client() -> EncordUserClient
```

Dependency to provide an authenticated user client.

**Example:**

```python theme={"dark"}
from encord.user_client import EncordUserClient
from encord_agents.tasks.dependencies import dep_client
...
@runner.stage("<my_stage_name>")
def my_agent(
    client: Annotated[EncordUserClient, Depends(dep_client)]
) -> str:
    # Client will authenticated and ready to use.
    client.get_dataset("")
```

#### dep\_storage\_item

```python theme={"dark"}
def dep_storage_item(storage_item: StorageItem) -> StorageItem
```

Get the storage item associated with the underlying agent task.

The [`StorageItem`](/sdk-documentation/sdk-references/storage#storageitem-objects)
is useful for multiple things like

* Updating client metadata
* Reading file properties like storage location, fps, duration, DICOM tags, etc.

<Note>
  When marking an agent with the StorageItem dependency, we bulk fetch the storage items for the tasks
  and then inject them independently with each task. Trivial method for backwards compatibility. Can do: storage\_item: StorageItem directly.
</Note>

**Example**

```python theme={"dark"}
from encord.storage import StorageItem
from encord_agents.tasks.dependencies import dep_storage_item

@runner.stage(stage="<my_stage_name>")
def my_agent(storage_item: Annotated[StorageItem, Depends(dep_storage_item)]) -> str:
    print(storage_item.name)
    print(storage_item.client_metadata)
    ...
```

**Arguments**:

* `storage_item` - StorageItem

**Returns**:

The storage item.

#### dep\_single\_frame

```python theme={"dark"}
def dep_single_frame(storage_item: StorageItem) -> NDArray[np.uint8]
```

Dependency to inject the first frame of the underlying asset.

The downloaded asset will be named `lr.data_hash.{suffix}`.
When the function has finished, the downloaded file will be removed from the file system.

**Example:**

```python theme={"dark"}
from encord_agents import FrameData
from encord_agents.tasks.dependencies import dep_single_frame
...

@runner.stage("<my_stage_name>")
def my_agent(
    frame: Annotated[NDArray[np.uint8], Depends(dep_single_frame)]
) -> str:
    assert frame.ndim == 3, "Will work"
```

**Arguments**:

* `storage_item` - The Storage item. Automatically injected (see example above).

**Returns**:

Numpy array of shape \[h, w, 3] RGB colors.

#### dep\_video\_iterator

```python theme={"dark"}
def dep_video_iterator(
        storage_item: StorageItem) -> Generator[Iterator[Frame], None, None]
```

Dependency to inject a video frame iterator for doing things over many frames.
This will use OpenCV and the local backend on your machine.
Decoding support may vary dependent on the video format, codec and your local configuration

**Intended use**

```python theme={"dark"}
from encord_agents import FrameData
from encord_agents.tasks.dependencies import dep_video_iterator
...

@runner.stage("<my_stage_name>")
def my_agent(
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)]
) -> str:
    for frame in video_frames:
        print(frame.frame, frame.content.shape)
```

**Arguments**:

* `storage_item` - Automatically injected Storage item dependency.

**Raises**:

* `NotImplementedError` - Will fail for other data types than video.

**Yields**:

An iterator.

#### dep\_video\_sampler

```python theme={"dark"}
def dep_video_sampler(
    storage_item: StorageItem
) -> Generator[Callable[[float | Sequence[int]], Iterable[Frame]], None, None]
```

Dependency to inject a video sampler for doing things over many frames.
This will use OpenCV and the local backend on your machine.
Decoding support may vary dependent on the video format, codec and your local configuration.

**Arguments**:

* `storage_item` - Automatically injected Storage item dependency.

  **Example:**

```python theme={"dark"}
from encord_agents.tasks.dependencies import dep_video_sampler
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<stage_name_or_uuid>")
def my_agent(
    video_sampler: Annotated[Callable[[float | Sequence[int]], Iterable[Frame]], Depends(dep_video_sampler)],
) -> str | None:
    for frame in video_sampler(1/5):
        # Get every 5th frame
        # i.e: [0,5,10,15,...]
    for frame in video_sampler([1, 2, 3]):
        # Get frames 1, 2, 3
    ...
```

#### dep\_asset

```python theme={"dark"}
def dep_asset(storage_item: StorageItem) -> Generator[Path, None, None]
```

Get a local file path to data asset temporarily stored till end of task execution.

This dependency will fetch the underlying data asset based on a signed url.
It will temporarily store the data on disk. Once the task is completed, the
asset will be removed from disk again.

**Example:**

```python theme={"dark"}
from encord_agents.tasks.dependencies import dep_asset
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<stage_name_or_uuid>")
def my_agent(
    asset: Annotated[Path, Depends(dep_asset)],
) -> str | None:
    asset.stat()  # read file stats
    ...
```

**Returns**:

The path to the asset.

**Raises**:

* `ValueError` - if the underlying assets are not videos, images, or audio.
* `EncordException` - if data type not supported by SDK yet.

### Twin Objects

```python theme={"dark"}
@dataclass(frozen=True)
class Twin()
```

Dataclass to hold "label twin" information.

#### dep\_twin\_label\_row

```python theme={"dark"}
def dep_twin_label_row(
        twin_project_hash: str,
        init_labels: bool = True,
        include_task: bool = False) -> Callable[[LabelRowV2], Twin | None]
```

Dependency to link assets between two Projects. When your `Runner` in running on
`<project_hash_a>`, you can use this to get a `Twin` of labels and the underlying
task in the "twin project" with `<project_hash_b>`.

This is useful in situations like:

* When you want to transfer labels from a source project" to a sink project.
* If you want to compare labels to labels from other projects upon label submission.
* If you want to extend an existing project with labels from another project on the same underlying data.

**Example:**

```python theme={"dark"}
from encord.workflow.common import WorkflowTask
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks.dependencies import Twin, dep_twin_label_row
...
runner = Runner(project_hash="<project_hash_a>")

@runner.stage("<my_stage_name_in_project_a>")
def my_agent(
    project_a_label_row: LabelRowV2,
    twin: Annotated[
        Twin, Depends(dep_twin_label_row(twin_project_hash="<project_hash_b>"))
    ],
) -> str | None:
    label_row_from_project_b: LabelRowV2 = twin.label_row
    task_from_project_b: WorkflowTask = instance.get_answer(attribute=checklist_attribute)
```

**Arguments**:

* `twin_project_hash` - The project has of the twin project (attached to the same datasets)
  from which you want to load the additional data.
* `init_labels` - If true, the label row will be initialized before calling the agent.
* `include_task` - If true, the `task` field of the `Twin` will be populated. If population
  fails, e.g., for non-workflow projects, the task will also be None.

**Returns**:

The twin.

**Raises**:

* `encord.AuthorizationError` - if you do not have access to the project.

#### dep\_data\_lookup

```python theme={"dark"}
def dep_data_lookup(
        lookup: Annotated[DataLookup,
                          Depends(DataLookup.sharable)]) -> DataLookup
```

Get a lookup to easily retrieve data rows and storage items associated with the given task.

!!! warning "Deprecated"
`dep_data_lookup` is deprecated and will be removed in version 0.2.10.
Use `dep_storage_item` instead for accessing storage items.

**Migration Guide:**

```python theme={"dark"}
## Old way (deprecated)
from encord_agents.tasks.dependencies import dep_data_lookup, DataLookup

@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    lookup: Annotated[DataLookup, Depends(dep_data_lookup)]
) -> str:
    storage_item = lookup.get_storage_item(task.data_hash)
    client_metadata = storage_item.client_metadata
    ...

## New way (recommended)
from encord_agents.tasks.dependencies import dep_storage_item

@runner.stage(stage="Agent 1")
def my_agent(
    task: AgentTask,
    storage_item: Annotated[StorageItem, Depends(dep_storage_item)]
) -> str:
    # storage_item is directly available
    client_metadata = storage_item.client_metadata

    # Update metadata
    storage_item.update(
        client_metadata={
            "new": "entry",
            **(client_metadata or {})
        }
    )
    ...
```

**Arguments**:

* `lookup` - The object that you can use to lookup data rows and storage items. Automatically injected.

**Returns**:

The (shared) lookup object.

## runner.queue\_runner

### QueueRunner Objects

```python theme={"dark"}
class QueueRunner(RunnerBase)
```

This class is intended to hold agent implementations.
It makes it easy to put agent task specifications into
a queue and then execute them in a distributed fashion.

Below is a template for how that would work.

*Example:*

```python theme={"dark"}
runner = QueueRunner(project_hash="...")

@runner.stage("Agent 1")
def my_agent_implementation() -> str:
    # ... do your thing
    return "<pathway_name>"

## Populate the queue
my_queue = ...
for stage in runner.get_agent_stages():
    for task in stage.get_tasks():
        my_queue.append(task.model_dump_json())

## Execute on the queue
while my_queue:
    task_spec = my_queue.pop()
    result_json = my_agent_implementation(task_spec)
    result = TaskCompletionResult.model_validate_json(result_json)
```

#### \_\_init\_\_

```python theme={"dark"}
def __init__(project_hash: str | UUID)
```

Initialize the QueueRunner with a project hash.

This is the hash of the project that you want to run the tasks on.

**Arguments**:

* `project_hash` - The hash of the project to run the tasks on.

#### stage

```python theme={"dark"}
def stage(
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs
    | None = None,
    will_set_priority: bool = False
) -> Callable[[Callable[..., TaskAgentReturnType]], Callable[[str], str]]
```

Agent wrapper intended for queueing systems and distributed workloads.

Define your agent as you are used to with dependencies in the method declaration and
return the pathway from the project workflow that the task should follow upon completion.
The function will be wrapped in logic that does the following (in pseudo code):

When you have an `encord.workflow.stages.agent.AgentTask` instance at hand, let's call
it `task`, then you can call your `wrapped_function` with `task.model_dump_json()`.
Similarly, you can put `task.model_dump_json()` int a queue and read from that queue, e.g.,
from another instance/process, to execute `wrapped_function` there.

As the pseudo code indicates, `wrapped_function` understands how to take that string from
the queue and resolve all your defined dependencies before calling `your_function`.

```
@runner.stage("stage_name")
def my_function(...)
    ...

## is equivalent to

def wrapped_function(task_json_spec: str) -> str (result_json):
    task = fetch_task(task_sped)
    resources = load_resources(task)
    pathway = your_function(resources)  # <- this is where your code goes
    task.proceed(pathway)
    return TaskCompletionResult.model_dump_json()
```

**Arguments**:

* `stage` - The name or uuid of the stage that the function should be
  associated with.
* `label_row_metadata_include_args` - Arguments to be passed to
  `project.list_label_rows_v2(...)`
* `label_row_initialise_labels_args` - Arguments to be passed to
  `label_row.initialise_labels(...)`
* `will_set_priority` - Indicates whether you will be returning a `TaskAgentReturnStruct`
  with a `label_row_priority` field set. This field is only required if you are
  returning the priority of the label row but not depending on the label row it self.
  That is, if your function signature does not include a `LabelRowV2` parameter.

**Returns**:

The decorated function.

#### get\_agent\_stages

```python theme={"dark"}
def get_agent_stages() -> Iterable[AgentStage]
```

Get the agent stages for which there exist an agent implementation.

This function is intended to make it easy to iterate through all current
agent tasks and put the task specs into external queueing systems like
Celery or Modal.

For a concrete example, please see the doc string for the class it self.

Note that if you didn't specify an implementation (by decorating your
function with `@runner.stage`) for a given agent stage, the stage will
not show up by calling this function.

**Returns**:

An iterable over `encord.workflow.stages.agent.AgentStage` objects
where the runner contains an agent implementation.

**Raises**:

* `AssertionError` - if the runner does not have an associated project.

## runner.runner\_base

### RunnerBase Objects

```python theme={"dark"}
class RunnerBase()
```

#### \_\_init\_\_

```python theme={"dark"}
def __init__(project_hash: str | UUID | None = None)
```

Initialize the runner with an optional project hash.

The `project_hash` will allow stricter stage validation.
If left unspecified, errors will first be raised during execution of the runner.

**Arguments**:

* `project_hash` - The project hash that the runner applies to.

  Can be left unspecified to be able to reuse same runner on multiple projects.

## runner.sequential\_runner

### SequentialRunner Objects

```python theme={"dark"}
class SequentialRunner(RunnerBase)
```

Runs agents against Workflow projects.

When called, it will iteratively run agent stages until they are empty. By default, runner exits after finishing the tasks identified at the point of trigger. To automatically re-run, you can use the `refresh_every` keyword.

**Example:**

```python title="example_agent.py" theme={"dark"}
from uuid import UUID
from encord_agents.tasks import Runner
runner = Runner()

@runner.stage("<workflow_node_name>")
## or
@runner.stage("<workflow_node_uuid>")
def my_agent(task: AgentTask) -> str | UUID | None:
    ...
    return "pathway name"  # or pathway uuid

runner(project_hash="<project_hash>")  # (see __call__ for more arguments)
## or
if __name__ == "__main__":
    # for CLI usage: `python example_agent.py --project-hash "<project_hash>"`
    runner.run()
```

#### \_\_init\_\_

```python theme={"dark"}
def __init__(project_hash: str | None = None,
             *,
             pre_execution_callback: Callable[[Self], None] | None = None)
```

Initialize the runner with an optional project hash.

The `project_hash` will allow stricter stage validation.
If left unspecified, errors will first be raised during execution of the runner.

**Arguments**:

* `project_hash` - The project hash that the runner applies to.

  Can be left unspecified to be able to reuse same runner on multiple projects.
* `pre_execution_callback` - Callable\[RunnerBase, None]

  Allows for optional additional validation e.g. Check specific Ontology form

#### stage

```python theme={"dark"}
def stage(
    stage: str | UUID,
    *,
    label_row_metadata_include_args: LabelRowMetadataIncludeArgs | None = None,
    label_row_initialise_labels_args: LabelRowInitialiseLabelsArgs
    | None = None,
    overwrite: bool = False,
    will_set_priority: bool = False
) -> Callable[[DecoratedCallable], DecoratedCallable]
```

Decorator to associate a function with an agent stage.

A function decorated with a stage is added to the list of stages
that will be handled by the runner.
The runner will call the function for every task which is in that
stage.

**Example:**

The function declaration can be any function that takes parameters
that are type annotated with the following types:

* \[Project]\[docs-project]: the `encord.project.Project`
  that the runner is operating on.
* \[LabelRowV2]\[docs-label-row]: the `encord.objects.LabelRowV2`
  that the task is associated with.
* \[AgentTask]\[docs-project]: the `encord.workflow.stages.agent.AgentTask`
  that the task is associated with.
* Any other type: which is annotated with a [dependency](/agents-documentation/Basics/Dependencies)

All those parameters will be automatically injected when the agent is called.

**Example:**

```python theme={"dark"}
runner = Runner()

@runner.stage("<stage_name_or_uuid>")
def my_func() -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"
```

```python theme={"dark"}
from typing import Iterator
from typing_extensions import Annotated

from encord.project import Project
from encord_agents.tasks import Depends
from encord_agents.tasks.dependencies import dep_video_iterator
from encord.workflow.stages.agent import AgentTask

runner = Runner()

def random_value() -> float:
    import random
    return random.random()

@runner.stage("<stage_name_or_uuid>")
def my_func(
    project: Project,
    lr: LabelRowV2,
    task: AgentTask,
    video_frames: Annotated[Iterator[Frame], Depends(dep_video_iterator)],
    custom: Annotated[float, Depends(random_value)]
) -> str | None:
    ...
    return "<pathway_name or pathway_uuid>"
```

**Arguments**:

* `stage` - The name or uuid of the stage that the function should be
  associated with.
* `label_row_metadata_include_args` - Arguments to be passed to
  `project.list_label_rows_v2(...)`
* `label_row_initialise_labels_args` - Arguments to be passed to
  `label_row.initialise_labels(...)`
* `overwrite` - Overwrite the method associated to this stage if it already exists
  will throw an error otherwise
* `will_set_priority` - Indicates whether you will be returning a `TaskAgentReturnStruct`
  with a `label_row_priority` field set. This field is only required if you are
  returning the priority of the label row but not depending on the label row it self.
  That is, if your function signature does not include a `LabelRowV2` parameter.

**Returns**:

The decorated function.

#### \_\_call\_\_

```python theme={"dark"}
def __call__(
    refresh_every: Annotated[
        Optional[int],
        Option(
            help=
            "Fetch task statuses from the Encord Project every `refresh_every` seconds. If `None`, the runner will exit once task queue is empty."
        ),
    ] = None,
    num_retries: Annotated[
        int,
        Option(
            help=
            "If an agent fails on a task, how many times should the runner retry it?"
        )] = 3,
    task_batch_size: Annotated[
        int,
        Option(
            help=
            "Number of tasks for which labels are loaded into memory at once."
        )] = 300,
    project_hash: Annotated[
        Optional[str],
        Option(help="The project hash if not defined at runner instantiation."
               )] = None,
    max_tasks_per_stage: Annotated[
        Optional[int],
        Option(
            help=
            "Max number of tasks to try to process per stage on a given run. If `None`, will attempt all",
        ),
    ] = None
) -> None
```

Run your agent `runner(...)`.

???+ info "Self-updating/Polling runner"
The runner can continuously poll new tasks in the project and execute the defined stage agents.
To do so, please set the `refresh_every` parameter.
When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.

**Arguments**:

* `refresh_every` - Fetch task statuses from the Encord Project every `refresh_every` seconds.
  If `None`, the runner will exit once task queue is empty.
* `num_retries` - If an agent fails on a task, how many times should the runner retry it?
* `task_batch_size` - Number of tasks for which labels are loaded into memory at once.
* `project_hash` - The project hash if not defined at runner instantiation.

**Returns**:

None

#### run

```python theme={"dark"}
def run() -> None
```

Execute the runner.

This function is intended to be called from the "main file".
It is an entry point to be able to run the agent(s) via your shell
with command line arguments.

**Example:**

You can then run execute the runner with:

to see the options is has (it's those from `Runner.__call__`).

```python title="example.py" theme={"dark"}
runner = Runner(project_hash="<your_project_hash>")

@runner.stage(stage="...")
def your_func() -> str:
    ...

if __name__ == "__main__":
    runner.run()
```

```shell theme={"dark"}
python example.py --help
```
