Eino: Indexer User Guide
Introduction
The Indexer component stores and indexes documents. Its main purpose is to store documents and their vector representations in backend storage systems and provide efficient retrieval capabilities. This component plays an important role in scenarios such as:
- Building vector databases for semantic association search
Component Definition
Interface Definition
Code location: eino/components/indexer/interface.go
// Indexer stores documents and builds indexes for them.
type Indexer interface {
// Store stores docs, configured by opts, and returns the IDs of the
// successfully stored documents along with any error that occurred.
Store(ctx context.Context, docs []*schema.Document, opts ...Option) (ids []string, err error)
}
Store Method
- Function: Store documents and build indexes
- Parameters:
- ctx: Context object for passing request-level information and Callback Manager
- docs: List of documents to be stored
- opts: Storage options for configuring storage behavior
- Return values:
- ids: List of successfully stored document IDs
- err: Error information during storage
Common Options
The Indexer component uses the Option type to define optional parameters. Indexer defines the following common options. Additionally, each specific implementation can define its own options, which are wrapped into the unified Option type through the WrapIndexerImplSpecificOptFn function.
// Options holds the common options defined by the Indexer component.
type Options struct {
// SubIndexes is the list of sub-indexes to be built
SubIndexes []string
// Embedding is the component used to generate document vectors
Embedding embedding.Embedder
}
Options can be set in the following ways:
// Set sub-indexes
WithSubIndexes(subIndexes []string) Option
// Set vector generation component
WithEmbedding(emb embedding.Embedder) Option
Usage
Standalone Usage
VikingDB Example
import (
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino-ext/components/indexer/volc_vikingdb"
)
// Name of the collection the indexer is configured to use.
collectionName := "eino_test"
/*
* This example assumes a pre-built dataset (collection) named eino_test with the following field configuration:
* Field Name    | Field Type    | Vector Dimension
* ID            | string        |
* vector        | vector        | 1024
* sparse_vector | sparse_vector |
* content       | string        |
* extra_field_1 | string        |
*
* Notes when using the component:
* 1. Field names and types for ID / vector / sparse_vector / content should match the configuration above
* 2. Vector dimension should match the output dimension of the model corresponding to ModelName
* 3. Some models don't output sparse vectors, in which case UseSparse should be set to false, and the collection may not have a sparse_vector field
*/
// NOTE: ctx, ak and sk are not defined in this snippet; they are assumed to be
// supplied by the surrounding code.
cfg := &volc_vikingdb.IndexerConfig{
// https://api-vikingdb.volces.com (North China)
// https://api-vikingdb.mlp.cn-shanghai.volces.com (East China)
// https://api-vikingdb.mlp.ap-mya.byteplus.com (Overseas - Johor)
Host: "api-vikingdb.volces.com",
Region: "cn-beijing",
AK: ak,
SK: sk,
Scheme: "https",
ConnectionTimeout: 0,
Collection: collectionName,
EmbeddingConfig: volc_vikingdb.EmbeddingConfig{
UseBuiltin: true,
ModelName: "bge-m3",
UseSparse: true,
},
AddBatchSize: 10,
}
// NOTE: the error returned by NewIndexer is discarded here for brevity; check it in real code.
volcIndexer, _ := volc_vikingdb.NewIndexer(ctx, cfg)
doc := &schema.Document{
ID: "mock_id_1",
Content: "A ReAct prompt consists of few-shot task-solving trajectories, with human-written text reasoning traces and actions, as well as environment observations in response to actions",
}
// Attach a value for the collection's extra_field_1 field and a TTL to the document.
volc_vikingdb.SetExtraDataFields(doc, map[string]interface{}{"extra_field_1": "mock_ext_abc"})
volc_vikingdb.SetExtraDataTTL(doc, 1000)
docs := []*schema.Document{doc}
// NOTE: the error returned by Store is discarded here for brevity; check it in real code.
resp, _ := volcIndexer.Store(ctx, docs)
fmt.Printf("vikingDB store success, docs=%v, resp ids=%v\n", docs, resp)
Milvus Example
package main
import (
"github.com/cloudwego/eino/schema"
"github.com/milvus-io/milvus/client/v2/milvusclient"
"github.com/cloudwego/eino-ext/components/indexer/milvus2"
)
// Create indexer
// NOTE: ctx, addr, username, password and emb are not defined in this snippet;
// they are assumed to be supplied by the surrounding code.
indexer, err := milvus2.NewIndexer(ctx, &milvus2.IndexerConfig{
ClientConfig: &milvusclient.ClientConfig{
Address: addr,
Username: username,
Password: password,
},
Collection: "my_collection",
Dimension: 1024, // Match embedding model dimension
MetricType: milvus2.COSINE,
IndexBuilder: milvus2.NewHNSWIndexBuilder().WithM(16).WithEfConstruction(200),
Embedding: emb,
})
// NOTE: err is assigned above but never checked in this snippet; check it before using indexer.
// Index documents
docs := []*schema.Document{
{
ID: "doc1",
Content: "EINO is a framework for building AI applications",
},
}
// Store returns the IDs of the stored documents; err should be checked by the caller.
ids, err := indexer.Store(ctx, docs)
ElasticSearch 7 Example
import (
"github.com/cloudwego/eino/components/embedding"
"github.com/cloudwego/eino/schema"
elasticsearch "github.com/elastic/go-elasticsearch/v7"
"github.com/cloudwego/eino-ext/components/indexer/es7"
)
// Create the Elasticsearch client.
// NOTE: username, password, ctx, indexName and emb are not defined in this snippet;
// they are assumed to be supplied by the surrounding code. Errors are discarded
// here for brevity; check them in real code.
client, _ := elasticsearch.NewClient(elasticsearch.Config{
Addresses: []string{"http://localhost:9200"},
Username: username,
Password: password,
})
// Create ES indexer component
indexer, _ := es7.NewIndexer(ctx, &es7.IndexerConfig{
Client: client,
Index: indexName,
BatchSize: 10,
// DocumentToFields maps a document to the fields written to the index.
// fieldContent, fieldContentVector, fieldExtraLocation and docExtraLocation are
// not defined in this snippet; presumably string constants declared elsewhere.
DocumentToFields: func(ctx context.Context, doc *schema.Document) (field2Value map[string]es7.FieldValue, err error) {
return map[string]es7.FieldValue{
fieldContent: {
Value: doc.Content,
EmbedKey: fieldContentVector, // Vectorize document content and save to "content_vector" field
},
fieldExtraLocation: {
Value: doc.MetaData[docExtraLocation],
},
}, nil
},
Embedding: emb,
})
// Index documents
docs := []*schema.Document{
{
ID: "doc1",
Content: "EINO is a framework for building AI applications",
},
}
// Store returns the IDs of the stored documents; err should be checked by the caller.
ids, err := indexer.Store(ctx, docs)
OpenSearch 2 Example
package main
import (
"github.com/cloudwego/eino/schema"
opensearch "github.com/opensearch-project/opensearch-go/v2"
"github.com/cloudwego/eino-ext/components/indexer/opensearch2"
)
// Create the OpenSearch client.
// NOTE: username, password, ctx and emb are not defined in this snippet; they are
// assumed to be supplied by the surrounding code. err is assigned here but never
// checked in this snippet; check it before using client.
client, err := opensearch.NewClient(opensearch.Config{
Addresses: []string{"http://localhost:9200"},
Username: username,
Password: password,
})
// Create opensearch indexer component
// NOTE: the error returned by NewIndexer is discarded here for brevity.
indexer, _ := opensearch2.NewIndexer(ctx, &opensearch2.IndexerConfig{
Client: client,
Index: "your_index_name",
BatchSize: 10,
// DocumentToFields maps a document to the fields written to the index: the
// document content goes to "content", with EmbedKey set to "content_vector".
DocumentToFields: func(ctx context.Context, doc *schema.Document) (map[string]opensearch2.FieldValue, error) {
return map[string]opensearch2.FieldValue{
"content": {
Value: doc.Content,
EmbedKey: "content_vector",
},
}, nil
},
Embedding: emb,
})
// Index documents
docs := []*schema.Document{
{
ID: "doc1",
Content: "EINO is a framework for building AI applications",
},
}
// Store returns the IDs of the stored documents; err should be checked by the caller.
ids, err := indexer.Store(ctx, docs)
Usage in Orchestration
// Use in Chain
// The chain's input type is []*schema.Document and its output type is []string,
// matching the docs parameter and ids result of Indexer.Store.
// indexer is assumed to be an Indexer instance created beforehand.
chain := compose.NewChain[[]*schema.Document, []string]()
chain.AppendIndexer(indexer)
// Use in Graph
// The node is registered under the key "indexer_node".
graph := compose.NewGraph[[]*schema.Document, []string]()
graph.AddIndexerNode("indexer_node", indexer)
Option and Callback Usage
Option Usage Example
// Using options (standalone usage)
// myIndexer is an Indexer instance created beforehand. It is deliberately not
// named "indexer": a local variable with that name would shadow the imported
// indexer package, whose option constructors are called below.
ids, err := myIndexer.Store(ctx, docs,
	// Set sub-indexes
	indexer.WithSubIndexes([]string{"kb_1", "kb_2"}),
	// Set vector generation component
	indexer.WithEmbedding(embedder),
)
Callback Usage Example
Code location: eino-ext/components/indexer/volc_vikingdb/examples/builtin_embedding
import (
"context"
"fmt"
"log"
"os"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/indexer"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
callbacksHelper "github.com/cloudwego/eino/utils/callbacks"
"github.com/cloudwego/eino-ext/components/indexer/volc_vikingdb"
)
// handler logs the indexer's input before the call and its output afterwards.
handler := &callbacksHelper.IndexerCallbackHandler{
	OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *indexer.CallbackInput) context.Context {
		// Guard against an empty batch: reading input.Docs[0] unconditionally
		// would panic when no documents are passed.
		if len(input.Docs) == 0 {
			log.Printf("input access, len: 0\n")
			return ctx
		}
		log.Printf("input access, len: %v, content: %s\n", len(input.Docs), input.Docs[0].Content)
		return ctx
	},
	OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *indexer.CallbackOutput) context.Context {
		log.Printf("output finished, len: %v, ids=%v\n", len(output.IDs), output.IDs)
		return ctx
	},
	// OnError
}
// Use callback handler
helper := callbacksHelper.NewHandlerHelper().
	Indexer(handler).
	Handler()
