> ## Documentation Index
> Fetch the complete documentation index at: https://docs.encord.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Sequential Runner

The `Runner` executes tasks sequentially, making it ideal for debugging and testing workflows. Use it for simple workflows or for testing functionality before scaling up with the [`QueueRunner`](/agents-documentation/Task-Agents/Execution/Multi-Processing).

## Basic Usage

The basic usage pattern of the `Runner` follows three steps:

1. Initialize the runner
2. Implement the logic for each stage in your Workflow you want to capture with the runner
3. Execute the runner

The following example shows how to initialize the runner and implement the logic for each stage in your Workflow you want to capture with the runner.

```python title="example_agent.py" theme={"dark"}
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks import Runner
from encord_agents.tasks.models import TaskAgentReturnStruct

# Step 1: Initialization
# Initialize the runner
# Project hash is optional but allows you to "fail fast" if stages are misconfigured
runner = Runner(project_hash="<your_project_id>")

# Step 2: Definition
# Define agent logic for a specific stage
@runner.stage(stage="my_stage_name")  # or stage="<stage_uuid>"
def process_task(label_row: LabelRowV2) -> TaskAgentReturnStruct:
    # Modify the label row as needed
    label_row.set_priority(0.5)

    # Return the pathway name or UUID where the task should go next
    # TaskAgentReturnStruct batches calls to the server
    return TaskAgentReturnStruct(pathway="Next Stage", label_row=label_row)

# Step 3: Execution
if __name__ == "__main__":
    # using the CLI
    runner.run()

    # or using code
    runner(
        project_hash="<your_project_id>",
        refresh_every= 3600,  # seconds
        num_retries = 1,
        task_batch_size = 50,
    )
```

To execute the runner via the CLI, you can do:

```shell theme={"dark"}
# simple
python example_agent.py --project-hash <your_project_id>
# use help for additional configurations
python example_agent.py --help
```

## Running Agents

### Basic Execution

```python theme={"dark"}
runner.run()  # will run the runner as CLI tool
runner()      # will run the runner directly
```

Both options:

1. Connect to your Encord project
2. Poll for tasks in the configured stages
3. Execute your agent functions on each task
4. Move tasks according to returned pathway
5. Retry failed tasks up to `num_retries` times

