Workers add capability to an iii system. Each one contributes functions and triggers the engine can
route to. This page covers deploying and wiring workers into a project.Once connected, a worker exposes:
Functions, callable by function_id from anywhere in the system (see
Using iii / Functions).
Triggers it advertises, which other workers can bind their functions to (see
Using iii / Triggers).
For the full SDK surface each Worker can use when interacting with iii, see the complete SDK
reference by language: Node, Python,
Rust, or Browser.
iii worker init creates a new standalone worker from scratch. The command writes a
language-specific project directory with the iii SDK installed, an iii.worker.yaml manifest, and
example function and trigger registrations you can replace with your own.
# Interactive: prompts for the languageiii worker init my-worker# Fully scripted: pass --language to skip the promptiii worker init my-worker --language typescript
Supported languages: typescript (ts), javascript (js), python (py), rust (rs).The positional NAME is the target directory; pass --directory to override it.Re-running iii worker init on a directory that already holds an iii worker (ie. has
.iii/worker.ini) will make no changes to the worker.Worker init will fail by default when targeting a non-empty directory, use --allow-non-empty to
scaffold into any other non-empty directory.
To install an existing worker from the registry instead of scaffolding a new one, use iii worker add. See Using iii / Workers for the registry surface.
A worker connects to the engine over WebSocket. The convention is to set the engine URL via the
III_URL environment variable, but it can also be passed explicitly to register_worker. The
connection string is the only coupling between a worker and the iii instance it joins, so the worker
process can be deployed anywhere reachable on the network.
Node / TypeScript
Python
Rust
import { registerWorker } from "iii-sdk";const url = process.env.III_URL;if (!url) throw new Error("III_URL must be set");const worker = registerWorker(url, { workerName: "my-worker",});
import osfrom iii import register_worker, InitOptionsworker = register_worker( os.environ.get("III_URL"), InitOptions(worker_name="my-worker"),)
use iii_sdk::{InitOptions, WorkerMetadata, register_worker};let url = std::env::var("III_URL").expect("III_URL must be set");let worker = register_worker( &url, InitOptions { metadata: Some(WorkerMetadata { name: "my-worker".into(), ..Default::default() }), ..Default::default() },);
Workers transition through a small set of states after connecting:
connecting → connected → available / busy → disconnected. connecting is the WebSocket handshake.
connected means the Worker has joined the Engine’s registry. available and busy describe
whether the Worker is currently handling invocations. disconnected is the terminal state when the
WebSocket closes. The Engine tracks these transitions and surfaces them to other Workers and tooling
through its discovery functions, so the rest of the system can react.
To see what’s currently connected to the Engine, invoke one of the engine::*::list Functions to
get the current state of the registry. Each returns a list:
Function
What it returns
engine::workers::list
Every connected Worker with metrics.
engine::functions::list
Every registered Function. Filterable by include_internal.
engine::triggers::list
Every registered Trigger. Filterable by include_internal.
engine::trigger-types::list
Every advertised Trigger type with its config and call schemas.
When a Worker’s WebSocket closes, the Engine cleans up after it automatically. Its Functions and
Triggers leave the live registry, and any in-flight invocations of those Functions are cancelled.
In flight requests will get a invocation_stopped error, catch these errors and treat them like a
cancellation. Retrying will fail until the Worker that owns this function reconnects.
Example: catch `invocation_stopped`
Node / TypeScript
Python
Rust
import { IIIInvocationError } from "iii-sdk";try { const result = await worker.trigger({ function_id: "math::add", payload: { a: 1, b: 2 }, });} catch (err) { if (err instanceof IIIInvocationError && err.code === "invocation_stopped") { // Worker disconnected mid-invocation. Subscribe to `engine::functions-available` // (see "Subscribe to changes" below) to know when to retry. return; } throw err;}
from iii import IIIInvocationErrortry: result = worker.trigger({ "function_id": "math::add", "payload": {"a": 1, "b": 2}, })except IIIInvocationError as err: if err.code == "invocation_stopped": # Worker disconnected mid-invocation. Subscribe to `engine::functions-available` # (see "Subscribe to changes" below) to know when to retry. return raise
use iii_sdk::{IIIError, TriggerRequest};use serde_json::json;let result = worker .trigger(TriggerRequest { function_id: "math::add".into(), payload: json!({ "a": 1, "b": 2 }), action: None, timeout_ms: None, }) .await;match result { Err(IIIError::Remote { code, .. }) if code == "invocation_stopped" => { // Worker disconnected mid-invocation. Subscribe to `engine::functions-available` // (see "Subscribe to changes" below) to know when to retry. } Err(e) => return Err(e.into()), Ok(value) => { /* use value */ }}
You can register a Trigger against one of the engine’s discovery events to react to topology changes
as they happen. This is particularly useful for continuing work when a Worker comes back online.
Trigger
When it fires
engine::workers-available
A Worker connects or disconnects.
engine::functions-available
A Function is registered or unregistered.
Example: subscribe to discovery events
Node / TypeScript
Python
Rust
worker.registerFunction( "discovery::on-workers", async (data: { event: string; worker_id: string }) => { if (data.event === "worker_connected") { // A Worker just joined the registry; its Functions are callable now. } },);worker.registerTrigger({ type: "engine::workers-available", function_id: "discovery::on-workers", config: {},});worker.registerFunction( "discovery::on-functions", async (data: { event: string; functions: { function_id: string }[] }) => { // `functions` is the full snapshot after the change. const ids = data.functions.map((f) => f.function_id); },);worker.registerTrigger({ type: "engine::functions-available", function_id: "discovery::on-functions", config: {},});
async def on_workers(data: dict) -> None: if data["event"] == "worker_connected": # A Worker just joined the registry; its Functions are callable now. passworker.register_function("discovery::on-workers", on_workers)worker.register_trigger({ "type": "engine::workers-available", "function_id": "discovery::on-workers", "config": {},})async def on_functions(data: dict) -> None: # `functions` is the full snapshot after the change. ids = [f["function_id"] for f in data.get("functions", [])]worker.register_function("discovery::on-functions", on_functions)worker.register_trigger({ "type": "engine::functions-available", "function_id": "discovery::on-functions", "config": {},})
use iii_sdk::{RegisterFunction, RegisterTriggerInput};use schemars::JsonSchema;use serde::Deserialize;use serde_json::{Value, json};#[derive(Deserialize, JsonSchema)]struct WorkersAvailable { event: String, worker_id: String }#[derive(Deserialize, JsonSchema)]struct FunctionsAvailable { event: String, functions: Vec<Value> }worker.register_function(RegisterFunction::new_async( "discovery::on-workers", |input: WorkersAvailable| async move { if input.event == "worker_connected" { // A Worker just joined the registry; its Functions are callable now. } Ok::<_, String>(()) },));worker.register_trigger(RegisterTriggerInput { trigger_type: "engine::workers-available".into(), function_id: "discovery::on-workers".into(), config: json!({}), metadata: None,})?;worker.register_function(RegisterFunction::new_async( "discovery::on-functions", |input: FunctionsAvailable| async move { // `functions` is the full snapshot after the change. let _count = input.functions.len(); Ok::<_, String>(()) },));worker.register_trigger(RegisterTriggerInput { trigger_type: "engine::functions-available".into(), function_id: "discovery::on-functions".into(), config: json!({}), metadata: None,})?;
iii.worker.yaml is the manifest at the worker’s root that tells iii how to install dependencies,
run the worker, and pass through configuration. This applies to both the iii worker CLI commands
(e.g. start, stop, restart) and to workers
that iii starts automatically when they’re specified in iii’s
config.yaml.
The manifest is metadata about starting the Worker. Once the Worker is running iii treats them all
the same. A Worker started by iii via its config.yaml, via iii worker start, or a manually run
process that uses the iii SDK all behave identically with the Engine.
If a worker isn’t starting correctly then make sure to check its manifest and iii worker logs.
Call the SDK’s shutdown to close the WebSocket cleanly. The engine removes the worker’s Functions
and Triggers from the registry, fires engine::workers-available with worker_disconnected, and
cancels in-flight invocations targeting them with invocation_stopped.Without shutdown, an abrupt process exit reaches the same state once the engine notices the
dropped socket; graceful shutdown makes it deterministic and faster.
// Rust threads do not keep the process alive on their own; await this// before `main` returns so the connection thread exits cleanly.worker.shutdown_async().await;
Shutdown is very useful for One-shot / ephemeral workers. Kubernetes Jobs, serverless
containers, or scheduled scripts can connect just like any other Worker, do their work, and
shutdown() (shutdown_async().await in Rust).