The Pipeline Pattern is a powerful programming approach used to process data sequentially through a series of stages, with each stage transforming the data before passing it along. In Go, this pattern leverages channels and goroutines to build efficient, concurrent workflows.
The pattern has four key building blocks:

Stages:
- Each stage performs a specific task (e.g., computation, transformation) and passes the result to the next stage via an output channel.

Channels:
- Channels act as the medium for data transfer between stages, ensuring thread-safe communication.

Concurrency:
- Each stage runs as an independent goroutine, allowing multiple stages to execute simultaneously.

Termination:
- Pipelines must handle termination gracefully, typically using a done channel or context.Context.

A pipeline can be visualized as:
Input ----> [Stage 1] ----> [Stage 2] ----> [Stage n] ----> Output
- Input: The source of data (e.g., numbers, files, or requests).
- Stages: Goroutines that process the data sequentially.
- Output: The final processed result.
The following code demonstrates a simple pipeline that generates integers, squares them, and prints the results:
package main

import "fmt"

// Stage 1: Generates integers
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// Stage 2: Squares integers
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // Build the pipeline
    nums := generator(1, 2, 3, 4, 5)
    squared := square(nums)

    // Consume the output
    for result := range squared {
        fmt.Println("Result:", result)
    }
}
The example has three parts:
- generator: Sends a series of integers to a channel.
- square: Receives integers, squares them, and sends the results to another channel.
- Main function: Connects the stages and consumes the final output.

Running the program prints:
Result: 1
Result: 4
Result: 9
Result: 16
Result: 25
The next example incorporates a done channel to allow graceful termination of the pipeline:
package main

import "fmt"

// Stage 1: Generates integers
func generator(done <-chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }()
    return out
}

// Stage 2: Squares integers
func square(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

// Stage 3: Doubles squared values
func doubler(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * 2:
            case <-done:
                return
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    defer close(done)

    // Build the pipeline
    nums := generator(done, 1, 2, 3, 4, 5)
    squared := square(done, nums)
    doubled := doubler(done, squared)

    // Consume the output
    for result := range doubled {
        fmt.Println("Result:", result)
    }
}
Two points are worth noting:
- done channel: Signals all stages to stop processing and exit cleanly.
- Resource cleanup: Closing done ensures the stage goroutines terminate instead of leaking.
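To make the benefit concrete, here is a minimal sketch of early cancellation. It reuses the done-aware generator and square stages defined above (only main changes, and the values read are illustrative): the consumer stops after two results and closes done, so every upstream goroutine unblocks and returns.

func main() {
    done := make(chan struct{})

    // Reuse the done-aware generator and square stages from the example above.
    squared := square(done, generator(done, 1, 2, 3, 4, 5))

    // Read only the first two results, then cancel the rest of the pipeline.
    fmt.Println("Result:", <-squared) // Result: 1
    fmt.Println("Result:", <-squared) // Result: 4

    // Closing done unblocks any stage stuck on a send; each goroutine's
    // select hits the <-done case, returns, and its deferred close runs.
    close(done)
}

Without done, both stage goroutines would eventually block forever on sends that no one receives, and their memory would never be reclaimed.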
The Pipeline Pattern is widely used in real-world scenarios for its modularity and concurrency. Some common applications include:
Data Streaming:
- Example: Processing real-time logs, sensor data, or API responses.

ETL Pipelines:
- Example: Extracting data from a database, transforming it, and loading it into another system.

Image/Video Processing:
- Example: Applying filters, resizing, or compressing images in sequential stages.

Parallel Computation:
- Example: Dividing a complex task into smaller stages for better CPU utilization (see the fan-out/fan-in sketch after this list).

Task Automation:
- Example: Automating sequential tasks like data fetching, processing, and saving.
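The parallel computation case usually combines the pipeline with fan-out/fan-in: several goroutines read from the same channel to share the work, and a merge stage collects their results. The sketch below is an illustrative variant of the first example; the merge helper and the choice of two workers are assumptions, not part of the code shown earlier.

package main

import (
    "fmt"
    "sync"
)

func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// merge fans multiple input channels back into a single output channel.
func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    for _, c := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    nums := generator(1, 2, 3, 4, 5)

    // Fan out: two square stages read from the same input channel.
    s1 := square(nums)
    s2 := square(nums)

    // Fan in: merge their results into one stream (order is not guaranteed).
    for result := range merge(s1, s2) {
        fmt.Println("Result:", result)
    }
}

Note that fanning out sacrifices ordering; if order matters, tag each value with its index before fanning out and reassemble afterwards.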
When building pipelines, keep the following best practices in mind:

Avoid Deadlocks:
- Ensure all channels are properly closed at the end of each stage.

Use done or context.Context for Termination:
- This prevents resource leaks by signaling stages to stop when they are no longer needed (a context-based sketch follows this list).

Minimize Blocking:
- Avoid long-running operations in a single stage, as they can delay downstream stages.

Error Propagation:
- Use an error channel or context to pass errors between stages.
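For the last two practices, one common convention is to let errors travel through the same channel as values and to drive cancellation with context.Context. The sketch below assumes each stage emits a small result struct carrying either a value or an error; the struct and the negative-input check are hypothetical, purely for illustration.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// result pairs a value with an error so failures can flow downstream
// through the same channel as successful values.
type result struct {
    value int
    err   error
}

func generator(ctx context.Context, nums ...int) <-chan result {
    out := make(chan result)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- result{value: n}:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// square fails on negative input to show how an error travels downstream.
func square(ctx context.Context, in <-chan result) <-chan result {
    out := make(chan result)
    go func() {
        defer close(out)
        for r := range in {
            if r.err == nil && r.value < 0 {
                r = result{err: errors.New("negative input")}
            }
            if r.err == nil {
                r.value = r.value * r.value
            }
            select {
            case out <- r:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    for r := range square(ctx, generator(ctx, 1, 2, -3, 4)) {
        if r.err != nil {
            fmt.Println("error:", r.err)
            continue // or call cancel() to stop the whole pipeline
        }
        fmt.Println("Result:", r.value)
    }
}

A dedicated error channel per stage, or the golang.org/x/sync/errgroup package, are common alternatives; the struct-per-value approach keeps each error next to the value that caused it.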
The Pipeline Pattern is an essential design technique for building concurrent, modular, and efficient systems in Go. It is particularly useful when processing sequential data in scenarios such as real-time streams, task workflows, and data pipelines.
By starting with simple examples and gradually incorporating advanced features like cancellation and error handling, you can effectively use the Pipeline Pattern to solve real-world problems. Its ability to combine concurrency with modularity makes it a go-to tool for developers working on scalable and maintainable Go applications.