State Function Pattern
A functional approach to state machines where each state is a function that returns the next state to execute. This pattern eliminates switch statements and makes state transitions explicit and testable.
Origins
This pattern was introduced by Rob Pike in his 2011 talk "Lexical Scanning in Go". The key insight: instead of using an enum to represent state and a switch statement to handle transitions, make the state itself a function that returns the next state.
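A minimal sketch of the idea, using a hypothetical traffic light rather than anything from the talk. The names light, red, green, yellow, and run are illustrative assumptions. Each state does its work and returns the function for the next state, so the transition table lives in the return statements instead of in a switch:

```go
package main

import "fmt"

// light is a hypothetical machine; it only records the states it visits.
type light struct {
	log    []string
	cycles int // remaining red/green/yellow cycles
}

// stateFn is a state: it does its work and returns the next state,
// or nil when the machine should stop.
type stateFn func(*light) stateFn

func red(l *light) stateFn {
	l.log = append(l.log, "red")
	return green
}

func green(l *light) stateFn {
	l.log = append(l.log, "green")
	return yellow
}

func yellow(l *light) stateFn {
	l.log = append(l.log, "yellow")
	l.cycles--
	if l.cycles <= 0 {
		return nil // no next state: the machine halts
	}
	return red
}

// run drives the machine until a state returns nil.
func run(l *light) {
	for state := stateFn(red); state != nil; {
		state = state(l)
	}
}

func main() {
	l := &light{cycles: 2}
	run(l)
	fmt.Println(l.log)
}
```

The run loop never needs to know which states exist; adding a state means adding a function and returning it from somewhere.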
Rob Pike's Lexer
From the text/template/parse package:
// stateFn represents the state of the scanner as a function that returns the next state.
// https://github.com/golang/go/blob/master/src/text/template/parse/lex.go#L109-L110
type stateFn func(*lexer) stateFn
// lexer holds the state of the scanner.
// https://github.com/golang/go/blob/master/src/text/template/parse/lex.go#L112-L127
type lexer struct {
name string // the name of the input; used only for error reports
input string // the string being scanned
leftDelim string // start of action marker
rightDelim string // end of action marker
pos Pos // current position in the input
start Pos // start position of this item
atEOF bool // we have hit the end of input and returned eof
parenDepth int // nesting depth of ( ) exprs
line int // 1+number of newlines seen
startLine int // start line of this item
item item // item to return to parser
insideAction bool // are we inside an action?
options lexOptions
}
The nextItem Function (Demand-Driven)
The key insight: the state machine runs on demand. The parser calls nextItem(), which runs the machine until a token is ready:
// nextItem returns the next item from the input.
// Called by the parser, not in the lexing goroutine.
// https://github.com/golang/go/blob/master/src/text/template/parse/lex.go#L226-L238
func (l *lexer) nextItem() item {
l.item = item{itemEOF, l.pos, "EOF", l.startLine}
state := lexText
if l.insideAction {
state = lexInsideAction
}
for {
state = state(l)
if state == nil {
return l.item
}
}
}
State Functions and Emit
State functions process input. emit stores a token and returns nil to pause:
// emit passes an item back to the parser.
// https://github.com/golang/go/blob/master/src/text/template/parse/lex.go#L178-L181
func (l *lexer) emit(t itemType) stateFn {
return l.emitItem(l.thisItem(t))
}
// emitItem passes the specified item to the parser.
// https://github.com/golang/go/blob/master/src/text/template/parse/lex.go#L183-L187
func (l *lexer) emitItem(i item) stateFn {
l.item = i
return nil // Return nil to pause, letting parser pick up the item
}
// lexText scans until an opening action delimiter, "{{".
// https://github.com/golang/go/blob/master/src/text/template/parse/lex.go#L269-L298
func lexText(l *lexer) stateFn {
if x := strings.Index(l.input[l.pos:], l.leftDelim); x >= 0 {
if x > 0 {
l.pos += Pos(x)
// Trim-marker handling and line counting are omitted from this excerpt.
i := l.thisItem(itemText)
if len(i.val) > 0 {
return l.emitItem(i) // Emit text token, pause
}
}
return lexLeftDelim
}
l.pos = Pos(len(l.input))
if l.pos > l.start {
l.line += strings.Count(l.input[l.start:l.pos], "\n")
return l.emit(itemText) // Emit final text, pause
}
return l.emit(itemEOF) // Emit EOF, pause
}
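Key insight: emit returns nil to pause execution. The loop in nextItem sees the nil and returns the stored item to the parser. The lexer struct keeps pos, start, and insideAction between calls, so the next call to nextItem re-enters at lexText or lexInsideAction and continues scanning from the saved position.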
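The pause-and-resume behaviour can be sketched with a much smaller lexer. This is not the text/template code: the lexer below, its item type, and the lexSpace and lexWord states are hypothetical and only split words on spaces. It shows that every call to nextItem starts the state loop afresh, while the scan position survives in the struct:

```go
package main

import (
	"fmt"
	"strings"
)

// item is a token handed to the caller.
type item struct {
	typ string // "word" or "eof"
	val string
}

type lexer struct {
	input string
	start int  // start of the pending item
	pos   int  // current scan position
	item  item // item to return from nextItem
}

type stateFn func(*lexer) stateFn

// emit stores the pending item and returns nil so the run loop stops.
func (l *lexer) emit(typ string) stateFn {
	l.item = item{typ: typ, val: l.input[l.start:l.pos]}
	l.start = l.pos
	return nil
}

// nextItem runs states until one of them emits, then returns the item.
func (l *lexer) nextItem() item {
	for state := stateFn(lexSpace); state != nil; {
		state = state(l)
	}
	return l.item
}

// lexSpace skips spaces, then hands over to lexWord or emits eof.
func lexSpace(l *lexer) stateFn {
	for l.pos < len(l.input) && l.input[l.pos] == ' ' {
		l.pos++
	}
	l.start = l.pos
	if l.pos == len(l.input) {
		return l.emit("eof")
	}
	return lexWord
}

// lexWord consumes up to the next space and emits a word.
func lexWord(l *lexer) stateFn {
	if n := strings.IndexByte(l.input[l.pos:], ' '); n >= 0 {
		l.pos += n
	} else {
		l.pos = len(l.input)
	}
	return l.emit("word")
}

func main() {
	l := &lexer{input: "state  functions compose"}
	for {
		it := l.nextItem()
		fmt.Printf("%s %q\n", it.typ, it.val)
		if it.typ == "eof" {
			break
		}
	}
}
```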
Applying to Pipelines
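For long-running analysis pipelines that stream progress over server-sent events (SSE), the same pattern applies: each pipeline step is a state function, and a Progress value plays the role of the lexer's item.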
// StateFn represents a pipeline step as a function that returns the next step.
type StateFn func(*Analysis) StateFn
// Analysis holds the state machine state.
type Analysis struct {
ID string
Brand string
Step int
State State
Error error
Failures []ConversationFailure
Data AnalysisData
// For state machine
progress Progress // Progress to emit (like lexer's item)
state StateFn // Current state function
// Context for cancellation
ctx context.Context
cancel context.CancelFunc
deps *Dependencies
}
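type Progress struct {
Stage State
Completed int
Total int
Data string // Optional human-readable detail for the client
Error string
Failures []ConversationFailure
Done bool // True if StateCompleted or StateError
Next StateFn `json:"-"` // State to resume from on the next call; never serialized
}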
The nextStep Function (Demand-Driven)
Like nextItem, nextStep() is called by the consumer each time it wants the next progress update:
// nextStep runs the machine until a state emits progress, then returns it.
// Called by the code that drives the analysis (here, the HTTP handler).
// The bool reports whether more steps remain; the final Progress
// (completed or error) is returned with Done set and false.
func (a *Analysis) nextStep() (Progress, bool) {
// A finished analysis must not restart from the first step.
if a.State == StateCompleted || a.State == StateError {
return Progress{Stage: a.State, Done: true}, false
}
// Initialize on first call. The method expression has type StateFn.
if a.state == nil {
a.state = (*Analysis).stepQueryProfile
}
a.progress = Progress{Stage: StatePending} // Default
for {
select {
case <-a.ctx.Done():
a.Error = a.ctx.Err()
a.State = StateError
return Progress{Stage: StateError, Error: a.ctx.Err().Error(), Done: true}, false
default:
}
a.state = a.state(a)
if a.state == nil {
// A state emitted progress (possibly terminal).
p := a.progress
if p.Stage == StateCompleted || p.Stage == StateError {
p.Done = true
}
// Resume from the state named in the emitted progress on the next call.
if p.Next != nil {
a.state = p.Next
a.Step++
}
return p, !p.Done
}
}
}
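A pared-down, runnable sketch of the same loop, assuming the resume point travels in Progress.Next as the state functions in this section do. The two steps (stepFetch, stepSummarize), the string stages, the done flag, and newAnalysis are hypothetical stand-ins for the real pipeline:

```go
package main

import (
	"context"
	"fmt"
)

// StateFn is one pipeline step; it returns the next step, or nil to pause.
type StateFn func(*Analysis) StateFn

// Progress is what the caller receives after each pause.
type Progress struct {
	Stage string
	Done  bool
	Next  StateFn // step to resume from on the following call
}

// Analysis is a simplified stand-in for the Analysis struct above.
type Analysis struct {
	ctx      context.Context
	cancel   context.CancelFunc
	state    StateFn
	progress Progress
	done     bool
}

func newAnalysis() *Analysis {
	ctx, cancel := context.WithCancel(context.Background())
	return &Analysis{ctx: ctx, cancel: cancel, state: stepFetch}
}

// emit stores the progress and returns nil to pause the run loop.
func (a *Analysis) emit(p Progress) StateFn {
	a.progress = p
	return nil
}

func stepFetch(a *Analysis) StateFn {
	return a.emit(Progress{Stage: "fetched", Next: stepSummarize})
}

func stepSummarize(a *Analysis) StateFn {
	return a.emit(Progress{Stage: "completed", Done: true})
}

// nextStep runs until a state emits; the bool reports whether more steps remain.
func (a *Analysis) nextStep() (Progress, bool) {
	if a.done {
		return a.progress, false // already finished; do not restart
	}
	for {
		if err := a.ctx.Err(); err != nil {
			a.done = true
			a.progress = Progress{Stage: "error", Done: true}
			return a.progress, false
		}
		a.state = a.state(a)
		if a.state == nil {
			p := a.progress
			a.state = p.Next
			a.done = p.Done || p.Next == nil
			return p, !a.done
		}
	}
}

func main() {
	a := newAnalysis()
	defer a.cancel()
	for {
		p, more := a.nextStep()
		fmt.Printf("stage=%s more=%t\n", p.Stage, more)
		if !more {
			break
		}
	}
}
```

Checking the context at the top of every loop iteration means a cancelled analysis stops before the next step starts; a step that is already running still has to honour the context itself.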
State Functions with Emit
// emit stores progress and returns nil to pause.
func (a *Analysis) emit(p Progress) StateFn {
a.progress = p
return nil
}
// stepQueryProfile queries the brand profile.
func (a *Analysis) stepQueryProfile() StateFn {
a.State = StateProfiling
a.persist()
profile, err := a.deps.profiler.Query(a.ctx, a.Brand)
if err != nil {
return a.stepError(fmt.Errorf("query brand profile: %w", err))
}
a.Data.Profile = profile
if err := a.saveBrandProfile(profile); err != nil {
return a.stepError(fmt.Errorf("save brand profile: %w", err))
}
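// Emit progress, resume with stepGenerateTopics. The method expression
// (*Analysis).stepGenerateTopics has type func(*Analysis) StateFn, matching StateFn.
return a.emit(Progress{Stage: StateProfiling, Next: (*Analysis).stepGenerateTopics})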
}
// stepGenerateTopics generates topics for the brand.
func (a *Analysis) stepGenerateTopics() StateFn {
a.State = StateTopics
a.persist()
topics, err := a.deps.topics.Generate(a.ctx, a.Data.Profile)
if err != nil {
return a.stepError(fmt.Errorf("generate topics: %w", err))
}
a.Data.Topics = topics
if err := a.saveTopics(topics); err != nil {
return a.stepError(fmt.Errorf("save topics: %w", err))
}
return a.emit(Progress{Stage: StateTopics, Next: (*Analysis).stepGenerateConversations})
}
// stepGenerateConversations generates conversations for all topics/agents.
func (a *Analysis) stepGenerateConversations() StateFn {
a.State = StateConversations
a.persist()
total := len(a.Data.Topics) * len(a.deps.Agents)
completed := 0
var failures []ConversationFailure
for _, topic := range a.Data.Topics {
for agentKey, agent := range a.deps.Agents {
convo, err := agent.Run(a.ctx, topic.Description)
if err != nil {
failures = append(failures, ConversationFailure{
Topic: topic.Title,
Agent: agentKey,
Error: err,
})
} else {
if err := a.saveConversation(topic.ID, agentKey, convo); err != nil {
return a.stepError(fmt.Errorf("save conversation: %w", err))
}
for _, resource := range convo.Resources() {
a.deps.affQueue.Enqueue(AffiliationJob{
AnalysisID: a.ID,
ResourceID: resource.URI,
BrandInfo: a.Data.Profile,
})
}
}
completed++
}
}
a.Failures = failures
return a.emit(Progress{
Stage: StateConversations,
Completed: completed,
Total: total,
Failures: failures,
Next: (*Analysis).stepQueueAffiliation,
})
}
func (a *Analysis) stepQueueAffiliation() StateFn {
a.State = StateAffiliationQueued
a.persist()
return a.emit(Progress{
Stage: StateAffiliationQueued,
Data: fmt.Sprintf("%d resources queued", len(a.allResources())),
Next: (*Analysis).stepCompleted,
})
}
func (a *Analysis) stepCompleted() StateFn {
a.State = StateCompleted
a.persist()
return a.emit(Progress{Stage: StateCompleted, Done: true})
}
// stepError records the error and emits terminal progress.
func (a *Analysis) stepError(err error) StateFn {
a.Error = err
a.State = StateError
a.persist()
return a.emit(Progress{Stage: StateError, Error: err.Error(), Done: true})
}
Caller Usage (HTTP Handler)
The caller drives the state machine by calling nextStep():
func (h *handler) runAnalysis(w http.ResponseWriter, r *http.Request) {
// brand is assumed to have been parsed from the request (not shown).
analysis := NewAnalysis(brand, h.deps)
// Drive the machine inside the handler: w and r.Context() are only
// valid until the handler returns, so the loop must not outlive it.
for {
progress, more := analysis.nextStep()
// Send SSE to client
h.sendSSE(w, progress)
if !more {
return // Done (completed or error)
}
// Stop if the client disconnected.
select {
case <-r.Context().Done():
analysis.cancel() // Cancel the analysis
return
default:
}
}
}
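The handler relies on a sendSSE helper that this document does not show. The sketch below is one plausible shape for it, not the project's implementation: the trimmed Progress type, the "progress" event name, and the streamProgress and render helpers are assumptions. It uses only standard-library calls (json.Marshal, http.Flusher, httptest.NewRecorder) and writes each update in the text/event-stream wire format:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Progress is a trimmed-down stand-in for the Progress type above.
type Progress struct {
	Stage string `json:"stage"`
	Done  bool   `json:"done"`
}

// sendSSE writes one progress update as a server-sent event and flushes it.
func sendSSE(w http.ResponseWriter, p Progress) error {
	payload, err := json.Marshal(p)
	if err != nil {
		return err
	}
	// An event is a block of "field: value" lines ended by a blank line.
	if _, err := fmt.Fprintf(w, "event: progress\ndata: %s\n\n", payload); err != nil {
		return err
	}
	if f, ok := w.(http.Flusher); ok {
		f.Flush() // push the event to the client now instead of buffering
	}
	return nil
}

// streamProgress sets the SSE headers and sends every update in order.
func streamProgress(w http.ResponseWriter, updates []Progress) error {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	for _, p := range updates {
		if err := sendSSE(w, p); err != nil {
			return err
		}
	}
	return nil
}

// render runs streamProgress against an in-memory recorder and returns the body.
func render(updates []Progress) string {
	rec := httptest.NewRecorder()
	if err := streamProgress(rec, updates); err != nil {
		return "error: " + err.Error()
	}
	return rec.Body.String()
}

func main() {
	fmt.Print(render([]Progress{
		{Stage: "profiling"},
		{Stage: "completed", Done: true},
	}))
}
```

Flushing after every event matters here: without it the server may buffer several updates and the client would see progress arrive in bursts.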
Alternative: Store Next State in Analysis
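If Progress shouldn't carry function values (for example, because it is serialized and sent to the client), store the resume point on Analysis and pass it to emitThen instead: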
type Analysis struct {
// ... other fields ...
state StateFn // Current state
nextState StateFn // Where to resume after emit
}
func (a *Analysis) emitThen(p Progress, next StateFn) StateFn {
a.progress = p
a.nextState = next
return nil
}
// In nextStep(), after state returns nil:
if a.nextState != nil {
a.state = a.nextState
a.nextState = nil
a.Step++
} else if a.progress.Stage == StateCompleted || a.progress.Stage == StateError {
// Terminal - no next state
return p, false
}
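One concrete reason to prefer this variant, sketched under assumptions: encoding/json cannot encode a struct that has a func-typed field, so a Progress that carries its own resume point cannot be passed to json.Marshal unchanged. The withNext type, the two steps, and the marshals helper below are hypothetical; emitThen follows the definition above:

```go
package main

import (
	"encoding/json"
	"fmt"
)

type StateFn func(*Analysis) StateFn

// withNext mirrors a Progress that carries its own resume point.
type withNext struct {
	Stage string
	Next  StateFn
}

// Progress holds only data, so it can be serialized as-is.
type Progress struct {
	Stage string
	Done  bool
}

type Analysis struct {
	progress  Progress
	nextState StateFn // where to resume after emit
}

// emitThen stores the progress and the resume point, then pauses.
func (a *Analysis) emitThen(p Progress, next StateFn) StateFn {
	a.progress = p
	a.nextState = next
	return nil
}

func stepStart(a *Analysis) StateFn {
	return a.emitThen(Progress{Stage: "started"}, stepDone)
}

func stepDone(a *Analysis) StateFn {
	return a.emitThen(Progress{Stage: "completed", Done: true}, nil)
}

// marshals reports whether v can be encoded as JSON.
func marshals(v any) bool {
	_, err := json.Marshal(v)
	return err == nil
}

func main() {
	a := &Analysis{}
	stepStart(a)
	fmt.Println("progress with func field marshals:", marshals(withNext{Stage: "started", Next: stepDone}))
	fmt.Println("data-only progress marshals:", marshals(a.progress))
	fmt.Println("resume point kept on Analysis:", a.nextState != nil)
}
```

A `json:"-"` tag on the function field is another way around the encoding error; keeping the field off Progress entirely also keeps the type safe to copy, log, and compare.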
Key Principles
- Demand-driven - Caller calls nextStep() to advance the machine
- emit returns nil - Pauses execution to let caller consume progress
- No error return value - StateFn returns only StateFn (following Rob Pike)
- No logging in states - Errors transition to stepError terminal state
- Context for cancellation - Pass context through Analysis for proper cleanup
Benefits
- Caller controls execution - Like parser calling nextItem()
- No switch statement - States are functions
- Proper error handling - Errors are terminal states
- Rob Pike's original pattern - Exact same structure
- Testable - Each state function testable in isolation
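The testability claim can be sketched as follows. Everything here is a simplified assumption: the profiler interface, fakeProfiler, errTimeout, the string stages, and the check helper are hypothetical, and the step is a cut-down version of stepQueryProfile. The point is that a state function can be called directly with a fake dependency, without running the whole machine:

```go
package main

import (
	"errors"
	"fmt"
)

// profiler is a hypothetical dependency of the first step.
type profiler interface {
	Query(brand string) (string, error)
}

// fakeProfiler returns canned results so no real service is needed.
type fakeProfiler struct {
	profile string
	err     error
}

func (f fakeProfiler) Query(string) (string, error) { return f.profile, f.err }

var errTimeout = errors.New("timeout")

type StateFn func(*Analysis) StateFn

type Progress struct {
	Stage string
	Error string
	Done  bool
}

type Analysis struct {
	Brand    string
	Profile  string
	profiler profiler
	progress Progress
	next     StateFn
}

func (a *Analysis) emitThen(p Progress, next StateFn) StateFn {
	a.progress = p
	a.next = next
	return nil
}

// stepQueryProfile is a simplified version of the step shown earlier.
func stepQueryProfile(a *Analysis) StateFn {
	profile, err := a.profiler.Query(a.Brand)
	if err != nil {
		return a.emitThen(Progress{Stage: "error", Error: err.Error(), Done: true}, nil)
	}
	a.Profile = profile
	return a.emitThen(Progress{Stage: "profiling"}, stepGenerateTopics)
}

// stepGenerateTopics is a stub; it is never run by check.
func stepGenerateTopics(a *Analysis) StateFn {
	return a.emitThen(Progress{Stage: "topics"}, nil)
}

// check calls one state function directly and reports what it emitted.
func check(p profiler) Progress {
	a := &Analysis{Brand: "acme", profiler: p}
	stepQueryProfile(a)
	return a.progress
}

func main() {
	fmt.Println(check(fakeProfiler{profile: "shoes"}).Stage)
	fmt.Println(check(fakeProfiler{err: errTimeout}).Error)
}
```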