4.4 KiB
Flows & HTTP
DefineFlow
Wrap AI logic in a flow for observability, tracing, and HTTP deployment.
jokeFlow := genkit.DefineFlow(g, "jokeFlow",
func(ctx context.Context, topic string) (string, error) {
return genkit.GenerateText(ctx, g,
ai.WithModelName("googleai/gemini-flash-latest"),
ai.WithPrompt("Tell me a joke about %s", topic),
)
},
)
Running a Flow Directly
result, err := jokeFlow.Run(ctx, "cats")
DefineStreamingFlow
Flows that stream chunks back to the caller. Two common patterns:
Pattern 1: Passthrough Streaming
Pass the stream callback directly through to WithStreaming. The callback type is ai.ModelStreamCallback = func(context.Context, *ai.ModelResponseChunk) error:
genkit.DefineStreamingFlow(g, "streamingJokeFlow",
func(ctx context.Context, topic string, sendChunk ai.ModelStreamCallback) (string, error) {
resp, err := genkit.Generate(ctx, g,
ai.WithModelName("googleai/gemini-flash-latest"),
ai.WithPrompt("Tell me a long joke about %s", topic),
ai.WithStreaming(sendChunk), // passthrough
)
if err != nil {
return "", err
}
return resp.Text(), nil
},
)
Pattern 2: Manual String Streaming
Use core.StreamCallback[string] to stream extracted text:
genkit.DefineStreamingFlow(g, "streamingJokeFlow",
func(ctx context.Context, topic string, sendChunk core.StreamCallback[string]) (string, error) {
stream := genkit.GenerateStream(ctx, g,
ai.WithModelName("googleai/gemini-flash-latest"),
ai.WithPrompt("Tell me a long joke about %s", topic),
)
for result, err := range stream {
if err != nil {
return "", err
}
if result.Done {
return result.Response.Text(), nil
}
sendChunk(ctx, result.Chunk.Text())
}
return "", nil
},
)
Typed Streaming Flows
Use core.StreamCallback[T] with GenerateDataStream for typed chunks:
genkit.DefineStreamingFlow(g, "structuredStream",
func(ctx context.Context, input JokeRequest, sendChunk core.StreamCallback[*Joke]) (*Joke, error) {
stream := genkit.GenerateDataStream[*Joke](ctx, g,
ai.WithModelName("googleai/gemini-flash-latest"),
ai.WithPrompt("Tell me a joke about %s", input.Topic),
)
for result, err := range stream {
if err != nil { return nil, err }
if result.Done { return result.Output, nil }
sendChunk(ctx, result.Chunk)
}
return nil, nil
},
)
Named Sub-Steps
Use core.Run inside a flow for traced sub-steps:
genkit.DefineFlow(g, "pipeline",
func(ctx context.Context, input string) (string, error) {
subject, err := core.Run(ctx, "extract-subject", func() (string, error) {
return genkit.GenerateText(ctx, g,
ai.WithPrompt("Extract the subject from: %s", input),
)
})
if err != nil { return "", err }
joke, err := core.Run(ctx, "generate-joke", func() (string, error) {
return genkit.GenerateText(ctx, g,
ai.WithPrompt("Tell me a joke about %s", subject),
)
})
return joke, err
},
)
HTTP Handlers
genkit.Handler
Convert any flow into an http.HandlerFunc:
mux := http.NewServeMux()
for _, f := range genkit.ListFlows(g) {
mux.HandleFunc("POST /"+f.Name(), genkit.Handler(f))
}
log.Fatal(server.Start(ctx, "127.0.0.1:8080", mux))
Request/Response Format
Non-streaming request:
curl -X POST http://localhost:8080/jokeFlow \
-H "Content-Type: application/json" \
-d '{"data": "bananas"}'
Response: {"result": "Why did the banana go to the doctor?..."}
Streaming request:
curl -N -X POST http://localhost:8080/streamingJokeFlow \
-H "Content-Type: application/json" \
-d '{"data": "bananas"}'
Streaming responses use Server-Sent Events (SSE) format.
genkit.HandlerFunc
For frameworks that expect error-returning handlers:
handler := genkit.HandlerFunc(myFlow)
// handler is func(http.ResponseWriter, *http.Request) error
Context Providers
Inject request context (e.g., auth headers) into flow execution:
mux.HandleFunc("POST /myFlow", genkit.Handler(myFlow,
genkit.WithContextProviders(func(ctx context.Context, rd core.RequestData) (api.ActionContext, error) {
// rd.Headers contains HTTP headers
return api.ActionContext{"userId": rd.Headers.Get("X-User-Id")}, nil
}),
))
ListFlows
Get all registered flows for dynamic route setup:
flows := genkit.ListFlows(g) // []api.Action
for _, f := range flows {
fmt.Println(f.Name())
}