
Statements Reference

Statements are the building blocks of Moco workflows. Every body in a workflowspec is a statement. Statements fall into two categories: primitives (leaf nodes that do work) and composites (containers that orchestrate other statements).

For full workflowspec context, see the Workflowspec Reference.

Common Parameters

All statements support these optional parameters:

| Parameter | Type | Description |
|---|---|---|
| name | string | Unique identifier for the statement |
| description | string | Human-readable description |
| condition | expression or list | Skip this statement if the expression is falsy |
| output_name | string | Variable to store the statement result |
| output_data | list | Data transformations to apply after execution |
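As an illustration, the sketch below attaches several of the common parameters to a transform statement. The statement name, the variables total and discounted_total, and the discount rule are hypothetical; only the parameter names come from the table above.

```yaml
# Hypothetical example: names and values are illustrative only.
- transform:
    name: apply-discount
    description: Apply a 10% discount to large orders
    # The statement is skipped when this expression is falsy.
    condition: "{{ total > 100 }}"
    output_data:
      - discounted_total: "{{ total * 0.9 }}"
```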

Primitive Statements

transform

Evaluates expressions and assigns variables. The primary way to compute or reshape data.

- transform:
    input_data:
      - temp: "{{ price * 1.08 }}"
    output_data:
      - total: "{{ temp }}"
      - message: "Total is {{ total }}"

| Parameter | Description |
|---|---|
| input_data | Variable assignments evaluated before output_data |
| output_data | Main transformation assignments |

abort

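Stops execution at the current point. The type parameter determines how far the stop propagates, from a single loop to the entire workflow, and whether the workflow ends gracefully or with an error.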

- abort:
    condition: "{{ price < 0 }}"
    type: raise
    message: "Invalid price: {{ price }}"

| Type | Behavior |
|---|---|
| abort | Abort the entire workflow |
| terminate | Gracefully terminate the workflow |
| break | Break out of the current sequence or parallel block |
| break_iteration | Break out of the current iteration loop |
| raise | Raise an error and fail the workflow |

activity

Executes a registered activity (HTTP call, database query, custom function, etc.).

- activity:
    type: builtin.http_request
    input_data:
      method: POST
      url: https://api.example.com/orders
      body:
        order_id: "{{ order_id }}"
    output_name: api_response
    timeout_sec: 30
    max_retry_attempts: 3

| Parameter | Description |
|---|---|
| type | Activity type identifier (required) |
| version | Activity version (default: 1.0.0) |
| config_data | Static configuration, evaluated once at workflow start |
| input_data | Dynamic input, evaluated each time the activity runs |
| output_name | Variable to store the activity result |
| timeout_sec | Execution timeout in seconds |
| max_retry_attempts | Number of retries on failure |
| execute_locally | Force local execution, bypassing Temporal |
| enable_cache | Enable result caching |
| cache_policy | Cache configuration (TTL, key) |
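To make the difference between config_data and input_data concrete, here is a minimal sketch using a custom activity. The activity type send-email, its fields, and the variables customer_email and order_id are assumptions made for illustration; they are not built-in names.

```yaml
# Hypothetical custom activity: "send-email" and its fields are assumptions.
- activity:
    type: send-email
    version: 1.0.0
    config_data:
      # Static: evaluated once at workflow start.
      sender: noreply@example.com
    input_data:
      # Dynamic: evaluated each time the activity runs.
      recipient: "{{ customer_email }}"
      subject: "Order {{ order_id }} confirmed"
    output_name: email_result
    timeout_sec: 10
    max_retry_attempts: 2
```

Values that never change during a run fit naturally in config_data, while anything derived from workflow variables belongs in input_data so that it is re-evaluated on each execution.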

Built-in activities: builtin.http_request, builtin.delay, builtin.now, builtin.emit_workflow_event.


workflow

Executes a child workflow by reference or inline definition.

- workflow:
    wfspec:
      name: process-order
      version: 1.0.0
    child_mode: sync
    input_data:
      order_id: "{{ order_id }}"
    output_name: order_result

| Child Mode | Behavior |
|---|---|
| inline | Runs in the parent's context, shares variables (default) |
| sync | Runs independently; parent waits for the result |
| async | Runs independently; parent waits only for start, gets workflow_id |
| detached | Runs completely independently; parent doesn't wait |
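For comparison with the sync example, the following sketch starts a child in async mode. The workflow name send-receipt and the variable receipt_workflow are hypothetical, and it is an assumption that the workflow_id mentioned in the table is what gets stored under output_name.

```yaml
# Hypothetical child workflow: "send-receipt" is an illustrative name.
- workflow:
    wfspec:
      name: send-receipt
      version: 1.0.0
    # The parent continues once the child has started.
    child_mode: async
    input_data:
      order_id: "{{ order_id }}"
    # Assumption: holds the child's workflow_id rather than its result.
    output_name: receipt_workflow
```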

To define a workflow inline instead of by name:

- workflow:
    wfspec:
      content:
        wfspec_name: inline-helper
        wfspec_version: 1.0.0
        input_data:
          x:
        output_name: result
        body:
          transform:
            output_data:
              - result: "{{ x * 2 }}"
    child_mode: inline
    input_data:
      x: 21
    output_name: doubled

wait_for

Waits for an event matching filter criteria, or until a timeout.

- wait_for:
    event:
      topic: order_events
      match_expression: >
        {{ event.data.get('order_id') == order_id and
        event.data.get('status') == 'completed' }}
    timeout_sec: 60
    output_name: completion_event

| Parameter | Description |
|---|---|
| event.topic | Event topic to subscribe to |
| event.match_expression | Python expression to filter incoming events (the event variable holds the event object) |
| timeout_sec | Maximum wait time in seconds (required) |
| output_name | Variable to store the received event |

emit_event

Emits an event to the event bus.

- emit_event:
    input_data:
      topic: notification_events
      data:
        type: order_created
        order_id: "{{ order_id }}"
      target_workflow_id: "{{ parent_id }}"
      metadata:
        priority: high

| Parameter | Description |
|---|---|
| topic | Event topic (required) |
| data | Event payload (required) |
| target_workflow_id | Route the event to a specific workflow (optional) |
| metadata | Additional event metadata (optional) |
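emit_event and wait_for are often paired in a request and reply pattern. The sketch below shows one way that might look, using only the parameters documented for the two statements. The topic names approval_requests and approval_responses, the payload fields, and the variable approval are hypothetical.

```yaml
# Hypothetical request/reply flow: topics and payload fields are assumptions.
sequence:
  elements:
    - emit_event:
        input_data:
          topic: approval_requests
          data:
            order_id: "{{ order_id }}"
    - wait_for:
        event:
          topic: approval_responses
          # Accept only the reply that refers to this order.
          match_expression: "{{ event.data.get('order_id') == order_id }}"
        timeout_sec: 3600
        output_name: approval
```

This reference does not say what a reply that arrives before wait_for begins listening would do, so treat the ordering here as illustrative.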

continue_as_new_if_suggested

Checks whether the runtime suggests restarting the workflow (e.g., Temporal event history nearing its size limit). If suggested, serializes workflow state and restarts execution from the beginning with the preserved state.

This is a no-op in the in-memory runtime. In Temporal, it triggers a continue-as-new when the SDK signals that history is getting large.

- continue_as_new_if_suggested:
    name: checkpoint-after-processing
    serialize_data_context: true

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | string | null | Optional identifier for logging |
| serialize_data_context | boolean | true | Whether to include data context variables in the serialized state |
| condition | expression | null | Skip this statement if the expression is falsy |

Composite Statements

sequence

Executes statements one after another.

sequence:
  elements:
    - transform:
        output_data:
          - status: "validating"
    - activity:
        type: builtin.http_request
        input_data:
          url: https://api.example.com/validate
        output_name: validation
    - abort:
        condition: "{{ not validation.valid }}"
        type: raise
        message: "Validation failed"

parallel

Executes statements concurrently with configurable join semantics.

- parallel:
    join_type: and
    elements:
      - activity:
          name: fetch-user
          type: builtin.http_request
          input_data:
            url: https://api.example.com/users/{{ user_id }}
          output_name: user_data
      - activity:
          name: fetch-orders
          type: builtin.http_request
          input_data:
            url: https://api.example.com/orders?user={{ user_id }}
          output_name: order_data

| Join Type | Behavior |
|---|---|
| and | All branches must succeed |
| or | At least one branch must succeed |
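A join_type of or suits redundant sources, where any single success is enough. The following is a minimal sketch under that assumption; the host names and variable names are hypothetical, and this reference does not describe what happens to branches that fail or are still running once one branch has succeeded.

```yaml
# Hypothetical redundant lookup: hosts and variable names are illustrative.
- parallel:
    # The block succeeds if at least one branch succeeds.
    join_type: or
    elements:
      - activity:
          name: fetch-primary
          type: builtin.http_request
          input_data:
            url: https://primary.example.com/price
          output_name: primary_price
      - activity:
          name: fetch-mirror
          type: builtin.http_request
          input_data:
            url: https://mirror.example.com/price
          output_name: mirror_price
```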

iteration

Loops over a collection, either sequentially or in parallel.

- iteration:
    iter_type: sequence
    input_data: "{{ items }}"
    body:
      activity:
        type: process-item
        input_data:
          item: "{{ iter_item }}"

| Parameter | Description |
|---|---|
| iter_type | sequence (sequential) or parallel (concurrent) |
| input_data | Collection to iterate over (list, dict.items(), range(), etc.) |
| body | Statement executed for each item |
| join_type | For parallel iteration: "and" or "or" |
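The sequential example can be adapted to run concurrently by changing iter_type and adding join_type. This sketch assumes a hypothetical list variable order_ids and reuses the built-in HTTP activity; the URL is illustrative.

```yaml
# Hypothetical parallel loop: "order_ids" and the URL are assumptions.
- iteration:
    iter_type: parallel
    # Every item's body must succeed for the iteration to succeed.
    join_type: and
    input_data: "{{ order_ids }}"
    body:
      activity:
        type: builtin.http_request
        input_data:
          method: GET
          url: "https://api.example.com/orders/{{ iter_item }}"
```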

Special variables available inside the body:

  • iter_item — the current item
  • iter_items — all items in the collection

Use abort with type: break_iteration to exit the loop early.
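An early exit might be written as follows. Because body takes a single statement, the sketch wraps the abort and the work in a sequence. It is an assumption that break_iteration propagates out of that nested sequence to the enclosing loop, and the status field on each item is hypothetical.

```yaml
# Hypothetical early exit: assumes each item has a "status" field.
- iteration:
    iter_type: sequence
    input_data: "{{ items }}"
    body:
      sequence:
        elements:
          # Leave the loop at the first invalid item.
          - abort:
              condition: "{{ iter_item.status == 'invalid' }}"
              type: break_iteration
              message: "Stopping at first invalid item"
          - activity:
              type: process-item
              input_data:
                item: "{{ iter_item }}"
```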


state_machine

Event-driven finite state machine. See State Machines Reference for full documentation.

- state_machine:
    name: order-fsm
    initial_state: pending
    timeout_sec: 300
    states:
      - name: pending
      - name: processing
      - name: completed
        is_terminal: true
    transitions:
      - from_state: pending
        to_state: processing
        trigger:
          event_name: start

Next Steps