Return this from your agent and we will handle propagating the updates in batches.
- The pathway that the task will follow on task completion.
- The label to be saved (if present).
- The priority of the label row to be saved.
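To make the "updates in batches" idea concrete, here is a minimal, self-contained sketch. It does not use the SDK: `ReturnStructSketch`, `in_batches`, and `summarize_batch` are hypothetical names, and the three fields only mirror the descriptions above. How the library actually groups and propagates updates is not shown here.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, List, Optional


@dataclass
class ReturnStructSketch:
    """Hypothetical stand-in mirroring the three fields described above."""
    pathway: Optional[str] = None
    label_row: Optional[Any] = None
    label_row_priority: Optional[float] = None


def in_batches(items: List[ReturnStructSketch], batch_size: int) -> Iterator[List[ReturnStructSketch]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def summarize_batch(batch: List[ReturnStructSketch]) -> Dict[str, Any]:
    """Collect the updates one batch would need to propagate (illustrative only)."""
    return {
        "pathways": [r.pathway for r in batch if r.pathway is not None],
        "labels_to_save": sum(r.label_row is not None for r in batch),
        "priorities": [r.label_row_priority for r in batch if r.label_row_priority is not None],
    }


results = [
    ReturnStructSketch(pathway="approve", label_row_priority=0.9),
    ReturnStructSketch(pathway="reject", label_row={"id": 1}),
    ReturnStructSketch(pathway="approve"),
]
summaries = [summarize_batch(batch) for batch in in_batches(results, batch_size=2)]
```

Every field is optional in this sketch, so an agent can return only the parts it wants updated.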
Data model to hold information about the completion result of encord_agents.tasks.QueueRunner agents.
Dependency to provide an authenticated user client.
Example:
Get the storage item associated with the underlying agent task.
The StorageItem is useful for multiple things like
Note: When marking a task agent with the StorageItem dependency, we will bulk fetch the storage items for the tasks and then inject them independently with each task.
This is a trivial method kept for backwards compatibility. You can instead annotate a parameter directly: storage_item: StorageItem
Example:
Arguments:
storage_item - StorageItem
Returns:
The storage item.
Dependency to inject the first frame of the underlying asset.
The downloaded asset will be named lr.data_hash.{suffix}.
When the function has finished, the downloaded file will be removed from the file system.
Example:
Arguments:
storage_item - The Storage item. Automatically injected (see example above).
Returns:
Numpy array of shape [h, w, 3] RGB colors.
Dependency to inject a video frame iterator for doing things over many frames. This will use OpenCV and the local backend on your machine. Decoding support may vary depending on the video format, codec, and your local configuration.
Intended use
Arguments:
storage_item - Automatically injected Storage item dependency.
Raises:
NotImplementedError - Will fail for other data types than video.
Yields:
An iterator.
Dependency to inject a video sampler for doing things over many frames. This will use OpenCV and the local backend on your machine. Decoding support may vary dependent on the video format, codec and your local configuration.
Arguments:
storage_item - Automatically injected Storage item dependency.
Example:
Get a local file path to the data asset, which is stored temporarily until the end of task execution.
This dependency will fetch the underlying data asset based on a signed url. It will temporarily store the data on disk. Once the task is completed, the asset will be removed from disk again.
Example:
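The store-then-clean-up lifecycle described above can be sketched without the SDK. This is an illustration only: `temporary_asset` is a hypothetical helper, the bytes are passed in directly instead of being downloaded from a signed URL, and the file name is made up.

```python
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def temporary_asset(payload: bytes, suffix: str) -> Iterator[Path]:
    """Write payload to a temporary file, yield its path, then delete it."""
    tmp_dir = Path(tempfile.mkdtemp())
    path = tmp_dir / f"asset{suffix}"
    path.write_bytes(payload)
    try:
        yield path  # the "task" runs while the file exists
    finally:
        # Clean up even if the task raised an exception.
        path.unlink(missing_ok=True)
        tmp_dir.rmdir()


with temporary_asset(b"fake-bytes", ".png") as asset_path:
    existed_during_task = asset_path.exists()
    size_in_bytes = asset_path.stat().st_size

exists_after_task = asset_path.exists()
```

The practical consequence is that the path is only valid inside the agent function. Copy the file elsewhere if you need it after the task completes.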
Returns:
The path to the asset.
Raises:
ValueError - if the underlying assets are not videos, images, or audio.
EncordException - if data type not supported by SDK yet.
Dataclass to hold “label twin” information.
Dependency to link assets between two Projects. When your Runner is running on <project_hash_a>, you can use this to get a Twin of labels and the underlying task in the “twin project” with <project_hash_b>.
This is useful in situations like:
Example:
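As a rough illustration of what "twin" means, the sketch below pairs a label row in one project with the label row for the same data item in another project. It is not the SDK implementation: `LabelRowStub` and `find_twin` are hypothetical, and matching on a shared `data_hash` is an assumption based on both projects being attached to the same datasets.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class LabelRowStub:
    """Hypothetical stand-in for a label row; only the fields used here."""
    project_hash: str
    data_hash: str
    labels: List[str]


def find_twin(row: LabelRowStub, twin_rows: List[LabelRowStub]) -> Optional[LabelRowStub]:
    """Return the row in the twin project that refers to the same data item."""
    by_data_hash = {r.data_hash: r for r in twin_rows}
    return by_data_hash.get(row.data_hash)


project_a_row = LabelRowStub("<project_hash_a>", "data-1", labels=[])
project_b_rows = [
    LabelRowStub("<project_hash_b>", "data-1", labels=["car", "person"]),
    LabelRowStub("<project_hash_b>", "data-2", labels=["dog"]),
]

twin = find_twin(project_a_row, project_b_rows)
missing = find_twin(LabelRowStub("<project_hash_a>", "data-9", labels=[]), project_b_rows)
```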
Arguments:
twin_project_hash - The project hash of the twin project (attached to the same datasets) from which you want to load the additional data.
init_labels - If true, the label row will be initialized before calling the agent.
include_task - If true, the task field of the Twin will be populated. If population fails, e.g., for non-workflow projects, the task will also be None.
Returns:
The twin.
Raises:
encord.AuthorizationError - if you do not have access to the project.
Get a lookup to easily retrieve data rows and storage items associated with the given task.
!!! warning “Deprecated”
dep_data_lookup is deprecated and will be removed in version 0.2.10. Use dep_storage_item instead for accessing storage items.
Migration Guide:
Arguments:
lookup - The object that you can use to lookup data rows and storage items. Automatically injected.
Returns:
The (shared) lookup object.
This class is intended to hold agent implementations. It makes it easy to put agent task specifications into a queue and then execute them in a distributed fashion.
Below is a template for how that would work.
Example:
Initialize the QueueRunner with a project hash.
This is the hash of the project that you want to run the tasks on.
Arguments:
project_hash - The hash of the project to run the tasks on.
Agent wrapper intended for queueing systems and distributed workloads.
Define your agent as you are used to with dependencies in the method declaration and return the pathway from the project workflow that the task should follow upon completion. The function will be wrapped in logic that does the following (in pseudo code):
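A minimal, self-contained sketch of such a wrapper, written without the SDK. Everything except the names `wrapped_function` and `your_function` is a hypothetical stand-in: `FakeTask`, `fetch_task`, `load_resources`, and the keys of the result dictionary are assumptions made for illustration.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for an agent task; not the SDK class."""
    uuid: str
    data_hash: str
    proceeded_to: Optional[str] = None

    def proceed(self, pathway: str) -> None:
        self.proceeded_to = pathway


def fetch_task(task_json_spec: str) -> FakeTask:
    """Rebuild the task from the JSON string taken off the queue."""
    spec = json.loads(task_json_spec)
    return FakeTask(uuid=spec["uuid"], data_hash=spec["data_hash"])


def load_resources(task: FakeTask) -> Dict[str, Any]:
    """Resolve the dependencies the agent asks for (here: only the task)."""
    return {"task": task}


def wrap(your_function: Callable[..., str]) -> Callable[[str], str]:
    def wrapped_function(task_json_spec: str) -> str:
        task = fetch_task(task_json_spec)
        resources = load_resources(task)
        try:
            pathway = your_function(**resources)  # <- your code runs here
            task.proceed(pathway)
            result = {"task_uuid": task.uuid, "success": True, "pathway": pathway, "error": None}
        except Exception as exc:
            result = {"task_uuid": task.uuid, "success": False, "pathway": None, "error": str(exc)}
        return json.dumps(result)

    return wrapped_function


def your_function(task: FakeTask) -> str:
    return "approve" if task.data_hash else "reject"


wrapped_function = wrap(your_function)
result = json.loads(wrapped_function(json.dumps({"uuid": "t-1", "data_hash": "abc"})))
```

Because both the input and the output are plain JSON strings, the wrapped function can be handed to any queueing system that transports text.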
When you have an encord.workflow.stages.agent.AgentTask instance at hand, let’s call it task, then you can call your wrapped_function with task.model_dump_json(). Similarly, you can put task.model_dump_json() into a queue and read from that queue, e.g., from another instance/process, to execute wrapped_function there.
As the pseudo code indicates, wrapped_function understands how to take that string from the queue and resolve all your defined dependencies before calling your_function.
Arguments:
stage - The name or uuid of the stage that the function should be associated with.
label_row_metadata_include_args - Arguments to be passed to project.list_label_rows_v2(...)
label_row_initialise_labels_args - Arguments to be passed to label_row.initialise_labels(...)
will_set_priority - Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself. That is, if your function signature does not include a LabelRowV2 parameter.
Returns:
The decorated function.
Get the agent stages for which there exists an agent implementation.
This function is intended to make it easy to iterate through all current agent tasks and put the task specs into external queueing systems like Celery or Modal.
For a concrete example, please see the doc string for the class itself.
Note that if you didn’t specify an implementation (by decorating your function with @runner.stage) for a given agent stage, the stage will not show up by calling this function.
Returns:
An iterable over encord.workflow.stages.agent.AgentStage objects where the runner contains an agent implementation.
Raises:
AssertionError - if the runner does not have an associated project.
Initialize the runner with an optional project hash.
The project_hash will allow stricter stage validation. If left unspecified, errors will first be raised during execution of the runner.
Arguments:
project_hash - The project hash that the runner applies to. Can be left unspecified to be able to reuse the same runner on multiple projects.
Runs agents against Workflow projects.
When called, it will iteratively run agent stages until they are empty.
By default, the runner will exit after finishing the tasks identified at the point of trigger.
To automatically re-run, you can use the refresh_every keyword.
Example:
Initialize the runner with an optional project hash.
The project_hash will allow stricter stage validation. If left unspecified, errors will first be raised during execution of the runner.
Arguments:
project_hash - The project hash that the runner applies to. Can be left unspecified to be able to reuse the same runner on multiple projects.
pre_execution_callback - Callable[[RunnerBase], None]. Allows for optional additional validation, e.g., checking for a specific Ontology form.
Decorator to associate a function with an agent stage.
A function decorated with a stage is added to the list of stages that will be handled by the runner. The runner will call the function for every task which is in that stage.
Example:
The function declaration can be any function that takes parameters that are type annotated with the following types:
- encord.project.Project that the runner is operating on.
- encord.objects.LabelRowV2 that the task is associated with.
- encord.workflow.stages.agent.AgentTask that the task is associated with.
All those parameters will be automatically injected when the agent is called.
Example:
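Injection by type annotation can be illustrated with a small, self-contained sketch. It is not how the library is implemented: `ProjectStub`, `LabelRowStub`, `AgentTaskStub`, and `call_with_injection` are hypothetical names, and the lookup is a plain dictionary keyed by type.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict


@dataclass
class ProjectStub:
    title: str


@dataclass
class LabelRowStub:
    data_title: str


@dataclass
class AgentTaskStub:
    uuid: str


def call_with_injection(func: Callable[..., str], available: Dict[type, Any]) -> str:
    """Call func, filling each parameter from `available` based on its annotation."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation not in available:
            raise TypeError(f"Cannot inject parameter '{name}' of type {param.annotation!r}")
        kwargs[name] = available[param.annotation]
    return func(**kwargs)


# The agent only declares what it needs; order and parameter names are free.
def my_agent(task: AgentTaskStub, project: ProjectStub) -> str:
    return f"{project.title}:{task.uuid}"


available = {
    ProjectStub: ProjectStub("demo"),
    LabelRowStub: LabelRowStub("image.png"),
    AgentTaskStub: AgentTaskStub("t-1"),
}
pathway = call_with_injection(my_agent, available)
```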
Arguments:
stage - The name or uuid of the stage that the function should be associated with.
label_row_metadata_include_args - Arguments to be passed to project.list_label_rows_v2(...)
label_row_initialise_labels_args - Arguments to be passed to label_row.initialise_labels(...)
overwrite - Overwrite the method associated with this stage if it already exists. An error is thrown otherwise.
will_set_priority - Indicates whether you will be returning a TaskAgentReturnStruct with a label_row_priority field set. This field is only required if you are returning the priority of the label row but not depending on the label row itself. That is, if your function signature does not include a LabelRowV2 parameter.
Returns:
The decorated function.
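The registration and overwrite behaviour described above can be sketched with a small stand-in registry. `StageRegistry` and `run_stage` are hypothetical, and raising `ValueError` on a duplicate is an assumption. The source only states that an error is thrown.

```python
from typing import Callable, Dict, List


class StageRegistry:
    """Hypothetical stand-in for a runner's stage bookkeeping."""

    def __init__(self) -> None:
        self._agents: Dict[str, Callable[[str], str]] = {}

    def stage(self, stage: str, *, overwrite: bool = False):
        def decorator(func: Callable[[str], str]) -> Callable[[str], str]:
            if stage in self._agents and not overwrite:
                # Assumption: the concrete exception type is illustrative.
                raise ValueError(f"Stage '{stage}' already has an agent; pass overwrite=True.")
            self._agents[stage] = func
            return func  # the decorated function is returned unchanged

        return decorator

    def run_stage(self, stage: str, tasks: List[str]) -> List[str]:
        """Call the registered agent once per task in the stage."""
        agent = self._agents[stage]
        return [agent(task) for task in tasks]


registry = StageRegistry()


@registry.stage("review")
def first_agent(task: str) -> str:
    return "approve"


try:
    @registry.stage("review")
    def duplicate_agent(task: str) -> str:
        return "reject"
    duplicate_rejected = False
except ValueError:
    duplicate_rejected = True


@registry.stage("review", overwrite=True)
def replacement_agent(task: str) -> str:
    return f"reject:{task}"


pathways = registry.run_stage("review", ["t-1", "t-2"])
```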
Run your task agent runner(...).
???+ info “Self-updating/Polling runner”
The runner can continuously poll new tasks in the project and execute the defined stage agents.
To do so, please set the refresh_every parameter.
When set, the runner will re-fetch tasks with at least that amount of time in between polls. If you set the time to, e.g., 1 second, but it takes 60 seconds to empty the task queue, the runner will poll again upon completion of the current task queue.
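The timing rule above amounts to waiting only for whatever remains of the interval. A minimal sketch, with `seconds_to_wait` as a hypothetical helper name:

```python
def seconds_to_wait(refresh_every: float, queue_runtime: float) -> float:
    """Time left before the next poll, given how long the last queue took to empty."""
    return max(0.0, refresh_every - queue_runtime)


# Emptying the queue took longer than the interval: poll again immediately.
wait_when_slow = seconds_to_wait(refresh_every=1, queue_runtime=60)

# Emptying the queue was quick: wait for the remainder of the interval.
wait_when_fast = seconds_to_wait(refresh_every=3600, queue_runtime=600)
```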
Arguments:
refresh_every - Fetch task statuses from the Encord Project every refresh_every seconds. If None, the runner will exit once the task queue is empty.
num_retries - If an agent fails on a task, how many times should the runner retry it?
task_batch_size - Number of tasks for which labels are loaded into memory at once.
project_hash - The project hash if not defined at runner instantiation.
Returns:
None
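How num_retries and task_batch_size might interact can be sketched as follows. This is not the runner's implementation: `run_with_retries` and `run_in_batches` are hypothetical, and the sketch assumes that num_retries counts additional attempts after the first one, which the source does not specify.

```python
from typing import Callable, Dict, List


def run_with_retries(agent: Callable[[str], None], task: str, num_retries: int) -> bool:
    """Return True if the agent succeeds within 1 + num_retries attempts."""
    # Assumption: num_retries means retries *after* the initial attempt.
    for _ in range(num_retries + 1):
        try:
            agent(task)
            return True
        except Exception:
            continue
    return False


def run_in_batches(agent: Callable[[str], None], tasks: List[str],
                   task_batch_size: int, num_retries: int) -> List[bool]:
    outcomes: List[bool] = []
    for start in range(0, len(tasks), task_batch_size):
        # Only this slice would have its labels in memory at once.
        batch = tasks[start:start + task_batch_size]
        outcomes.extend(run_with_retries(agent, task, num_retries) for task in batch)
    return outcomes


attempts: Dict[str, int] = {}


def flaky_agent(task: str) -> None:
    attempts[task] = attempts.get(task, 0) + 1
    if task == "b" and attempts[task] < 3:
        raise RuntimeError("transient failure")
    if task == "c":
        raise RuntimeError("permanent failure")


outcomes = run_in_batches(flaky_agent, ["a", "b", "c"], task_batch_size=2, num_retries=2)
```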
Execute the runner.
This function is intended to be called from the “main file”. It is an entry point to be able to run the agent(s) via your shell with command line arguments.
Example:
You can then execute the runner with:
to see the options it has (they are those from Runner.__call__).