Eino Human-in-the-Loop Framework: Technical Architecture Guide

Overview

This document provides technical details of Eino’s human-in-the-loop (HITL) framework architecture, focusing on the interrupt/resume mechanism and the underlying addressing system.

Requirements for Human-in-the-Loop

The diagram below illustrates the key questions that each component must answer during the interrupt/resume process. Understanding these requirements is crucial for grasping the rationale behind the architecture design.

graph TD
    subgraph P1 [Interrupt Phase]
        direction LR
        subgraph Dev1 [Developer]
            direction TB
            D1[Should I interrupt now?<br/>Was I interrupted before?]
            D2[What info should the user<br/>see about this interrupt?]
            D3[What state should I preserve<br/>for later resumption?]
            D1 --> D2 --> D3
        end
        
        subgraph Fw1 [Framework]
            direction TB
            F1[Where in the execution hierarchy<br/>did the interrupt occur?]
            F2[How to associate state with<br/>the interrupt location?]
            F3[How to persist interrupt<br/>context and state?]
            F4[What info does the user need<br/>to understand the interrupt?]
            F1 --> F2 --> F3 --> F4
        end
        
        Dev1 --> Fw1
    end
    
    subgraph P2 [User Decision Phase]
        direction TB
        subgraph "End User"
            direction TB
            U1[Where in the flow did<br/>the interrupt occur?]
            U2[What type of info did<br/>the developer provide?]
            U3[Should I resume this<br/>interrupt?]
            U4[Should I provide data<br/>for resumption?]
            U5[What type of resume data<br/>should I provide?]
            U1 --> U2 --> U3 --> U4 --> U5
        end
    end
    
    
    subgraph P3 [Resume Phase]
        direction LR   
        subgraph Fw2 [Framework]
            direction TB
            FR1[Which entity is interrupting<br/>and how to re-run it?]
            FR2[How to restore context for<br/>the interrupted entity?]
            FR3[How to route user data<br/>to the interrupted entity?]
            FR1 --> FR2 --> FR3
        end
        
        subgraph Dev2 [Developer]
            direction TB
            DR1[Am I the explicit<br/>resume target?]
            DR2[If not target, should I<br/>re-interrupt to continue?]
            DR3[What state did I preserve<br/>at interrupt time?]
            DR4[If user resume data is provided,<br/>how to handle it?]
            DR1 --> DR2 --> DR3 --> DR4
        end
        
        Fw2 --> Dev2
    end
    
    P1 --> P2 --> P3
    
    classDef dev fill:#e1f5fe
    classDef fw fill:#f3e5f5
    classDef user fill:#e8f5e8
    
    class D1,D2,D3,DR1,DR2,DR3,DR4 dev
    class F1,F2,F3,F4,FR1,FR2,FR3 fw
    class U1,U2,U3,U4,U5 user

Therefore, our goals are:

  1. Help developers answer the above questions as easily as possible.
  2. Help end users answer the above questions as easily as possible.
  3. Enable the framework to automatically answer the above questions out of the box.

Quick Start

We demonstrate the functionality with a simple ticket booking Agent. This Agent requests “confirmation” from the user before actually completing the booking, and users can “approve” or “reject” the booking operation. The complete code for this example is at: https://github.com/cloudwego/eino-examples/tree/main/adk/human-in-the-loop/1_approval

  1. Create a ChatModelAgent and configure a Tool for booking tickets.
import (
    "context"
    "fmt"
    "log"

    "github.com/cloudwego/eino/adk"
    "github.com/cloudwego/eino/components/tool"
    "github.com/cloudwego/eino/components/tool/utils"
    "github.com/cloudwego/eino/compose"

    "github.com/cloudwego/eino-examples/adk/common/model"
    tool2 "github.com/cloudwego/eino-examples/adk/common/tool"
)

func NewTicketBookingAgent() adk.Agent {
    ctx := context.Background()

    type bookInput struct {
       Location             string `json:"location"`
       PassengerName        string `json:"passenger_name"`
       PassengerPhoneNumber string `json:"passenger_phone_number"`
    }

    // bookTicket is a stub tool that always reports success; the approval step is added by the decorator below.
    bookTicket, err := utils.InferTool(
       "BookTicket",
       "this tool can book a ticket to the specified location",
       func(ctx context.Context, input bookInput) (output string, err error) {
          return "success", nil
       })
    if err != nil {
       log.Fatal(err)
    }

    a, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
       Name:        "TicketBooker",
       Description: "An agent that can book tickets",
       Instruction: `You are an expert ticket booker.
Based on the user's request, use the "BookTicket" tool to book tickets.`,
       Model: model.NewChatModel(),
       ToolsConfig: adk.ToolsConfig{
          ToolsNodeConfig: compose.ToolsNodeConfig{
             Tools: []tool.BaseTool{
                // InvokableApprovableTool is a tool decorator provided by eino-examples
                // that adds "approval interrupt" functionality to any InvokableTool
                &tool2.InvokableApprovableTool{InvokableTool: bookTicket},
             },
          },
       },
    })
    if err != nil {
       log.Fatal(fmt.Errorf("failed to create chatmodel: %w", err))
    }

    return a
}
  2. Create a Runner, configure a CheckPointStore, and run it with a CheckPointID. Eino uses the CheckPointStore to save the Agent’s running state when it is interrupted. Here we use InMemoryStore, which keeps checkpoints in memory; in production, a distributed store such as Redis is recommended. Eino uses the CheckPointID to uniquely identify and link the “before interrupt” and “after interrupt” runs (or any number of runs).
ctx := context.Background()
a := NewTicketBookingAgent()
runner := adk.NewRunner(ctx, adk.RunnerConfig{
    EnableStreaming: true, // you can disable streaming here
    Agent:           a,

    // provide a CheckPointStore for eino to persist the execution state of the agent for later resumption.
    // Here we use an in-memory store for simplicity.
    // In the real world, you can use a distributed store like Redis to persist the checkpoints.
    CheckPointStore: store.NewInMemoryStore(),
})
iter := runner.Query(ctx, "book a ticket for Martin, to Beijing, on 2025-12-01, the phone number is 1234567. directly call tool.", adk.WithCheckPointID("1"))
  3. Read the interrupt information from the AgentEvent at event.Action.Interrupted.InterruptContexts[0].Info. In this case it tells the user which ticket is about to be booked, for whom, and asks whether to approve. You also get an InterruptID (event.Action.Interrupted.InterruptContexts[0].ID), which the Eino framework uses to identify where the interrupt occurred. Here it is printed directly to the terminal; in a real application it might be returned to the frontend in an HTTP response.
var lastEvent *adk.AgentEvent
for {
    event, ok := iter.Next()
    if !ok {
       break
    }
    if event.Err != nil {
       log.Fatal(event.Err)
    }

    prints.Event(event)

    lastEvent = event
}

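// this interruptID is the crucial 'locator' Eino uses to know where the interrupt happened,
// so when resuming later, you must pass this same `interruptID` back to Eino along with the approval result
if lastEvent == nil || lastEvent.Action == nil || lastEvent.Action.Interrupted == nil {
    log.Fatal("expected the run to end with an interrupt")
}
interruptID := lastEvent.Action.Interrupted.InterruptContexts[0].ID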
  4. Display the interrupt information to the user and collect the user’s response, such as “approve”. In this example, both the display and the input happen on the local terminal; a real application might use a chatbot UI for input and output.
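var apResult *tool.ApprovalResult // tool: the eino-examples helper package (imported as tool2 above)
// create the scanner once, outside the loop, so buffered input is not lost between prompts
scanner := bufio.NewScanner(os.Stdin)
for {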
    fmt.Print("your input here: ")
    scanner.Scan()
    fmt.Println()
    nInput := scanner.Text()
    if strings.ToUpper(nInput) == "Y" {
       apResult = &tool.ApprovalResult{Approved: true}
       break
    } else if strings.ToUpper(nInput) == "N" {
       // Prompt for reason when denying
       fmt.Print("Please provide a reason for denial: ")
       scanner.Scan()
       reason := scanner.Text()
       fmt.Println()
       apResult = &tool.ApprovalResult{Approved: false, DisapproveReason: &reason}
       break
    }

    fmt.Println("invalid input, please input Y or N")
}

Sample output:

name: TicketBooker
path: [{TicketBooker}]
tool name: BookTicket
arguments: {"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}

name: TicketBooker
path: [{TicketBooker}]
tool 'BookTicket' interrupted with arguments '{"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}', waiting for your approval, please answer with Y/N

your input here: Y
  5. Call Runner.ResumeWithParams, passing the same InterruptID together with the resume data, which here is “approve”. In this example, the initial Runner.Query and the subsequent Runner.ResumeWithParams run in the same process. In real deployments, they might be two requests from a chatbot frontend that hit two different server instances. As long as both use the same CheckPointID and the Runner is configured with a distributed CheckPointStore, Eino can resume the interrupted run across instances.
// here we resume directly in the same instance where the original `Runner.Query` happened.
// In the real world, the original `Runner.Run/Query` and the subsequent `Runner.ResumeWithParams`
// can happen in different processes or machines, as long as you use the same `CheckPointID`,
// and you provided a distributed `CheckPointStore` when creating the `Runner` instance.
iter, err := runner.ResumeWithParams(ctx, "1", &adk.ResumeParams{
    Targets: map[string]any{
       interruptID: apResult,
    },
})
if err != nil {
    log.Fatal(err)
}
for {
    event, ok := iter.Next()
    if !ok {
       break
    }

    if event.Err != nil {
       log.Fatal(event.Err)
    }

    prints.Event(event)
}

Complete sample output:

name: TicketBooker
path: [{TicketBooker}]
tool name: BookTicket
arguments: {"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}

name: TicketBooker
path: [{TicketBooker}]
tool 'BookTicket' interrupted with arguments '{"location":"Beijing","passenger_name":"Martin","passenger_phone_number":"1234567"}', waiting for your approval, please answer with Y/N

your input here: Y

name: TicketBooker
path: [{TicketBooker}]
tool response: success

name: TicketBooker
path: [{TicketBooker}]
answer: The ticket for Martin to Beijing on 2025-12-01 has been successfully booked. If you need any more assistance, feel free to ask!
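Step 2 only requires that the CheckPointStore can save and load opaque checkpoint bytes under a CheckPointID. The following is a hypothetical, stdlib-only stand-in (memStore and newMemStore are illustrative names, not eino's InMemoryStore) that assumes a Get/Set method pair similar to eino's checkpoint store interface; check the compose package for the exact definition. A production store would implement the same two methods on top of Redis or another shared backend so that Query and ResumeWithParams can run on different instances.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// memStore is a hypothetical in-memory checkpoint store keyed by CheckPointID.
// Assumption: a Get/Set shape similar to eino's CheckPointStore.
// It copies byte slices so callers cannot mutate stored checkpoints.
type memStore struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string][]byte)}
}

// Get returns the checkpoint bytes and whether the ID was found.
func (s *memStore) Get(_ context.Context, checkPointID string) ([]byte, bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	cp, ok := s.data[checkPointID]
	if !ok {
		return nil, false, nil
	}
	return append([]byte(nil), cp...), true, nil
}

// Set stores (or overwrites) the checkpoint for the given ID.
func (s *memStore) Set(_ context.Context, checkPointID string, checkPoint []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[checkPointID] = append([]byte(nil), checkPoint...)
	return nil
}

func main() {
	ctx := context.Background()
	store := newMemStore()
	// The interrupted run saves its state under CheckPointID "1" ...
	_ = store.Set(ctx, "1", []byte("serialized agent state"))
	// ... and the resume run loads it back with the same ID.
	cp, ok, _ := store.Get(ctx, "1")
	fmt.Println(ok, string(cp))
}
```

Copying on both Set and Get keeps a stored checkpoint immutable even if the caller reuses its buffer, which a networked store gives you for free.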


Architecture Overview

The following flowchart illustrates the high-level interrupt/resume process:

flowchart TD
    U[End User]
    
    subgraph R [Runner]
        Run
        Resume
    end
    
    U -->|Initial Input| Run
    U -->|Resume Data| Resume
    
    subgraph E [(Arbitrarily Nested) Entities]
        Agent
        Tool
        ...
    end
    
    subgraph C [Run Context]
        Address
        InterruptState
        ResumeData
    end
    
    Run -->|Any number of transfer / call| E
    R <-->|Store/Restore| C
    Resume -->|Replay transfer / call| E
    C -->|Automatically assigned to| E

The following sequence diagram shows the chronological interaction flow between the three main participants:

sequenceDiagram
    participant D as Developer
    participant F as Framework
    participant U as End User
    
   
    Note over D,F: 1. Interrupt Phase
    D->>F: Call StatefulInterrupt()<br>specify info and state
    F->>F: Persist InterruptID->{address, state}
    
   
    Note over F,U: 2. User Decision Phase
    F->>U: Emit InterruptID->{address, info}
    U->>U: Decide InterruptID->{resume data}
    U->>F: Call TargetedResume()<br>provide InterruptID->{resume data}
    
   
    Note over D,F: 3. Resume Phase
    F->>F: Route to interrupted entity
    F->>D: Provide state and resume data
    D->>D: Handle resumption
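The three phases can be modeled in a few lines of plain Go. This is a hypothetical sketch of the bookkeeping in the diagram (savedInterrupt, delivery and route are illustrative names, not eino APIs): phase 1 persists InterruptID -> {address, state}, phase 2 is the user's map of InterruptID -> resume data, and phase 3 hands each entity its saved state plus any resume data, marking entities without an entry as non-targets.

```go
package main

import (
	"fmt"
	"sort"
)

// savedInterrupt is what the framework persists per interrupt point (phase 1).
type savedInterrupt struct {
	Address string
	State   any
}

// delivery is what the framework hands back to developer code on resume (phase 3).
type delivery struct {
	Address    string
	State      any
	ResumeData any
	IsTarget   bool
}

// route pairs every persisted interrupt with the user's resume data, if any.
// Entities without an entry in targets are non-targets and are expected to
// re-interrupt so their state is kept for a later resume.
func route(persisted map[string]savedInterrupt, targets map[string]any) []delivery {
	var out []delivery
	for id, s := range persisted {
		data, ok := targets[id]
		out = append(out, delivery{Address: s.Address, State: s.State, ResumeData: data, IsTarget: ok})
	}
	// Sort for deterministic output; map iteration order is random.
	sort.Slice(out, func(i, j int) bool { return out[i].Address < out[j].Address })
	return out
}

func main() {
	// Phase 1: two entities interrupt; the framework persists id -> {address, state}.
	persisted := map[string]savedInterrupt{
		"id-1": {Address: "agent:A;tool:book_ticket:1", State: "pending booking"},
		"id-2": {Address: "agent:A;tool:send_email:2", State: "draft email"},
	}
	// Phase 2: the user decides to answer only id-1.
	targets := map[string]any{"id-1": true}
	// Phase 3: the framework routes state and resume data back to each entity.
	for _, d := range route(persisted, targets) {
		fmt.Printf("%s target=%v state=%v data=%v\n", d.Address, d.IsTarget, d.State, d.ResumeData)
	}
}
```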

ADK Package API

The ADK package provides high-level abstractions for building interruptible agents with human-in-the-loop capabilities.

1. APIs for Interrupting

Interrupt

Creates a basic interrupt action. Use when an agent needs to pause execution to request external input or intervention but doesn’t need to save any internal state for resumption.

func Interrupt(ctx context.Context, info any) *AgentEvent

Parameters:

  • ctx: The context of the running component.
  • info: User-facing data describing the reason for the interrupt.

Returns: *AgentEvent with interrupt action.

Example:

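// Inside an agent's Run method. adk.Interrupt returns a single *AgentEvent, so emit it on the
// agent's event stream (generator is the *adk.AsyncGenerator[*adk.AgentEvent] paired with the iterator Run returns).

// Create a simple interrupt to request clarification.
generator.Send(adk.Interrupt(ctx, "User query is unclear, please clarify."))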

StatefulInterrupt

Creates an interrupt action while also saving the agent’s internal state. Use when an agent has internal state that must be restored to continue correctly.

func StatefulInterrupt(ctx context.Context, info any, state any) *AgentEvent

Parameters:

  • ctx: The context of the running component.
  • info: User-facing data describing the interrupt.
  • state: The agent’s internal state object, which will be serialized and stored.

Returns: *AgentEvent with interrupt action.

Example:

// Inside an agent's Run method:

// Define state to save.
type MyAgentState struct {
    ProcessedItems int
    CurrentTopic   string
}

currentState := &MyAgentState{
    ProcessedItems: 42,
    CurrentTopic:   "HITL",
}

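// Interrupt, save the current state, and emit the event on the agent's event stream
// (generator is the *adk.AsyncGenerator[*adk.AgentEvent] paired with the iterator Run returns).
generator.Send(adk.StatefulInterrupt(ctx, "Need user feedback before continuing", currentState))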
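Because the state passed to StatefulInterrupt is serialized into the checkpoint, only data that survives serialization comes back on resume. The sketch below assumes a gob-based encoding (an assumption for illustration; check how your eino version serializes checkpoint state and whether custom types must be registered) and uses a hypothetical roundTrip helper to show that exported fields survive while unexported ones are silently dropped.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// MyAgentState mirrors the example above, plus one unexported field.
type MyAgentState struct {
	ProcessedItems int
	CurrentTopic   string
	scratch        string // unexported: gob does not encode it
}

// roundTrip encodes and decodes the state, as a checkpoint save/load would
// under the gob assumption stated above.
func roundTrip(in *MyAgentState) (*MyAgentState, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return nil, err
	}
	out := &MyAgentState{}
	if err := gob.NewDecoder(&buf).Decode(out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	restored, err := roundTrip(&MyAgentState{ProcessedItems: 42, CurrentTopic: "HITL", scratch: "lost"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *restored)
}
```

In practice this means interrupt state should be a plain data struct with exported fields rather than a value holding channels, functions, or private caches.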

CompositeInterrupt

Creates an interrupt action for components that coordinate multiple sub-components. It combines one or more sub-agent interrupts into a single, cohesive interrupt. Any agent containing sub-agents (e.g., a custom Sequential or Parallel agent) uses this to propagate its children’s interrupts.

func CompositeInterrupt(ctx context.Context, info any, state any, 
    subInterruptSignals ...*InterruptSignal) *AgentEvent

Parameters:

  • ctx: The context of the running component.
  • info: User-facing data describing the coordinator’s own interrupt reason.
  • state: The coordinator agent’s own state (e.g., index of interrupted sub-agent).
  • subInterruptSignals: Variadic list of InterruptSignal objects from interrupted sub-agents.

Returns: *AgentEvent with interrupt action.

Example:

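// In a custom sequential agent that runs its sub-agents one after another
// (generator is the *adk.AsyncGenerator[*adk.AgentEvent] paired with the iterator Run returns)...
subAgent1 := &myInterruptingAgent{}

// Run returns an iterator, so drain it and keep the last event; if subAgent1
// interrupted, that final event typically carries the interrupt action.
var subInterruptEvent *adk.AgentEvent
subIter := subAgent1.Run(ctx, input)
for {
    event, ok := subIter.Next()
    if !ok {
        break
    }
    subInterruptEvent = event
}

// The parent agent must catch it and wrap it in a CompositeInterrupt.
if subInterruptEvent != nil && subInterruptEvent.Action != nil && subInterruptEvent.Action.Interrupted != nil {
    // The parent agent can add its own state, like which child was interrupted.
    parentState := map[string]int{"interrupted_child_index": 0}

    // Bubble up the interrupt. internalInterrupted is unexported, so reading it is only
    // possible from code inside the adk package (such as its built-in workflow agents).
    generator.Send(adk.CompositeInterrupt(ctx,
        "A sub-agent needs attention",
        parentState,
        subInterruptEvent.Action.Interrupted.internalInterrupted,
    ))
    return
}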

2. APIs for Getting Interrupt Information

InterruptInfo and InterruptCtx

When agent execution is interrupted, the AgentEvent contains structured interrupt information. The InterruptInfo struct contains a list of InterruptCtx objects, each representing an interrupt point in the hierarchy.

InterruptCtx provides a complete, user-facing context for a single resumable interrupt point.

type InterruptCtx struct {
    // ID is the unique, fully-qualified address of the interrupt point, used for targeted resumption.
    // Example: "agent:A;node:graph_a;tool:tool_call_123"
    ID string

    // Address is a structured sequence of AddressSegment segments leading to the interrupt point.
    Address Address

    // Info is user-facing information associated with the interrupt, provided by the component that triggered it.
    Info any

    // IsRootCause indicates if the interrupt point is the exact root cause of the interrupt.
    IsRootCause bool

    // Parent points to the context of the parent component in the interrupt chain (nil for top-level interrupts).
    Parent *InterruptCtx
}

The following example shows how to access this information:

// At the application layer, after interruption:
if event.Action != nil && event.Action.Interrupted != nil {
    interruptInfo := event.Action.Interrupted
    
    // Get a flat list of all interrupt points
    interruptPoints := interruptInfo.InterruptContexts 
    
    for _, point := range interruptPoints {
        // Each point contains a unique ID, user-facing info, and its hierarchical address
        fmt.Printf("Interrupt ID: %s, Address: %s, Info: %v\n", point.ID, point.Address.String(), point.Info)
    }
}
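Because each InterruptCtx links to its Parent and flags IsRootCause, an application can show users only the points they actually need to answer, each with the chain of enclosing components. The sketch below uses a simplified, hypothetical mirror of the struct (interruptCtx, rootCauses and infoChain are illustrative, not adk types) and assumes the flat list may also contain enclosing, non-root-cause points.

```go
package main

import (
	"fmt"
	"strings"
)

// interruptCtx is a simplified, hypothetical mirror of adk.InterruptCtx.
type interruptCtx struct {
	ID          string
	Info        any
	IsRootCause bool
	Parent      *interruptCtx
}

// rootCauses keeps only the points that caused the interrupt themselves.
func rootCauses(points []*interruptCtx) []*interruptCtx {
	var out []*interruptCtx
	for _, p := range points {
		if p.IsRootCause {
			out = append(out, p)
		}
	}
	return out
}

// infoChain renders the Info of p and its ancestors, outermost first.
func infoChain(p *interruptCtx) string {
	var parts []string
	for c := p; c != nil; c = c.Parent {
		parts = append([]string{fmt.Sprint(c.Info)}, parts...)
	}
	return strings.Join(parts, " > ")
}

func main() {
	agent := &interruptCtx{ID: "agent:A", Info: "a sub-agent needs attention"}
	tool := &interruptCtx{ID: "agent:A;tool:book:1", Info: "approve booking?", IsRootCause: true, Parent: agent}
	for _, p := range rootCauses([]*interruptCtx{agent, tool}) {
		fmt.Printf("%s: %s\n", p.ID, infoChain(p))
	}
}
```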

3. APIs for End User Resumption

(*Runner).ResumeWithParams

Continues interrupted execution from a checkpoint using the “explicit targeted resumption” strategy. This is the most common and powerful way to resume, allowing you to target specific interrupt points and provide data to them.

When using this method:

  • Components whose addresses are in the ResumeParams.Targets map will be explicit targets.
  • Interrupted components whose addresses are not in the ResumeParams.Targets map must re-interrupt themselves to preserve their state.

func (r *Runner) ResumeWithParams(ctx context.Context, checkPointID string, 
    params *ResumeParams, opts ...AgentRunOption) (*AsyncIterator[*AgentEvent], error)

Parameters:

  • ctx: The context for resumption.
  • checkPointID: The identifier of the checkpoint to resume from.
  • params: Resume parameters whose Targets field maps interrupt IDs to resume data. These IDs can point to any interruptible component across the execution graph.
  • opts: Additional run options.

Returns: An async iterator of agent events.

Example:

// After receiving an interrupt event...
interruptID := interruptEvent.Action.Interrupted.InterruptContexts[0].ID

// Prepare data for the specific interrupt point.
resumeData := map[string]any{
    interruptID: "This is the clarification you requested.",
}

// Resume execution with targeted data.
resumeIterator, err := runner.ResumeWithParams(ctx, "my-checkpoint-id", &adk.ResumeParams{Targets: resumeData})
if err != nil {
    // Handle error
}

// Process events from the resume iterator by calling Next until it is exhausted.
for {
    event, ok := resumeIterator.Next()
    if !ok {
        break
    }
    if event.Err != nil {
        // Handle event error
        break
    }
    // Process agent event
    fmt.Printf("Event: %+v\n", event)
}
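In practice the Targets map is built from the user's answers, keyed by the InterruptCtx.ID values the application received. The hypothetical helper below (buildTargets is not part of eino) rejects answers for IDs that were never emitted: under the rule above, a mistyped ID would leave the real interrupt point untargeted, so it would simply re-interrupt instead of receiving the data.

```go
package main

import (
	"fmt"
	"sort"
)

// buildTargets turns user answers (interrupt ID -> resume data) into the map
// passed as ResumeParams.Targets, rejecting IDs that were never emitted.
func buildTargets(knownIDs []string, answers map[string]any) (map[string]any, error) {
	known := make(map[string]bool, len(knownIDs))
	for _, id := range knownIDs {
		known[id] = true
	}
	var unknown []string
	targets := make(map[string]any, len(answers))
	for id, data := range answers {
		if !known[id] {
			unknown = append(unknown, id)
			continue
		}
		targets[id] = data
	}
	if len(unknown) > 0 {
		sort.Strings(unknown) // deterministic error message
		return nil, fmt.Errorf("unknown interrupt IDs: %v", unknown)
	}
	return targets, nil
}

func main() {
	known := []string{"agent:A;tool:book:1", "agent:A;tool:email:2"}
	targets, err := buildTargets(known, map[string]any{"agent:A;tool:book:1": true})
	fmt.Println(targets, err)
}
```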

4. APIs for Developer Resumption

ResumeInfo Struct

ResumeInfo holds all information needed to resume interrupted agent execution. It is created by the framework and passed to the agent’s Resume method.

type ResumeInfo struct {
    // WasInterrupted indicates if this agent had an interrupt in a previous Runner run.
    WasInterrupted bool

    // InterruptState holds the state saved via StatefulInterrupt or CompositeInterrupt.
    InterruptState any

    // IsResumeTarget indicates if this agent is a specific target of ResumeWithParams.
    IsResumeTarget bool

    // ResumeData holds user-provided data for this agent.
    ResumeData any

    // ... other fields
}

Example:

import (
    "context"
    "errors"
    "fmt"

    "github.com/cloudwego/eino/adk"
    "github.com/cloudwego/eino/schema"
)

// singleEvent wraps one event in an AsyncIterator so it can be returned from Resume
// (a small helper for this example, not part of the adk package).
func singleEvent(event *adk.AgentEvent) *adk.AsyncIterator[*adk.AgentEvent] {
    iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
    gen.Send(event)
    gen.Close()
    return iter
}

// Inside an agent's Resume method:
func (a *myAgent) Resume(ctx context.Context, info *adk.ResumeInfo, opts ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
    if !info.WasInterrupted {
        // Resume is only invoked for agents that were interrupted, so this should not happen.
        return singleEvent(&adk.AgentEvent{Err: errors.New("not an interrupt")})
    }

    if !info.IsResumeTarget {
        // This agent is not a specific target, so it must re-interrupt to preserve its state.
        return singleEvent(adk.StatefulInterrupt(ctx, "Waiting for another part of the workflow to be resumed", info.InterruptState))
    }

    // This agent is the target. Handle the resume data.
    if info.ResumeData != nil {
        if userInput, ok := info.ResumeData.(string); ok {
            // Process user input and update agent state before continuing
            fmt.Printf("Received user input: %s\n", userInput)
            a.currentState.LastUserInput = userInput
        }
    }

    // Continue normal execution logic
    return a.Run(ctx, &adk.AgentInput{Messages: []adk.Message{schema.UserMessage("resumed execution")}})
}
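The branching in Resume reduces to a small decision table over WasInterrupted, IsResumeTarget and ResumeData. The stdlib-only sketch below models it with a local struct mirroring the fields listed above (resumeInfo, action and decide are hypothetical stand-ins, not adk types) so the rule can be unit-tested in isolation.

```go
package main

import "fmt"

// resumeInfo mirrors the ResumeInfo fields used in the example above.
type resumeInfo struct {
	WasInterrupted bool
	InterruptState any
	IsResumeTarget bool
	ResumeData     any
}

type action int

const (
	actionError            action = iota // Resume called for an agent that never interrupted
	actionReinterrupt                    // not targeted: re-interrupt with the saved state
	actionContinue                       // targeted without data: continue from saved state
	actionContinueWithData               // targeted with data: apply the data, then continue
)

// decide applies the same checks, in the same order, as the Resume example.
func decide(info resumeInfo) action {
	switch {
	case !info.WasInterrupted:
		return actionError
	case !info.IsResumeTarget:
		return actionReinterrupt
	case info.ResumeData == nil:
		return actionContinue
	default:
		return actionContinueWithData
	}
}

func main() {
	fmt.Println(decide(resumeInfo{WasInterrupted: true, IsResumeTarget: false}) == actionReinterrupt)
}
```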

Compose Package API

The compose package provides low-level building blocks for creating complex, interruptible workflows. Use it to raise interrupts and handle resumption inside graph nodes.

1. APIs for Interrupting

Interrupt

Creates a special error that signals the execution engine to interrupt the current run at a component’s specific address and save a checkpoint. This is the standard way for a single, non-composite component to signal a resumable interrupt.

func Interrupt(ctx context.Context, info any) error

Parameters:

  • ctx: The context of the running component, used to retrieve the current execution address.
  • info: User-facing information about the interrupt. This information is not persisted but is exposed to the calling application via InterruptCtx.

StatefulInterrupt

Similar to Interrupt, but also saves the component’s internal state. The state is saved in the checkpoint and provided back to the component via GetInterruptState upon resumption.

func StatefulInterrupt(ctx context.Context, info any, state any) error

Parameters:

  • ctx: The context of the running component.
  • info: User-facing information about the interrupt.
  • state: The internal state that the interrupting component needs to persist.

CompositeInterrupt

Creates a special error representing a composite interrupt. It is designed for “composite” nodes (like ToolsNode) or any component that coordinates multiple independent, interruptible sub-processes. It bundles multiple sub-interrupt errors into a single error that the engine can deconstruct into a flat list of resumable points.

func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error) error

Parameters:

  • ctx: The context of the running composite node.
  • info: User-facing information from the composite node itself (can be nil).
  • state: The composite node’s own state (can be nil).
  • errs: A list of errors from sub-processes. These can be Interrupt, StatefulInterrupt, or nested CompositeInterrupt errors.

Example:

// A node that runs several independent sub-processes, each of which may interrupt.
// (This sketch assumes every non-nil error returned here is an interrupt error.)
var errs []error
var completed []string // IDs of sub-processes that finished without interrupting
for _, process := range processes {
    subCtx := compose.AppendAddressSegment(ctx, "process", process.ID)
    _, err := process.Run(subCtx)
    if err != nil {
        errs = append(errs, err)
        continue
    }
    completed = append(completed, process.ID)
}

// If any sub-process interrupted, bundle them up.
if len(errs) > 0 {
    // The composite node saves its own state: which sub-processes have already completed.
    return compose.CompositeInterrupt(ctx, "Parallel execution needs input", completed, errs...)
}
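Because interrupts travel as ordinary Go errors, a nested composite error can be flattened into its leaf interrupt points with a simple recursive walk. The types and the flatten function below are hypothetical stand-ins used only to illustrate the idea; compose's real interrupt errors should be inspected through ExtractInterruptInfo. Note that this type switch does not look through errors wrapped with fmt.Errorf("%w").

```go
package main

import (
	"errors"
	"fmt"
)

// leafInterrupt stands in for an Interrupt/StatefulInterrupt error at one address.
type leafInterrupt struct {
	Address string
	Info    any
}

func (e *leafInterrupt) Error() string { return "interrupt at " + e.Address }

// compositeInterrupt stands in for a CompositeInterrupt error bundling sub-errors.
type compositeInterrupt struct {
	Address string
	Subs    []error
}

func (e *compositeInterrupt) Error() string { return "composite interrupt at " + e.Address }

// flatten collects every leaf interrupt, depth-first, ignoring non-interrupt errors.
func flatten(err error) []*leafInterrupt {
	switch e := err.(type) {
	case *leafInterrupt:
		return []*leafInterrupt{e}
	case *compositeInterrupt:
		var out []*leafInterrupt
		for _, sub := range e.Subs {
			out = append(out, flatten(sub)...)
		}
		return out
	default:
		return nil
	}
}

func main() {
	err := &compositeInterrupt{Address: "node:tools_node", Subs: []error{
		&leafInterrupt{Address: "node:tools_node;tool:a:1", Info: "approve a?"},
		&compositeInterrupt{Address: "node:tools_node;tool:b:2", Subs: []error{
			&leafInterrupt{Address: "node:tools_node;tool:b:2;node:inner", Info: "approve inner?"},
		}},
		errors.New("unrelated failure"),
	}}
	for _, l := range flatten(err) {
		fmt.Println(l.Address, l.Info)
	}
}
```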

2. APIs for Getting Interrupt Information

ExtractInterruptInfo

Extracts the structured InterruptInfo object from an error returned by a Runnable’s Invoke or Stream method. This is the primary way for an application to get a list of all interrupt points after execution pauses.

composeInfo, ok := compose.ExtractInterruptInfo(err)
if ok {
    // Access interrupt contexts
    interruptContexts := composeInfo.InterruptContexts
}

Example:

// After calling a graph that interrupted...
_, err := graph.Invoke(ctx, "initial input")

if err != nil {
    interruptInfo, isInterrupt := compose.ExtractInterruptInfo(err)
    if isInterrupt {
        fmt.Printf("Execution was interrupted with %d interrupt points.\n", len(interruptInfo.InterruptContexts))
        // Now you can inspect interruptInfo.InterruptContexts to decide how to resume.
    }
}

3. APIs for End User Resumption

Resume

Prepares the context for an “explicit targeted resumption” operation by targeting one or more components without providing data. Useful when the resumption action itself is the signal.

func Resume(ctx context.Context, interruptIDs ...string) context.Context

Example:

// After interrupt, we get two interrupt IDs: id1 and id2.
// We want to resume both without providing specific data.
resumeCtx := compose.Resume(context.Background(), id1, id2)

// Pass this context to the next Invoke/Stream call.
// In the components corresponding to id1 and id2, GetResumeContext will return isResumeTarget = true.

ResumeWithData

Prepares a context to resume a single specific component with data. It’s a convenience wrapper around BatchResumeWithData.

func ResumeWithData(ctx context.Context, interruptID string, data any) context.Context

Example:

// Resume a single interrupt point with specific data.
resumeCtx := compose.ResumeWithData(context.Background(), interruptID, "This is the specific data you requested.")

// Pass this context to the next Invoke/Stream call.

BatchResumeWithData

This is the core function for preparing a resume context. It injects a map of resume targets (interrupt IDs) and their corresponding data into the context. Components whose interrupt IDs exist as keys will receive isResumeTarget = true when calling GetResumeContext.

func BatchResumeWithData(ctx context.Context, resumeData map[string]any) context.Context

Example:

// Resume multiple interrupt points at once, each with different data.
resumeData := map[string]any{
    "interrupt-id-1": "Data for the first point.",
    "interrupt-id-2": 42, // Data can be any type.
    "interrupt-id-3": nil, // Equivalent to using Resume() for this ID.
}

resumeCtx := compose.BatchResumeWithData(context.Background(), resumeData)

// Pass this context to the next Invoke/Stream call.
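Conceptually, these helpers store a map of interrupt IDs to data in the context, and each component later looks up its own ID. The stdlib-only model below is hypothetical (resumeKey, batchResume, resumeOnly and lookup are illustrative names; the real keys and lookup are internal to compose, which also marks ancestors of targets as targets). It follows the documented semantics that a nil value targets the ID without data, like Resume().

```go
package main

import (
	"context"
	"fmt"
)

// resumeKey is a private context key for the resume map in this sketch.
type resumeKey struct{}

// root is the base context used by this sketch.
var root = context.Background()

// batchResume stores a copy of the resume targets in ctx, like BatchResumeWithData.
func batchResume(ctx context.Context, data map[string]any) context.Context {
	cp := make(map[string]any, len(data))
	for k, v := range data {
		cp[k] = v
	}
	return context.WithValue(ctx, resumeKey{}, cp)
}

// resumeOnly targets IDs without data, like Resume.
func resumeOnly(ctx context.Context, ids ...string) context.Context {
	m := make(map[string]any, len(ids))
	for _, id := range ids {
		m[id] = nil
	}
	return batchResume(ctx, m)
}

// lookup reports whether id is a target and whether non-nil data was supplied.
func lookup(ctx context.Context, id string) (isTarget, hasData bool, data any) {
	m, _ := ctx.Value(resumeKey{}).(map[string]any) // nil map if absent; indexing it is safe
	data, isTarget = m[id]
	return isTarget, data != nil, data
}

func main() {
	ctx := batchResume(root, map[string]any{"interrupt-id-1": "data", "interrupt-id-3": nil})
	fmt.Println(lookup(ctx, "interrupt-id-1"))
	fmt.Println(lookup(ctx, "interrupt-id-3"))
}
```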

4. APIs for Developer Resumption

GetInterruptState

Provides a type-safe way to check and retrieve persisted state from a previous interrupt. This is the primary function for a component to learn about its past state.

func GetInterruptState[T any](ctx context.Context) (wasInterrupted bool, hasState bool, state T)

Return values:

  • wasInterrupted: true if the component was part of a previous interrupt.
  • hasState: true if state was provided and successfully cast to type T.
  • state: The typed state object.

Example:

// Inside a lambda or tool's execution logic:
wasInterrupted, hasState, state := compose.GetInterruptState[*MyState](ctx)

if wasInterrupted {
    fmt.Println("This component was interrupted in a previous run.")
    if hasState {
        fmt.Printf("Restored state: %+v\n", state)
    } 
} else {
    // This is the first time this component is running in this execution.
}

GetResumeContext

Checks if the current component is a target of a resume operation and retrieves any user-provided data. This is typically called after GetInterruptState confirms the component was interrupted.

func GetResumeContext[T any](ctx context.Context) (isResumeTarget bool, hasData bool, data T)

Return values:

  • isResumeTarget: true if the component was explicitly targeted by the resume call. If false, the component must re-interrupt to preserve its state. Note that if a component is not directly targeted but is a parent of a directly targeted component, isResumeTarget will still be true. For example, if NodeA is interrupted and targeted for resumption, then GraphA containing NodeA will also be a resume target.
  • hasData: true if resume data was provided for this component.
  • data: The typed data provided by the user.

Example:

// Inside a lambda or tool's execution logic, after checking GetInterruptState:
wasInterrupted, _, oldState := compose.GetInterruptState[*MyState](ctx)

if wasInterrupted {
    isTarget, hasData, resumeData := compose.GetResumeContext[string](ctx)
    if isTarget {
        // This component is the target, continue execution logic.
        if hasData {
            fmt.Printf("Resuming with user data: %s\n", resumeData)
        }
        // Complete the work using the restored state and resume data
        // (processWithStateAndData stands for your component's own logic).
        result := processWithStateAndData(oldState, resumeData)
        return result, nil
    } else {
        // This component is not the target, so must re-interrupt.
        return compose.StatefulInterrupt(ctx, "Waiting for another component to be resumed", oldState)
    }
}
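The hasState and hasData results behave like a checked type assertion: a value that is present but of the wrong type is reported as absent. The generic helper below is a hypothetical illustration of that behavior (typed is not a compose function, and MyState is an example type).

```go
package main

import "fmt"

// typed reports whether raw was present and holds a T, returning it if so.
// A present value of the wrong type yields (false, zero value of T).
func typed[T any](raw any, present bool) (ok bool, v T) {
	if !present {
		return false, v
	}
	v, ok = raw.(T)
	return ok, v
}

// MyState is an example state type.
type MyState struct{ Step int }

func main() {
	ok, s := typed[*MyState](&MyState{Step: 3}, true)
	fmt.Println(ok, s.Step)
	ok, _ = typed[string](42, true) // present but wrong type
	fmt.Println(ok)
}
```

This is why the type parameter passed to GetInterruptState and GetResumeContext should match exactly what was stored; for example, requesting MyState when a *MyState was saved reports no state.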

Tool Package API

This API mirrors the Compose Package API and is used to raise interrupts and handle resumption inside Tools. See the components/tool/interrupt.go file.

Underlying Architecture: The Addressing System

The Need for Addresses

The addressing system is designed to solve three fundamental requirements for effective human-in-the-loop interactions:

  1. State Attachment: To attach local state to interrupt points, we need a stable, unique locator for each interrupt point.
  2. Targeted Resumption: To provide targeted resume data to specific interrupt points, we need a way to precisely identify each point.
  3. Interrupt Localization: To tell end users exactly where in the execution hierarchy the interrupt occurred.

How Addresses Meet These Requirements

The address system meets these requirements through three key properties:

  • Stability: Addresses remain consistent across multiple executions, ensuring the same interrupt points can be reliably identified.
  • Uniqueness: Each interrupt point has a unique address, enabling precise targeting during resumption.
  • Hierarchy: Addresses provide a clear hierarchical path showing exactly where in the execution flow the interrupt occurred.
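The hierarchy property is what allows a component to count as a resume target when one of its descendants is targeted, as described for isResumeTarget under GetResumeContext. A hypothetical sketch of that prefix test over segment lists (segment, isPrefix and isResumeTarget are illustrative; SubID is omitted for brevity):

```go
package main

import "fmt"

// segment is a simplified address segment: type plus ID.
type segment struct {
	Type, ID string
}

// isPrefix reports whether a is an ancestor of (or equal to) b.
func isPrefix(a, b []segment) bool {
	if len(a) > len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// isResumeTarget reports whether addr equals or encloses any targeted address.
func isResumeTarget(addr []segment, targets [][]segment) bool {
	for _, t := range targets {
		if isPrefix(addr, t) {
			return true
		}
	}
	return false
}

func main() {
	graphA := []segment{{"runnable", "my_graph"}, {"node", "graph_a"}}
	nodeA := append(append([]segment{}, graphA...), segment{"node", "node_a"})
	nodeB := append(append([]segment{}, graphA...), segment{"node", "node_b"})
	targets := [][]segment{nodeA}
	fmt.Println(isResumeTarget(graphA, targets), isResumeTarget(nodeA, targets), isResumeTarget(nodeB, targets)) // true true false
}
```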

Address Structure and Segment Types

Address Structure

type Address struct {
    Segments []AddressSegment
}

type AddressSegment struct {
    Type  AddressSegmentType
    ID    string
    SubID string
}
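The ID shown on InterruptCtx (for example "agent:A;node:graph_a;tool:tool_call_123") is a flattened form of these segments. The formatter below is hypothetical: it assumes type:id pairs joined by ";" with the SubID appended after another ":" when present. The exact encoding is internal to eino, so application code should treat IDs as opaque strings.

```go
package main

import (
	"fmt"
	"strings"
)

// AddressSegment and Address mirror the structures above, with Type as a plain string.
type AddressSegment struct {
	Type  string
	ID    string
	SubID string
}

type Address struct {
	Segments []AddressSegment
}

// String joins segments as type:id[:subID] separated by ";" (assumed format).
func (a Address) String() string {
	parts := make([]string, 0, len(a.Segments))
	for _, s := range a.Segments {
		p := s.Type + ":" + s.ID
		if s.SubID != "" {
			p += ":" + s.SubID
		}
		parts = append(parts, p)
	}
	return strings.Join(parts, ";")
}

func main() {
	addr := Address{Segments: []AddressSegment{
		{Type: "agent", ID: "A"},
		{Type: "agent", ID: "B"},
		{Type: "tool", ID: "search_tool", SubID: "1"},
	}}
	fmt.Println(addr) // agent:A;agent:B;tool:search_tool:1
}
```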

Address Structure Diagram

The following diagrams illustrate the hierarchical structure of Address and its AddressSegment from both ADK and Compose layer perspectives:

ADK Layer Perspective (Simplified, Agent-centric view):

graph TD
    A[Address] --> B[AddressSegment 1]
    A --> C[AddressSegment 2]
    A --> D[AddressSegment 3]
    
    B --> B1[Type: Agent]
    B --> B2[ID: A]
    
    C --> C1[Type: Agent]
    C --> C2[ID: B]
    
    D --> D1[Type: Tool]
    D --> D2[ID: search_tool]
    D --> D3[SubID: 1]
    
    style A fill:#e1f5fe
    style B fill:#f3e5f5
    style C fill:#f3e5f5
    style D fill:#f3e5f5
    style B1 fill:#e8f5e8
    style B2 fill:#e8f5e8
    style C1 fill:#e8f5e8
    style C2 fill:#e8f5e8
    style D1 fill:#e8f5e8
    style D2 fill:#e8f5e8
    style D3 fill:#e8f5e8

Compose Layer Perspective (Detailed, complete hierarchical view):

graph TD
    A[Address] --> B[AddressSegment 1]
    A --> C[AddressSegment 2]
    A --> D[AddressSegment 3]
    A --> E[AddressSegment 4]
    
    B --> B1[Type: Runnable]
    B --> B2[ID: my_graph]
    
    C --> C1[Type: Node]
    C --> C2[ID: sub_graph]
    
    D --> D1[Type: Node]
    D --> D2[ID: tools_node]
    
    E --> E1[Type: Tool]
    E --> E2[ID: mcp_tool]
    E --> E3[SubID: 1]
    
    style A fill:#e1f5fe
    style B fill:#f3e5f5
    style C fill:#f3e5f5
    style D fill:#f3e5f5
    style E fill:#f3e5f5
    style B1 fill:#e8f5e8
    style B2 fill:#e8f5e8
    style C1 fill:#e8f5e8
    style C2 fill:#e8f5e8
    style D1 fill:#e8f5e8
    style D2 fill:#e8f5e8
    style E1 fill:#e8f5e8
    style E2 fill:#e8f5e8
    style E3 fill:#e8f5e8

Layer-Specific Address Segment Types

ADK Layer Segment Types

The ADK layer provides a simplified, agent-centric abstraction of the execution hierarchy:

type AddressSegmentType = core.AddressSegmentType

const (
    AddressSegmentAgent AddressSegmentType = "agent"
    AddressSegmentTool  AddressSegmentType = "tool"
)

Key characteristics:

  • Agent segments: Represent agent-level execution segments (typically SubID is omitted).
  • Tool segments: Represent tool-level execution segments (SubID is used to ensure uniqueness).
  • Simplified view: Abstracts away underlying complexity for agent developers.
  • Example path: Agent:A → Agent:B → Tool:search_tool:1

Compose Layer Segment Types

The compose layer provides fine-grained control and visibility into the entire execution hierarchy:

type AddressSegmentType = core.AddressSegmentType

const (
    AddressSegmentRunnable AddressSegmentType = "runnable"  // Graph, Workflow, or Chain
    AddressSegmentNode     AddressSegmentType = "node"      // Individual graph nodes
    AddressSegmentTool     AddressSegmentType = "tool"      // Specific tool calls
)

Key characteristics:

  • Runnable segments: Represent top-level executables (Graph, Workflow, Chain).
  • Node segments: Represent individual nodes in an execution graph.
  • Tool segments: Represent specific tool calls within a ToolsNode.
  • Detailed view: Provides full visibility into the execution hierarchy.
  • Example path: Runnable:my_graph → Node:sub_graph → Node:tools_node → Tool:mcp_tool:1

Extensibility and Design Principles

The address segment type system is designed to be extensible. Framework developers can add new segment types to support additional execution patterns or custom components while maintaining backward compatibility.

Key design principle: The ADK layer provides simplified, agent-centric abstractions while the compose layer handles the full complexity of the execution hierarchy. This layered approach allows developers to work at the abstraction level that suits their needs.
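Because segment types are string-based values, adding one amounts to declaring a new constant; the backward-compatibility burden falls on consumers, which should tolerate types they do not recognize. The sketch below uses a hypothetical `"branch"` segment type purely for illustration (it is not a type Eino defines) and shows a consumer that falls back to a generic description instead of failing.

```go
package main

import "fmt"

// SegmentType mirrors the string-based segment type (hypothetical copy).
type SegmentType string

const (
	SegmentRunnable SegmentType = "runnable"
	SegmentNode     SegmentType = "node"
	SegmentTool     SegmentType = "tool"

	// SegmentBranch is a hypothetical new type added by a framework extension.
	SegmentBranch SegmentType = "branch"
)

// describe renders a segment for display. Unknown types fall through to a
// generic case, so older consumers keep working when new types appear.
func describe(t SegmentType, id string) string {
	switch t {
	case SegmentRunnable:
		return "runnable " + id
	case SegmentNode:
		return "node " + id
	case SegmentTool:
		return "tool call " + id
	default:
		return fmt.Sprintf("%s %s", t, id)
	}
}

func main() {
	fmt.Println(describe(SegmentNode, "tools_node")) // node tools_node
	fmt.Println(describe(SegmentBranch, "b1"))       // branch b1
}
```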

Backward Compatibility

The human-in-the-loop framework maintains full backward compatibility with existing code. All previous interrupt and resume patterns continue to work as before, and the new addressing system adds enhanced functionality on top of them.

1. Graph Interrupt Compatibility

Previous graph interrupt flows that use the deprecated NewInterruptAndRerunErr or InterruptAndRerun in nodes/tools continue to be supported. To make them address-aware, however, one additional step is required: error wrapping.

Since these functions are not aware of the new addressing system, the component calling them is responsible for catching the error and wrapping it with address information using the WrapInterruptAndRerunIfNeeded helper function. This is typically done inside composite nodes (like the official ToolsNode) that call legacy components.

Note: If you choose not to use WrapInterruptAndRerunIfNeeded, the original behavior of these functions will be preserved. End users can still use ExtractInterruptInfo to get information from the error as before. However, since the resulting interrupt context will lack the correct address, you will not be able to use the new targeted resumption APIs for that specific interrupt point. To fully enable the new address-aware features, wrapping is required.

// 1. A legacy tool using deprecated interrupt
func myLegacyTool(ctx context.Context, input string) (string, error) {
    // ... tool logic
    // This error is not address-aware.
    return "", compose.NewInterruptAndRerunErr("Need user approval")
}

// 2. A composite node that calls the legacy tool
var legacyToolNode = compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
    out, err := myLegacyTool(ctx, input)
    if err != nil {
        // CRITICAL: The caller must wrap the error to attach an address.
        // The "tool:legacy_tool" segment is appended to the current address.
        segment := compose.AddressSegment{Type: compose.AddressSegmentTool, ID: "legacy_tool"}
        return "", compose.WrapInterruptAndRerunIfNeeded(ctx, segment, err)
    }
    return out, nil
})

// 3. End-user code can now see the full address.
_, err := graph.Invoke(ctx, input)
if err != nil {
    interruptInfo, exists := compose.ExtractInterruptInfo(err)
    if exists && len(interruptInfo.InterruptContexts) > 0 {
        // The interrupt context now has a correct, fully qualified address.
        fmt.Printf("Interrupt Address: %s\n", interruptInfo.InterruptContexts[0].Address.String())
    }
}

2. Compatibility with Static Interrupts Added at Compile Time

Static interrupts added via WithInterruptBeforeNodes or WithInterruptAfterNodes continue to work, but the way state is handled has been improved.

When a static interrupt is triggered, an InterruptCtx is generated with an address pointing to the graph (or subgraph) that defined the interrupt. The key change is that the InterruptCtx.Info field now directly exposes that graph’s state.

This enables a more direct and intuitive workflow:

  1. The end user receives the InterruptCtx and can inspect the graph’s live state via the .Info field.
  2. They can modify this state object directly.
  3. They can then pass the modified graph state object back via ResumeWithData with the InterruptCtx.ID to resume execution.

This new pattern often makes the old WithStateModifier option unnecessary, though it remains available for full backward compatibility.

// 1. Define a graph with its own local state
type MyGraphState struct {
    SomeValue string
}

g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyGraphState {
    return &MyGraphState{SomeValue: "initial"}
}))
// ... add node_1 and node_2 to the graph ...

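// 2. Compile the graph with a static interrupt point.
// This interrupts the graph itself after the "node_1" node completes.
// Note: resuming also requires a checkpoint store configured at compile time
// (compose.WithCheckPointStore) and a checkpoint ID passed to both Invoke calls
// (compose.WithCheckPointID); both are omitted here for brevity.
graph, err := g.Compile(ctx, compose.WithInterruptAfterNodes([]string{"node_1"}))
if err != nil {
    log.Fatal(err)
}

// 3. Run the graph; this triggers the static interrupt.
_, err = graph.Invoke(ctx, "start")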

// 4. Extract interrupt context and graph's state
interruptInfo, isInterrupt := compose.ExtractInterruptInfo(err)
if isInterrupt {
    interruptCtx := interruptInfo.InterruptContexts[0]

    // .Info field exposes the graph's current state
    graphState, ok := interruptCtx.Info.(*MyGraphState)
    if ok {
        // 5. Modify state directly
        fmt.Printf("Original state value: %s\n", graphState.SomeValue) // Prints "initial"
        graphState.SomeValue = "a-new-value-from-user"

        // 6. Resume by passing back the modified state object
        resumeCtx := compose.ResumeWithData(context.Background(), interruptCtx.ID, graphState)
        result, err := graph.Invoke(resumeCtx, "start")
        if err != nil {
            log.Fatal(err)
        }
        // Execution continues, and node_2 now sees the modified state.
        fmt.Println("Result:", result)
    }
}

3. Agent Interrupt Compatibility

Compatibility with legacy agents is maintained at the data structure level, ensuring old agent implementations continue to function within the new framework. The key lies in how the adk.InterruptInfo and adk.ResumeInfo structs are populated.

For End Users (Application Layer):

When an end user receives an interrupt from an agent, the adk.InterruptInfo struct has both of the following fields populated:

  • The new, structured InterruptContexts field.
  • The legacy Data field, which will contain the original interrupt information (e.g., ChatModelAgentInterruptInfo or WorkflowInterruptInfo).

This allows end users to gradually migrate their application logic to use the richer InterruptContexts while still being able to access the old Data field when needed.

For Agent Developers:

When a legacy agent’s Resume method is called, the adk.ResumeInfo struct it receives still contains the now-deprecated embedded InterruptInfo field. This field is populated with the same legacy data structures, allowing agent developers to maintain their existing resumption logic without immediate updates to the new address-aware APIs.

// --- End User Perspective ---

// After agent run, you receive an interrupt event.
if event.Action != nil && event.Action.Interrupted != nil {
    interruptInfo := event.Action.Interrupted

    // 1. New way: Access structured interrupt contexts
    if len(interruptInfo.InterruptContexts) > 0 {
        fmt.Printf("New structured context available: %+v\n", interruptInfo.InterruptContexts[0])
    }

    // 2. Old way (still works): Access legacy Data field
    if chatInterrupt, ok := interruptInfo.Data.(*adk.ChatModelAgentInterruptInfo); ok {
        fmt.Printf("Legacy ChatModelAgentInterruptInfo still accessible: %+v\n", chatInterrupt)
        // ... logic using the old struct
    }
}

// --- Agent Developer Perspective ---

// Inside a legacy agent's Resume method:
func (a *myLegacyAgent) Resume(ctx context.Context, info *adk.ResumeInfo, opts ...adk.AgentRunOption) *adk.AsyncIterator[*adk.AgentEvent] {
    // The deprecated embedded InterruptInfo field is still populated.
    // This allows old resumption logic to continue working.
    if info.InterruptInfo != nil {
        if chatInterrupt, ok := info.InterruptInfo.Data.(*adk.ChatModelAgentInterruptInfo); ok {
            // ... existing resumption logic relying on the old ChatModelAgentInterruptInfo struct
            fmt.Printf("Resuming based on legacy InterruptInfo.Data field: %+v\n", chatInterrupt)
        }
    }

    iter, gen := adk.NewAsyncIteratorPair[*adk.AgentEvent]()
    go func() {
        defer gen.Close()
        // ... continue execution, emitting events via gen.Send(...)
    }()
    return iter
}
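The compatibility for agent developers rests on ordinary Go struct embedding: because adk.ResumeInfo embeds the deprecated InterruptInfo, legacy code that reads `info.InterruptInfo.Data` keeps compiling. The self-contained sketch below reproduces that pattern with simplified, hypothetical copies of the types; the field sets are trimmed for illustration and do not match Eino's definitions. In this toy model, the embedding also promotes the new `InterruptContexts` field onto `ResumeInfo`.

```go
package main

import "fmt"

// InterruptCtx is a trimmed, hypothetical stand-in for the structured context.
type InterruptCtx struct {
	ID      string
	Address string
}

// InterruptInfo carries both the new and the legacy representation.
type InterruptInfo struct {
	Data              any             // legacy payload
	InterruptContexts []*InterruptCtx // new structured contexts
}

// ResumeInfo embeds *InterruptInfo, so legacy field access keeps compiling.
type ResumeInfo struct {
	*InterruptInfo // deprecated, still populated for old Resume logic
	EnableStreaming bool
}

// legacyPayload is a hypothetical old-style interrupt payload.
type legacyPayload struct{ Reason string }

// legacyReason reads the old payload, guarding against a nil embedded pointer.
func legacyReason(info *ResumeInfo) string {
	if info.InterruptInfo == nil {
		return ""
	}
	if p, ok := info.InterruptInfo.Data.(*legacyPayload); ok {
		return p.Reason
	}
	return ""
}

func main() {
	info := &ResumeInfo{InterruptInfo: &InterruptInfo{
		Data:              &legacyPayload{Reason: "Need user approval"},
		InterruptContexts: []*InterruptCtx{{ID: "interrupt-1", Address: "agent:A"}},
	}}
	fmt.Println(legacyReason(info))                // Need user approval
	fmt.Println(info.InterruptContexts[0].Address) // agent:A (promoted field)
	fmt.Println(legacyReason(&ResumeInfo{}))       // prints an empty line
}
```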

Migration Benefits

  • Preserved Legacy Behavior: Existing code will continue to work as it did before. Old interrupt patterns won’t cause crashes, but they also won’t automatically gain the new address-aware capabilities without modification.
  • Gradual Adoption: Teams can selectively enable new features on a case-by-case basis. For example, you can wrap legacy interrupts with WrapInterruptAndRerunIfNeeded only in the workflows where you need targeted resumption.
  • Enhanced Functionality: The new addressing system provides richer, structured context (InterruptCtx) for all interrupts, while the old data fields are still populated for full compatibility.
  • Flexible State Management: For static graph interrupts, you can choose modern, direct state modification via the .Info field, or continue using the old WithStateModifier option.

This backward compatibility model ensures a smooth transition for existing users while providing a clear path to adopting the new human-in-the-loop features.


Last modified March 2, 2026: feat: sync en files (c14c5a55)