// volcIndexer, docs and ctx are assumed to have been created beforehand
// (for example as in the VikingDB standalone example).
chain := compose.NewChain[[]*schema.Document, []string]()
chain.AppendIndexer(volcIndexer)
// Use at runtime
run, err := chain.Compile(ctx)
if err != nil {
	// Without a compiled runnable there is nothing to invoke.
	log.Fatalf("compile chain: %v", err)
}
outIDs, err := run.Invoke(ctx, docs, compose.WithCallbacks(helper))
if err != nil {
	log.Fatalf("invoke chain: %v", err)
}
fmt.Printf("vikingDB store success, docs=%v, resp ids=%v\n", docs, outIDs)
Existing Implementations
- Volc VikingDB Indexer: Vector database indexer based on Volcengine VikingDB. See: Indexer - VikingDB
- Milvus v2.5+ Indexer: Vector database indexer based on Milvus. See: Indexer - Milvus 2 (v2.5+)
- Milvus v2.4- Indexer: Vector database indexer based on Milvus. See: Indexer - Milvus (v2.4-)
- Elasticsearch 8 Indexer: General search engine indexer based on ES8. See: Indexer - ElasticSearch 8
- ElasticSearch 7 Indexer: General search engine indexer based on ES7. See: Indexer - Elasticsearch 7
- OpenSearch 3 Indexer: General search engine indexer based on OpenSearch 3. See: Indexer - OpenSearch 3
- OpenSearch 2 Indexer: General search engine indexer based on OpenSearch 2. See: Indexer - OpenSearch 2
Implementation Reference
When implementing a custom Indexer component, note the following:
- Handle both the common options and any implementation-specific options
- Pay attention to callback handling
Option Mechanism
A custom Indexer can define its own options as needed:
// Define Option struct
// MyIndexerOptions holds the implementation-specific options of the custom indexer.
type MyIndexerOptions struct {
// BatchSize is the value set through WithBatchSize.
BatchSize int
// MaxRetries has no option function in this example; presumably a retry limit — confirm.
MaxRetries int
}
// Define Option functions
// WithBatchSize returns an indexer.Option that sets MyIndexerOptions.BatchSize to size.
func WithBatchSize(size int) indexer.Option {
	applyBatchSize := func(target *MyIndexerOptions) {
		target.BatchSize = size
	}
	return indexer.WrapIndexerImplSpecificOptFn(applyBatchSize)
}
Callback Handling
Indexer implementations need to trigger callbacks at appropriate times. The framework has defined standard callback input/output structs:
// CallbackInput is the input for indexer callback.
// It is the value passed to the OnStart callback.
type CallbackInput struct {
// Docs is the list of documents to be indexed
Docs []*schema.Document
// Extra is additional information for the callback
Extra map[string]any
}
// CallbackOutput is the output for indexer callback.
// It is the value passed to the OnEnd callback.
type CallbackOutput struct {
// IDs is the list of document IDs returned by the indexer
IDs []string
// Extra is additional information for the callback
Extra map[string]any
}
Complete Implementation Example
// MyIndexer is an example custom Indexer implementation.
type MyIndexer struct {
// batchSize is initialized from the config's DefaultBatchSize in NewMyIndexer.
batchSize int
// embedder is initialized from the config's DefaultEmbedder in NewMyIndexer.
embedder embedding.Embedder
}
// NewMyIndexer builds a MyIndexer whose batch size and embedder are taken from
// config's DefaultBatchSize and DefaultEmbedder. The returned error is always nil.
func NewMyIndexer(config *MyIndexerConfig) (*MyIndexer, error) {
	idx := new(MyIndexer)
	idx.batchSize = config.DefaultBatchSize
	idx.embedder = config.DefaultEmbedder
	return idx, nil
}
func (i *MyIndexer) Store(ctx context.Context, docs []*schema.Document, opts ...indexer.Option) ([]string, error) {
// 1. Handle options
options := &indexer.Options{},
options = indexer.GetCommonOptions(options, opts...)
// 2. Get callback manager
cm := callbacks.ManagerFromContext(ctx)
// 3. Callback before storage starts
ctx = cm.OnStart(ctx, info, &indexer.CallbackInput{
Docs: docs,
})
// 4. Execute storage logic
ids, err := i.doStore(ctx, docs, options)
// 5. Handle errors and completion callback
if err != nil {
ctx = cm.OnError(ctx, info, err)
return nil, err
}
ctx = cm.OnEnd(ctx, info, &indexer.CallbackOutput{
IDs: ids,
})
return ids, nil
}
// doStore implements the document storage logic, honoring the common options
// in opts. When opts.Embedding is set, it embeds every document's Content and
// attaches the resulting vector to the document. It returns the IDs of docs in
// input order, or the embedding error if vector generation fails.
func (i *MyIndexer) doStore(ctx context.Context, docs []*schema.Document, opts *indexer.Options) ([]string, error) {
	// 1. If Embedding component is set, generate vector representations for documents
	if opts.Embedding != nil {
		// Extract document content
		texts := make([]string, len(docs))
		for j, doc := range docs {
			texts[j] = doc.Content
		}
		// Generate vectors
		vectors, err := opts.Embedding.EmbedStrings(ctx, texts)
		if err != nil {
			return nil, err
		}
		// Attach each vector to its document.
		// NOTE(review): assumes the embedder returns exactly one vector per
		// input text, in order; a shorter result would index out of range.
		for j, doc := range docs {
			doc.WithVector(vectors[j])
		}
	}
	// 2. Other custom logic (for example, writing the documents to the backend)
	// goes here.

	// Collect the IDs of the stored documents for the caller.
	ids := make([]string, 0, len(docs))
	for _, doc := range docs {
		ids = append(ids, doc.ID)
	}
	return ids, nil
}