Concurrent pipeline
A concurrent pipeline consists of several stages that work concurrently and are connected by channels. It is similar to a UNIX pipeline, where the first stage produces a value, the next stages each perform an operation on that value, and the last stage delivers the result to output. For example, this command has three stages, each connected by a pipe (`|`):
- Stage 1: `echo` prints its arguments to STDOUT.
- Stage 2: `tr` translates the output from stage 1, deleting the whitespace.
- Stage 3: `wc` counts the characters in the output.

```shell
echo 'this is a test' | tr -d ' ' | wc -c
```
Building a pipeline
In Go, a concurrent pipeline consists of the following components:
- Producer: Produces messages and sends them to the next stage.
- Throttler: Slows the passage of messages between producer and consumer.
- Dispatcher: Specialized goroutine that manages a worker pool of goroutines.
- runPipeline: Function that ties everything together.
- Iterator: A function type that standardizes how values are retrieved by the pipeline's consumer.
Configuration
This example is a CLI app that uses the following configuration:
```go
type config struct {
	url string
	n   int
	c   int
	rps int
}
```
This configuration is set with an Options type that you can initialize with different configurations:
```go
type Options struct {
	Concurrency int
	RPS         int
	Send        SendFunc
}

// Defaults returns the default [Options].
func Defaults() Options {
	return withDefaults(Options{})
}

func withDefaults(o Options) Options {
	if o.Concurrency == 0 {
		o.Concurrency = 1
	}
	if o.Send == nil {
		o.Send = func(r *http.Request) Result {
			return Send(http.DefaultClient, r)
		}
	}
	return o
}
```
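To see how these defaults behave, the self-contained sketch below uses a simplified stand-in for `Send` (the real one wraps an HTTP client): a zero-value `Options` picks up a concurrency of 1 and a non-nil `Send` function.

```go
package main

import (
	"fmt"
	"net/http"
)

// Minimal stand-ins for the types used in this example.
type Result struct{ Status int }
type SendFunc func(*http.Request) Result

type Options struct {
	Concurrency int
	RPS         int
	Send        SendFunc
}

func withDefaults(o Options) Options {
	if o.Concurrency == 0 {
		o.Concurrency = 1
	}
	if o.Send == nil {
		// Simplified stand-in for the real default Send.
		o.Send = func(r *http.Request) Result { return Result{} }
	}
	return o
}

func main() {
	o := withDefaults(Options{})
	fmt.Println(o.Concurrency, o.Send != nil) // prints: 1 true
}
```

Passing `Options` by value means each call gets its own copy, so filling in defaults never mutates the caller's configuration.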
Producer
- Only has an output channel.
- Initializes an unbuffered channel and launches a goroutine that sends the request to the output channel `n` times.
- Unbuffered channels hold only one value at a time and block until the value is received by another goroutine. When this producer sends the last value on the channel, the `defer close` line runs. This guarantees that the channel closes only after all values are delivered.
- Returns a receive-only channel (`<-chan`) immediately; the goroutine fills it with the given number of requests, and the next stage reads from it.
```go
func produce(n int, req *http.Request) <-chan *http.Request {
	out := make(chan *http.Request)
	go func() {
		defer close(out)
		for range n {
			out <- req
		}
	}()
	return out
}
```
Throttler
- Because this pipeline sends HTTP requests, we add a throttler to slow the message flow so we don't overload the server.
- Gets messages from the producer with `in`, a receive-only channel (it can only receive data from the channel).
- Returns a receive-only channel, which means that the caller can only receive from this channel, not send on it.
- Uses a `time.Ticker` to implement the delay. A ticker sends the current time on a channel at regular intervals (`delay`). `<-t.C` receives a value from the ticker's channel; since we don't need the value itself, we discard it. The goroutine blocks until `t.C` delivers the next tick.
```go
func throttle(in <-chan *http.Request, delay time.Duration) <-chan *http.Request {
	out := make(chan *http.Request)
	go func() {
		defer close(out)
		t := time.NewTicker(delay)
		defer t.Stop() // release the ticker's resources when done
		for r := range in {
			<-t.C
			out <- r
		}
	}()
	return out
}
```
Dispatcher
- Uses the fan-out pattern, which distributes incoming tasks across multiple goroutines.
- Because it accepts a `SendFunc` that handles the HTTP logic, the dispatcher is decoupled from the request-handling specifics.
- Creates multiple goroutines that send HTTP requests to the server concurrently and gather the results.
- Takes a receive-only channel of HTTP requests, a concurrency value, and a function of type `SendFunc`, which returns a `Result`.
- Returns a receive-only channel of `Result` values.
- Each worker goroutine ranges over the `in` channel of HTTP requests, sends each request with the given `SendFunc`, and stores the returned `Result` in the `out` channel that the dispatcher returns.
- The goroutine with `wg.Wait` is the "monitoring" goroutine, which watches the active workers and closes the output channel when all worker goroutines complete their work (call `wg.Done()`). The monitoring goroutine blocks on `wg.Wait()` until its counter hits zero, then it closes the output channel.
```go
func dispatch(in <-chan *http.Request, concurrency int, send SendFunc) <-chan Result {
	out := make(chan Result)
	var wg sync.WaitGroup
	wg.Add(concurrency)
	for range concurrency {
		go func() {
			defer wg.Done()
			for req := range in {
				out <- send(req)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
```
Running the pipeline
- Produce `n` requests and put them in the `requests` channel.
- If `RPS` (requests per second) is set in the config, pass the channel through the throttler, which delays each request by one second divided by the `RPS` value.
- If `RPS` is not set, skip the throttler and go directly to the dispatcher.
- Return the dispatcher's `out` channel, which is a receive-only channel of `Result` values.
```go
func runPipeline(n int, req *http.Request, opts Options) <-chan Result {
	requests := produce(n, req)
	if opts.RPS > 0 {
		requests = throttle(
			requests, time.Second/time.Duration(opts.RPS),
		)
	}
	return dispatch(requests, opts.Concurrency, opts.Send)
}
```
The caller assigns the output of `runPipeline` to a receive-only channel (a channel you read data from). For example, this function assigns the output of `runPipeline` to `results`, then returns an iterator that reads from the channel until it is closed:
```go
func SendN(n int, req *http.Request, opts Options) (Results, error) {
	opts = withDefaults(opts)
	if n <= 0 {
		return nil, fmt.Errorf("n must be positive: got %d", n)
	}
	results := runPipeline(n, req, opts) // assignment
	return func(yield func(Result) bool) { // iterator yield function
		for result := range results {
			if !yield(result) {
				return
			}
		}
	}, nil
}
```