See the [configuration options](.#runtime-configuration) below.

### Command Line Interface

The runner exposes configuration via CLI:

```bash theme={"dark"}
python my_agent.py \
    --project-hash "<project_id>" \
    --task-batch-size 50 \
    --num-retries 3
    --refresh-every 3600 # seconds
```

### Order of execution

The runner processes tasks by emptying the queue for `"stage_1"` first, then successively emptying the queue for `"stage_2"`. If you set the `refresh_every` argument, the runner re-polls both queues after emptying the initial set. This ensures data that arrived in the queue after the initial poll is picked up in the subsequent iteration. If an execution's time already exceeds the `refresh_every` threshold, the agent instantly polls for new tasks.

To illustrate the order of execution, see the pseudo-code below.

```python theme={"dark"}
# ⚠️  PSEUDO CODE - not intended for copying ⚠️
def execute(self, refresh_every = None):
    timestamp = datetime.now()
    while True:
        # self.agents ≈ [stage_1, stage_2]
        for agent in self.agents:  
            for task in agent.get_tasks():
                # Inject params based on task
                stage.execute(solve_dependencies(task, agent))  

        if refresh_every is None:
            break
        else:
            # repeat after timestamp + timedelta(seconds=refresh_every)
            # or straight away if already exceeded
            ...
```

### Error Handling

The runner:

* Retries failed tasks up to `num_retries` times (default: 3). Changes to the label row are not rolled back.
* Logs errors for debugging
* Continues processing other tasks if a task fails
* Bundles updates for better performance (configurable via `task_batch_size`)

## Configuration

### Initialization

```python theme={"dark"}
encord_agents.tasks.runner.Runner.__init__
    options:
        show_if_no_docstring: false
        show_subodules: false
```

### Runtime Configuration

There are two ways to execute the runner.

1. Either run the runner directly from your code:

```python theme={"dark"}
runner = Runner()
runner(project_hash="<your_project_id>")  # See available parameters below
```

2. Or run it using the command-line interface (CLI) by employing the `runner.run()` function.

Suppose you have an `example.py` file that looks like this:

```python title="example.py" theme={"dark"}
...
runner = Runner()
...
if __name__ == "__main__":
    runner.run()
```

Then, the runner functions as a CLI tool, accepting the same arguments as when executed in code.

```shell theme={"dark"}
$ python example.py --help

 Usage: example.py [OPTIONS]

 Execute the runner.

╭─ Options ──────────────────────────────────────────────────────────╮
│ --refresh-every   INTEGER  Fetch task statuses from the Encord     │
│                            Project every `refresh_every` seconds.  │
│                            If `None`, the runner will exit once    │
│                            task queue is empty.                    │
│                            [default: None]                         │
│ --num-retries     INTEGER  If an agent fails on a task, how many   │
│                            times should the runner retry it?       │
│                            [default: 3]                            │
│ --task-batch-size INTEGER  Number of tasks for which labels are    │
│                            loaded into memory at once.             │
│                            [default: 300]                          │
│ --project-hash    TEXT     The project id if not defined at      │
│                            runner instantiation.                   │
│                            [default: None]                         │
│ --help                     Show this message and exit.             │
╰────────────────────────────────────────────────────────────────────╯
```

### Performance Considerations

By default, the Runner [bundles](/sdk-documentation/sdk-references/http.bundle#bundle) task updates in batches of 300 for better performance. For debugging or when immediate updates are required, you can set `task_batch_size` to 1.

```shell theme={"dark"}
# Via CLI
python my_agent.py --task-batch-size 1
```

Or in code

```python theme={"dark"}
runner(task_batch_size=10)
```

To speed up your agent's label row updates, you can return a `struct` object. Encord then saves the results in batches.  This minimizes the time your agent spends writing data. To use this feature, write your code to return a struct object instead of performing individual saves.

<Info>
  When the `TaskAgentReturnStruct` is returned, [bundle](/sdk-documentation/sdk-references/http.bundle#bundle) is used to batch label row updates, significantly improving performance.
</Info>

```python theme={"dark"}
from encord_agents.tasks.models import TaskAgentReturnStruct
from encord.storage import StorageItem
from encord.objects import LabelRowV2

@runner.stage("Agent Stage")
def agent_stage(label_row: LabelRowV2, storage_item: StorageItem) -> TaskAgentReturnStruct:
    # Modify the label row in any manner
    # ...
    return TaskAgentReturnStruct(pathway="Modified label", label_row=label_row)
```

## Scaling with the `QueueRunner`

The **QueueRunner** is an advanced runner designed for parallel processing of multiple tasks, ideal for speeding up execution of large task volumes.

Both the **Runner** and **QueueRunner** share the same interface. The primary distinction lies in their execution:

* The **Runner** executes tasks sequentially using its `run()` function.
* The **QueueRunner** converts your implementations into functions that accept a task specification as a JSON string and return a `encord_agents.tasks.models.TaskCompletionResult` as a JSON string. This stringified JSON format is necessary for passing messages over queues, which typically do not support custom object types.

Here's an example of how this difference manifests:

<CodeGroup>
  ```python Sequential Runner theme={"dark"}
  runner = Runner()
  @runner.stage("my_stage")
  def my_agent(task: AgentTask, label_row: LabelRowV2):
      ...
  runner()
  ```

  ```python Queue Runner theme={"dark"}
  queue_runner = QueueRunner()  # Change the runner to the queue runner

  # The implementation stays the same
  @queue_runner.stage("my_stage")
  def my_agent(task: AgentTask, label_row: LabelRowV2):
      ...

  # Change the execution to use the queue runner
  for agent in queue_runner.get_agent_stages():
      your_task_queue = []
      for task in agent.get_tasks():
          your_task_queue.append(task)

      for task in your_task_queue:
          result = my_agent(task)
  ```
</CodeGroup>

<Tip>
  Refer to the [Celery example](/agents-documentation/Task-Agents/Execution/Multi-Processing#celery-integration) or [Modal example](/agents-documentation/Task-Agents/Execution/Multi-Processing#modal-integration) for more information.
</Tip>

## Comparison with Queue Runner

The key differences between `QueueRunner` and the sequential `Runner` are:

| Feature               | [Runner](/agents-documentation/Task-Agents/Execution/Sequential-Runner) | [QueueRunner](/agents-documentation/Task-Agents/Execution/Multi-Processing) |
| --------------------- | ----------------------------------------------------------------------- | --------------------------------------------------------------------------- |
| **Execution Model**   | Executes tasks sequentially in a single process                         | Designed for distributed execution across multiple processes                |
| **Project Hash**      | Optional at initialization                                              | Required at initialization                                                  |
| **Function Wrapping** | Executes your function directly with injected dependencies              | Additionally wraps your function to handle JSON task specifications         |
| **Execution Control** | Handles task polling and execution                                      | You control task distribution and execution through your queue system       |
| **Scaling**           | Not suitable for scaling                                                | Suitable for scaling                                                        |
