The Runner
executes tasks in a sequential order. It is useful for debugging and testing the Workflow. Use this for simple workflows or for testing out functionality before you scale compute it with the QueueRunner
.
Basic Usage
The basic usage pattern of the Runner
follows three steps:
- Initialize the runner
- Implement the logic for each stage in your Workflow you want to capture with the runner
- Execute the runner
The following example shows how to initialize the runner and implement the logic for each stage in your Workflow you want to capture with the runner.
from encord.objects.ontology_labels_impl import LabelRowV2
from encord_agents.tasks import Runner
# Step 1: Initialization
# Initialize the runner
# Project hash is optional but allows you to "fail fast"
# if you misconfigure the stages.
runner = Runner(project_hash="<your_project_hash>")
# Step 2: Definition
# Define agent logic for a specific stage
@runner.stage(stage="my_stage_name") # or stage="<stage_uuid>"
def process_task(lr: LabelRowV2) -> str | None:
# Modify the label row as needed
lr.set_priority(0.5)
# Return the pathway name or UUID where the task should go next
return "next_stage"
# Step 3: Execution
if __name__ == "__main__":
# via the CLI
runner.run()
# or via code
runner(
project_hash="<your_project_hash">,
refresh_every=3600, # seconds
num_retries = 1,
task_batch_size = 1,
)
To execute the runner via the CLI, you can do:
# simple
python example_agent.py --project-hash <your_project_hash>
# use help for additional configurations
python example_agent.py --help
Running Agents
Basic Execution
runner.run() # will run the runner as CLI tool
runner() # will run the runner directly
Both options:
- Connect to your Encord project
- Poll for tasks in the configured stages
- Execute your agent functions on each task
- Move tasks according to returned pathway
- Retry failed tasks up to
num_retries
times
See the configuration options below.
Command Line Interface
The runner exposes configuration via CLI:
python my_agent.py \
--project-hash "<project_hash>" \
--task-batch-size 1 \
--num-retries 3
--refresh-every 3600 # seconds
Order of execution
The runner processes tasks by emptying the queue for "stage_1"
first, then successively emptying the queue for "stage_2"
. If you set the refresh_every
argument, the runner repolls both queues after emptying the initial set. This ensures data that arrived in the queue after the initial poll is picked up in the subsequent iteration. If an execution’s time already exceeds the refresh_every
threshold, the agent instantly polls for new tasks.
To illustrate the order of execution, see the pseudo-code below.
# ⚠️ PSEUDO CODE - not intended for copying ⚠️
def execute(self, refresh_every = None):
timestamp = datetime.now()
while True:
# self.agents ≈ [stage_1, stage_2]
for agent in self.agents:
for task in agent.get_tasks():
# Inject params based on task
stage.execute(solve_dependencies(task, agent))
if refresh_every is None:
break
else:
# repeat after timestamp + timedelta(seconds=refresh_every)
# or straight away if already exceeded
...
Error Handling
The runner:
- Retries failed tasks up to
num_retries
times (default: 3). Changes to the label row are not rolled back.
- Logs errors for debugging
- Continues processing other tasks if a task fails
- Bundles updates for better performance (configurable via
task_batch_size
)
Configuration
Initialization
::: encord_agents.tasks.runner.Runner.init
options:
show_if_no_docstring: false
show_subodules: false
Runtime Configuration
There are two ways to execute the runner.
- Either run the runner directly from your code:
...
runner = Runner()
...
runner(project_hash="<your_project_hash>") # See all params below 👇
- Or run it using the command-line interface (CLI) by employing the
runner.run()
function.
Suppose you have an example.py
file that looks like this:
...
runner = Runner()
...
if __name__ == "__main__":
runner.run()
Then, the runner functions as a CLI tool, accepting the same arguments as when executed in code.
$ python example.py --help
Usage: example.py [OPTIONS]
Execute the runner.
╭─ Options ──────────────────────────────────────────────────────────╮
│ --refresh-every INTEGER Fetch task statuses from the Encord │
│ Project every `refresh_every` seconds. │
│ If `None`, the runner will exit once │
│ task queue is empty. │
│ [default: None] │
│ --num-retries INTEGER If an agent fails on a task, how many │
│ times should the runner retry it? │
│ [default: 3] │
│ --task-batch-size INTEGER Number of tasks for which labels are │
│ loaded into memory at once. │
│ [default: 300] │
│ --project-hash TEXT The project hash if not defined at │
│ runner instantiation. │
│ [default: None] │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────╯
By default, the Runner bundles task updates for better performance with a batch size of 300. For debugging or when immediate updates are needed, you can set task_batch_size=1:
# Via CLI
python my_agent.py --task-batch-size 1
Or in code
runner(task_batch_size=1)
Scaling with the QueueRunner
The QueueRunner is an advanced runner designed for parallel processing of multiple tasks, ideal for speeding up execution of large task volumes.
Both the Runner and QueueRunner share the same interface. The primary distinction lies in their execution:
- The Runner executes tasks sequentially using its
run()
function.
- The QueueRunner converts your implementations into functions that accept a task specification as a JSON string and return a
encord_agents.tasks.models.TaskCompletionResult
as a JSON string. This stringified JSON format is necessary for passing messages over queues, which typically do not support custom object types.
Here’s an example of how this difference manifests:
=== “The (sequential) Runner
”
runner = Runner()
@runner.stage("my_stage")
def my_agent(task: AgentTask, label_row: LabelRowV2):
...
runner()
=== “The (parallel) QueueRunner
”
queue_runner = QueueRunner() # Change the runner to the queue runner
# The implementation stays the same
@queue_runner.stage("my_stage")
def my_agent(task: AgentTask, label_row: LabelRowV2):
...
# Change the execution to use the queue runner
for agent in queue_runner.get_agent_stages():
your_task_queue = []
for task in agent.get_tasks():
your_task_queue.append(task)
for task in your_queue:
result = my_agent(task)
Please refer to the Celery example or Modal example for more information.
Comparison with Queue Runner
The key differences between QueueRunner
and the sequential Runner
are:
Feature | Runner | QueueRunner |
---|
Execution Model | Executes tasks sequentially in a single process | Designed for distributed execution across multiple processes |
Project Hash | Optional at initialization | Required at initialization |
Function Wrapping | Executes your function directly with injected dependencies | Additionally wraps your function to handle JSON task specifications |
Execution Control | Handles task polling and execution | You control task distribution and execution through your queue system |
Scaling | Not suitable for scaling | Suitable for scaling |