Files
Onsol-GO/.agents/skills/developing-genkit-go/references/flows-and-http.md
T
2026-04-23 23:58:59 -05:00

4.4 KiB

Flows & HTTP

DefineFlow

Wrap AI logic in a flow for observability, tracing, and HTTP deployment.

jokeFlow := genkit.DefineFlow(g, "jokeFlow",
	func(ctx context.Context, topic string) (string, error) {
		return genkit.GenerateText(ctx, g,
			ai.WithModelName("googleai/gemini-flash-latest"),
			ai.WithPrompt("Tell me a joke about %s", topic),
		)
	},
)

Running a Flow Directly

result, err := jokeFlow.Run(ctx, "cats")

DefineStreamingFlow

Flows that stream chunks back to the caller. Two common patterns:

Pattern 1: Passthrough Streaming

Pass the stream callback directly through to WithStreaming. The callback type is ai.ModelStreamCallback = func(context.Context, *ai.ModelResponseChunk) error:

genkit.DefineStreamingFlow(g, "streamingJokeFlow",
	func(ctx context.Context, topic string, sendChunk ai.ModelStreamCallback) (string, error) {
		resp, err := genkit.Generate(ctx, g,
			ai.WithModelName("googleai/gemini-flash-latest"),
			ai.WithPrompt("Tell me a long joke about %s", topic),
			ai.WithStreaming(sendChunk), // passthrough
		)
		if err != nil {
			return "", err
		}
		return resp.Text(), nil
	},
)

Pattern 2: Manual String Streaming

Use core.StreamCallback[string] to stream extracted text:

genkit.DefineStreamingFlow(g, "streamingJokeFlow",
	func(ctx context.Context, topic string, sendChunk core.StreamCallback[string]) (string, error) {
		stream := genkit.GenerateStream(ctx, g,
			ai.WithModelName("googleai/gemini-flash-latest"),
			ai.WithPrompt("Tell me a long joke about %s", topic),
		)
		for result, err := range stream {
			if err != nil {
				return "", err
			}
			if result.Done {
				return result.Response.Text(), nil
			}
			sendChunk(ctx, result.Chunk.Text())
		}
		return "", nil
	},
)

Typed Streaming Flows

Use core.StreamCallback[T] with GenerateDataStream for typed chunks:

genkit.DefineStreamingFlow(g, "structuredStream",
	func(ctx context.Context, input JokeRequest, sendChunk core.StreamCallback[*Joke]) (*Joke, error) {
		stream := genkit.GenerateDataStream[*Joke](ctx, g,
			ai.WithModelName("googleai/gemini-flash-latest"),
			ai.WithPrompt("Tell me a joke about %s", input.Topic),
		)
		for result, err := range stream {
			if err != nil { return nil, err }
			if result.Done { return result.Output, nil }
			sendChunk(ctx, result.Chunk)
		}
		return nil, nil
	},
)

Named Sub-Steps

Use core.Run inside a flow for traced sub-steps:

genkit.DefineFlow(g, "pipeline",
	func(ctx context.Context, input string) (string, error) {
		subject, err := core.Run(ctx, "extract-subject", func() (string, error) {
			return genkit.GenerateText(ctx, g,
				ai.WithPrompt("Extract the subject from: %s", input),
			)
		})
		if err != nil { return "", err }

		joke, err := core.Run(ctx, "generate-joke", func() (string, error) {
			return genkit.GenerateText(ctx, g,
				ai.WithPrompt("Tell me a joke about %s", subject),
			)
		})
		return joke, err
	},
)

HTTP Handlers

genkit.Handler

Convert any flow into an http.HandlerFunc:

mux := http.NewServeMux()
for _, f := range genkit.ListFlows(g) {
	mux.HandleFunc("POST /"+f.Name(), genkit.Handler(f))
}
log.Fatal(server.Start(ctx, "127.0.0.1:8080", mux))

Request/Response Format

Non-streaming request:

curl -X POST http://localhost:8080/jokeFlow \
  -H "Content-Type: application/json" \
  -d '{"data": "bananas"}'

Response: {"result": "Why did the banana go to the doctor?..."}

Streaming request:

curl -N -X POST http://localhost:8080/streamingJokeFlow \
  -H "Content-Type: application/json" \
  -d '{"data": "bananas"}'

Streaming responses use Server-Sent Events (SSE) format.

genkit.HandlerFunc

For frameworks that expect error-returning handlers:

handler := genkit.HandlerFunc(myFlow)
// handler is func(http.ResponseWriter, *http.Request) error

Context Providers

Inject request context (e.g., auth headers) into flow execution:

mux.HandleFunc("POST /myFlow", genkit.Handler(myFlow,
	genkit.WithContextProviders(func(ctx context.Context, rd core.RequestData) (api.ActionContext, error) {
		// rd.Headers contains HTTP headers
		return api.ActionContext{"userId": rd.Headers.Get("X-User-Id")}, nil
	}),
))

ListFlows

Get all registered flows for dynamic route setup:

flows := genkit.ListFlows(g) // []api.Action
for _, f := range flows {
	fmt.Println(f.Name())
}