Eino: Interrupt & CheckPoint User Manual
💡 Note: In v0.3.26, a coding error broke compatibility of the CheckPoint serialization format. New integrations that adopt CheckPoint after v0.3.26 should use the latest version directly.
Eino provides a compatibility branch. Businesses that already use CheckPoint on a version earlier than v0.3.26 should first upgrade to the compatibility branch, then move to the main branch once the old checkpoint data has expired.
The compatibility branch has not been merged into the main branch because it adds performance overhead and business agent checkpoints usually have a short validity period.
Introduction
Using the Interrupt & CheckPoint feature, you can pause Graph execution at specified positions and resume from the breakpoint later. For StateGraph, you can also modify State before resuming from the breakpoint.
💡 Breakpoint resumption restores only the inputs and the data that each node generated at runtime. The Graph orchestration must be exactly the same as in the original run, and the CallOptions must be passed in again in full. Keep them consistent with the original run unless there is a specific reason not to, such as relying on a CallOption to pass data during Resume.
Using Static Interrupt
Static Interrupt supports pausing the Graph before or after a specified Node executes. Pass in WithInterruptAfterNodes and WithInterruptBeforeNodes Options during Compile to set Interrupts:
import (
	"context"

	"github.com/cloudwego/eino/compose"
)
func main() {
	ctx := context.Background()
	g := compose.NewGraph[string, string]()
	err := g.AddLambdaNode("node1", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {/*invokable func*/}))
	if err != nil {/* error handle */}
	err = g.AddLambdaNode("node2", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {/*invokable func*/}))
	if err != nil {/* error handle */}
	/** other graph composed code
	xxx
	*/
	// Compile returns the runnable graph together with an error.
	runner, err := g.Compile(ctx, compose.WithInterruptAfterNodes([]string{"node1"}), compose.WithInterruptBeforeNodes([]string{"node2"}))
	if err != nil {/* error handle */}
	_ = runner // run it with runner.Invoke(ctx, input)
}
💡 Currently, static breakpoints can only be set during Compile. If you need to set them per request, please raise the requirement.
The error returned by a run tells you whether the run was interrupted and carries the Interrupt information:
// compose/checkpoint.go
type InterruptInfo struct {
	State             any
	BeforeNodes       []string
	AfterNodes        []string
	RerunNodes        []string
	RerunNodesExtra   map[string]any
	SubGraphs         map[string]*InterruptInfo
	InterruptContexts []*InterruptCtx
}
func ExtractInterruptInfo(err error) (info *InterruptInfo, existed bool) {}
For example:
import "github.com/cloudwego/eino/compose"
/***graph compose code
* g := compose.NewGraph
* xxx
* runner := g.Compile
*/
result, err := runner.Invoke(ctx, input)
if info, ok := compose.ExtractInterruptInfo(err); ok {
	// the run was interrupted: handle info
} else if err != nil {
	// the run failed for another reason: handle error
}
💡 When a run is interrupted, the output is an empty value and carries no meaning.
Using CheckPoint
CheckPoint records Graph running state. Using CheckPoint allows resuming execution after Interrupt.
Implementing CheckPointStore
CheckPointStore is a KV storage interface with key type string and value type []byte. Eino provides no wrapper or default implementation; users implement it themselves to store checkpoints.
// compose/checkpoint.go
type CheckPointStore interface {
	Get(ctx context.Context, key string) (value []byte, existed bool, err error)
	Set(ctx context.Context, key string, value []byte) (err error)
}
Registering Serialization Methods
CheckPoint saving and reading involves serialization and deserialization of Graph node inputs, outputs, and State. When only using simple types or eino built-in types (such as Message or Document), users don’t need additional operations; when introducing custom structs, you need to register types in advance. Eino provides the registration method schema.RegisterName:
package main
import "github.com/cloudwego/eino/schema"
type MyState struct {
Counter int
Note string
}
func init() {
// Register the type with a stable name for serialization/persistence.
// Use the pointer form if you persist pointers to this type.
// It's recommended to register types within the `init()` function
// within the same file your type is declared.
schema.RegisterName[*MyState]("my_state_v1")
}
After registration, type information will be additionally recorded during serialization, so during deserialization, even without specifying the type (such as deserializing to interface{}), Eino can deserialize to the correct type. The key in the registration method uniquely identifies this type. Once the key is determined, it must not be changed, otherwise persisted checkpoints cannot be correctly restored.
💡 Unexported struct fields cannot be accessed, so they are not stored or restored.
By default, Eino's built-in serialization is used. With it, if a registered type implements json.Marshaler and json.Unmarshaler, those custom methods are used to serialize and deserialize the type.
// encoding/json
type Marshaler interface {
MarshalJSON() ([]byte, error)
}
type Unmarshaler interface {
UnmarshalJSON([]byte) error
}
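As an illustration of the two interfaces above, here is a minimal sketch of a type that controls its own JSON form. Timeout and roundTrip are hypothetical names introduced for this example; it exercises only encoding/json and does not show how Eino invokes these methods internally.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Timeout is a hypothetical custom type kept in graph state.
// It is persisted as a readable string such as "1m30s".
type Timeout struct {
	D time.Duration
}

// MarshalJSON implements json.Marshaler.
func (t Timeout) MarshalJSON() ([]byte, error) {
	return json.Marshal(t.D.String())
}

// UnmarshalJSON implements json.Unmarshaler.
func (t *Timeout) UnmarshalJSON(b []byte) error {
	var s string
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	d, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	t.D = d
	return nil
}

// roundTrip encodes a Timeout and decodes it again, returning both forms.
func roundTrip(in Timeout) (string, Timeout, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return "", Timeout{}, err
	}
	var out Timeout
	if err := json.Unmarshal(raw, &out); err != nil {
		return "", Timeout{}, err
	}
	return string(raw), out, nil
}

func main() {
	raw, out, err := roundTrip(Timeout{D: 90 * time.Second})
	fmt.Println(raw, out.D, err)
}
```

Note the receivers: MarshalJSON is defined on the value and UnmarshalJSON on the pointer, which is the usual pairing because decoding has to mutate the target.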
Eino also provides an option to change the serialization method to gob:
r, err := compose.NewChain[*AgentInput, Message]().
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *AgentInput) ([]Message, error) {
return a.genModelInput(ctx, instruction, input)
})).
AppendChatModel(a.model).
Compile(ctx, compose.WithGraphName(a.name),
compose.WithCheckPointStore(store),
compose.WithSerializer(&gobSerializer{}))
Users can choose either method based on preference. Once chosen, avoid switching, because historical data will not be compatible.
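The gobSerializer type used in the snippet above is not defined in this manual. The following is a minimal sketch of what such a type could look like, assuming the serializer expected by compose.WithSerializer has Marshal(v any) ([]byte, error) and Unmarshal(data []byte, v any) error methods; check the interface definition in your Eino version before relying on it. agentState is a hypothetical type used only to exercise the round trip.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// gobSerializer is a hypothetical gob-backed serializer.
// The Marshal/Unmarshal method set is an assumption about the
// interface accepted by compose.WithSerializer.
type gobSerializer struct{}

// Marshal encodes v with encoding/gob.
func (gobSerializer) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes data into v, which must be a pointer.
func (gobSerializer) Unmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// agentState is a hypothetical state type for the demo.
type agentState struct {
	Counter int
	Note    string
}

func main() {
	s := &gobSerializer{}
	raw, err := s.Marshal(agentState{Counter: 3, Note: "paused"})
	if err != nil {
		panic(err)
	}
	var out agentState
	if err := s.Unmarshal(raw, &out); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}
```

With gob, concrete types that travel inside interface-typed fields must also be registered with gob.Register, otherwise encoding fails at runtime.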
Enabling CheckPoint
After creating CheckPointStore, pass it as an Option when Compiling Graph to bind CheckPointer to Graph:
import (
"github.com/cloudwego/eino/compose"
)
func main() {
/** graph composed code
xxx
*/
	// Compile returns the runnable graph together with an error.
	runner, err := g.Compile(ctx, compose.WithCheckPointStore(store), compose.WithInterruptBeforeNodes([]string{"node2"}))
if err != nil {/* error handle */}
}
After that, you can introduce CheckPoint during requests through CallOption:
// compose/checkpoint.go
func WithCheckPointID(checkPointID string) Option
The checkpoint ID is used as the key in CheckPointStore. When the graph runs, it checks whether CheckPointStore contains this ID. If it does, the graph resumes from that checkpoint; when the run is interrupted, the graph state is saved under this ID.
Dynamic Interrupt
Nodes can dynamically trigger Interrupt by returning special errors:
Before eino v0.7.0
// eino/compose/interrupt.go
// emit a plain interrupt signal
var InterruptAndRerun = errors.New("interrupt and rerun")
// emit an interrupt signal with extra info
func NewInterruptAndRerunErr(extra any) error
When Eino Graph receives this error returned by a node, an interrupt will occur. When resuming, this node will be run again. Before re-running, StateModifier will be called to modify state (if configured).
In this case, when re-running the node, the input will be replaced with an empty value instead of the original input. If the original input is still needed when re-running, it needs to be saved to State in advance.
In eino v0.7.0 and later
Added support for “saving local state”, “exposing internal interrupt signals”, and “parallel interrupts”:
// eino/compose/interrupt.go
// emit an interrupt signal with user-facing info
func Interrupt(ctx context.Context, info any) error
// emit an interrupt signal with user-facing info AS WELL AS
// persistent LOCALLY-DEFINED state
func StatefulInterrupt(ctx context.Context, info any, state any) error
// emit an interrupt signal WRAPPING other interrupt signals
// emitted from inner processes,
// such as ToolsNode wrapping Tools.
func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error) error
For detailed design, see: Eino human-in-the-loop Framework: Technical Architecture Guide
External Proactive Interrupt
Sometimes you want to trigger an interrupt proactively from outside the Graph, save the current execution state, and resume later, for example during graceful instance shutdown. In this case, call WithGraphInterrupt to get a ctx and an interrupt function. Pass the ctx to run methods such as graph.Invoke(), and call the interrupt function when you want to interrupt:
// from compose/graph_call_options.go
// WithGraphInterrupt creates a context with graph cancellation support.
// When the returned context is used to invoke a graph or workflow, calling the interrupt function will trigger an interrupt.
// The graph will wait for current tasks to complete by default.
func WithGraphInterrupt(parent context.Context) (ctx context.Context, interrupt func(opts ...GraphInterruptOption)) {}
When calling the interrupt function, you can pass options such as a timeout:
// from compose/graph_call_options.go
// WithGraphInterruptTimeout specifies the max waiting time before generating an interrupt.
// After the max waiting time, the graph will force an interrupt. Any unfinished tasks will be re-run when the graph is resumed.
func WithGraphInterruptTimeout(timeout time.Duration) GraphInterruptOption {
	return func(o *graphInterruptOptions) {
		o.timeout = &timeout
	}
}
When an interrupt is triggered externally, nodes have no opportunity to save local state (including their input), so Eino automatically saves the input of the externally interrupted nodes and restores it on the next execution. For interrupts that are not triggered externally, saving the input is the responsibility of the node that initiates the interrupt; it can save the input to graph state or use compose.StatefulInterrupt to save local state.
CheckPoint in Streaming
Streaming needs to concatenate data streams when saving CheckPoint, so concatenation methods need to be registered:
// compose/stream_concat.go
func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))
// example
type TestStruct struct {
Body string
}
// RegisterStreamChunkConcatFunc is not thread-safe; call it during the initialization phase.
func init() {
	compose.RegisterStreamChunkConcatFunc(func(ss []TestStruct) (TestStruct, error) {
		ret := TestStruct{Body: ""}
		for i := range ss {
			ret.Body += ss[i].Body
		}
		return ret, nil
	})
}
Eino provides concat methods for *schema.Message, []*schema.Message and string by default.
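A concat function is easier to maintain when it is a named function that can be unit-tested before it is registered. The sketch below shows that for a chunk type with more than one field. Chunk and concatChunks are hypothetical names, and summing the token counts is just one possible merge rule.

```go
package main

import (
	"fmt"
	"strings"
)

// Chunk is a hypothetical stream element with a text body and a token count.
type Chunk struct {
	Body   string
	Tokens int
}

// concatChunks merges stream chunks into a single value: bodies are joined
// in order and token counts are summed. Its shape, func([]T) (T, error),
// matches the parameter of RegisterStreamChunkConcatFunc.
func concatChunks(chunks []Chunk) (Chunk, error) {
	var sb strings.Builder
	total := 0
	for _, c := range chunks {
		sb.WriteString(c.Body)
		total += c.Tokens
	}
	return Chunk{Body: sb.String(), Tokens: total}, nil
}

func main() {
	merged, err := concatChunks([]Chunk{{Body: "Hel", Tokens: 1}, {Body: "lo", Tokens: 1}})
	fmt.Println(merged.Body, merged.Tokens, err)
	// With eino imported, registration during initialization would be:
	//   compose.RegisterStreamChunkConcatFunc(concatChunks)
}
```

Using strings.Builder avoids the repeated allocations of += on long streams.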
Interrupt&CheckPoint in Nested Graphs
Provided that the parent graph is given a CheckPointer, passing InterruptNodes through WithGraphCompileOptions in AddGraphNode enables Interrupt & CheckPoint for the sub-graph. If the parent graph has no CheckPointer set, Compile reports an error.
/* graph compose code
xxx
*/
g.AddGraphNode("node1", subGraph, WithGraphCompileOptions(
WithInterruptAfterNodes([]string{"node2"}),
))
g.Compile(ctx, WithCheckPointStore(cp))
If the interrupt occurs inside a sub-graph, the state modified on resume is the sub-graph's state.
Resume
Resume refers to a Graph run that continues after an Interrupt and a saved checkpoint.
Before eino v0.7.0
Influence resume behavior by modifying State.
// compose/checkpoint.go
type StateModifier func(ctx context.Context, path NodePath, state any) error
func WithStateModifier(sm StateModifier) Option
StateModifier takes effect when the Graph resumes, allowing State to be modified before execution continues. Path applies to nested graphs; for a non-nested graph, treat it as an empty array.
/* graph compose and compile
xxx
*/
// first run interrupt
id := GenUUID()
_, err := runner.Invoke(ctx, input, WithCheckPointID(id))
// resume from id
_, err = runner.Invoke(ctx, input/*unused*/,
WithCheckPointID(id),
WithStateModifier(func(ctx context.Context, path NodePath, state any) error{
state.(*testState).Field1 = "hello"
return nil
}),
)
💡 Input is not read during Resume, so passing empty input is fine at this time.
In eino v0.7.0 and later
Besides StateModifier, you can also selectively resume a specific interrupt point, and directly pass “resume data” to the specified “interrupt point”:
// specifically resume particular interrupt point(s),
// without specifying resume data
func Resume(ctx context.Context, interruptIDs ...string) context.Context
// specifically resume one interrupt point, with custom resume data
func ResumeWithData(ctx context.Context, interruptID string, data any) context.Context
// specifically resume multiple interrupt points, each with custom resume data
func BatchResumeWithData(ctx context.Context, resumeData map[string]any) context.Context
The interrupt ID is obtained from the interrupt error:
interruptInfo, isInterrupt := ExtractInterruptInfo(err)
if isInterrupt {
// maybe multiple interrupt points exist here,
// we only take the first one for illustration purpose
interruptID = interruptInfo.InterruptContexts[0].ID
}
The type of resumeData is defined by the interrupt point. For example, a Tool interrupts and requires user approval before it executes, and defines an ApprovalResult as its resumeData:
func (i InvokableApprovableTool) InvokableRun(ctx context.Context, argumentsInJSON string,
opts ...tool.Option) (string, error) {
toolInfo, err := i.Info(ctx)
if err != nil {
return "", err
}
wasInterrupted, _, storedArguments := compose.GetInterruptState[string](ctx)
if !wasInterrupted { // initial invocation, interrupt and wait for approval
return "", compose.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: toolInfo.Name,
ArgumentsInJSON: argumentsInJSON,
ToolCallID: compose.GetToolCallID(ctx),
}, argumentsInJSON)
}
isResumeTarget, hasData, data := compose.GetResumeContext[*ApprovalResult](ctx)
if !isResumeTarget { // was interrupted but not explicitly resumed, reinterrupt and wait for approval again
return "", compose.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: toolInfo.Name,
ArgumentsInJSON: storedArguments,
ToolCallID: compose.GetToolCallID(ctx),
}, storedArguments)
}
if !hasData {
return "", fmt.Errorf("tool '%s' resumed with no data", toolInfo.Name)
}
if data.Approved {
return i.InvokableTool.InvokableRun(ctx, storedArguments, opts...)
}
if data.DisapproveReason != nil {
return fmt.Sprintf("tool '%s' disapproved, reason: %s", toolInfo.Name, *data.DisapproveReason), nil
}
return fmt.Sprintf("tool '%s' disapproved", toolInfo.Name), nil
}
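The tool above refers to ApprovalInfo and ApprovalResult without showing them. The following sketch infers their fields from how the tool uses them; the exact definitions in your codebase may differ. rejectionMessage is a hypothetical helper that mirrors the two disapproval branches so they can be tested in isolation.

```go
package main

import "fmt"

// ApprovalInfo is the user-facing payload attached to the interrupt.
// Fields are inferred from how the tool above populates it.
type ApprovalInfo struct {
	ToolName        string
	ArgumentsInJSON string
	ToolCallID      string
}

// ApprovalResult is the resume data sent back by the caller.
// DisapproveReason is optional, hence the pointer.
type ApprovalResult struct {
	Approved         bool
	DisapproveReason *string
}

// String renders a prompt that can be shown to the approver.
func (a *ApprovalInfo) String() string {
	return fmt.Sprintf("tool '%s' wants to run with arguments %s; approve?",
		a.ToolName, a.ArgumentsInJSON)
}

// rejectionMessage (hypothetical helper) mirrors the disapproval branches
// of the tool above.
func rejectionMessage(toolName string, r *ApprovalResult) string {
	if r.DisapproveReason != nil {
		return fmt.Sprintf("tool '%s' disapproved, reason: %s", toolName, *r.DisapproveReason)
	}
	return fmt.Sprintf("tool '%s' disapproved", toolName)
}

func main() {
	info := &ApprovalInfo{
		ToolName:        "book_ticket",
		ArgumentsInJSON: `{"to":"Paris"}`,
		ToolCallID:      "call_1",
	}
	fmt.Println(info)

	reason := "budget exceeded"
	result := &ApprovalResult{Approved: false, DisapproveReason: &reason}
	fmt.Println(rejectionMessage(info.ToolName, result))
}
```

On the caller side, a value such as result would be attached to the context with ResumeWithData, using the interrupt ID taken from InterruptContexts, before the graph is invoked again with the same checkpoint ID. Since these are custom structs, they likely also need to be registered with schema.RegisterName if they end up in a checkpoint.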
Examples
Before eino v0.7.0
https://github.com/cloudwego/eino-examples/tree/main/compose/graph/react_with_interrupt
In eino v0.7.0 and later
https://github.com/cloudwego/eino/blob/main/compose/resume_test.go
Including:
TestInterruptStateAndResumeForRootGraph: Simple dynamic interrupt
TestInterruptStateAndResumeForSubGraph: Sub-graph interrupt
TestInterruptStateAndResumeForToolInNestedSubGraph: Tool interrupt inside nested sub-graph
TestMultipleInterruptsAndResumes: Parallel interrupts
TestReentryForResumedTools: Tool interrupt in ReAct Agent, multiple loop executions after resume
TestGraphInterruptWithinLambda: Lambda node contains independent Graph with internal interrupt