After Jared’s excellent introduction to Go concurrency and his look at patterns of Go channel usage, I would like to share a channel-based ring buffer in Go that we developed for the Loggregator Server in CloudFoundry (CF).
CloudFoundry’s Loggregator Server
The goal of Loggregator is to allow application developers to tail the logs of their applications while these are running on CF. The central component is the Loggregator server, which routes incoming messages. One of the key requirements for this server is that all developers get their logs fairly and that a malicious developer cannot cause message loss for other developers by writing very fast loggers or really slow log consumers.
The following drawing shows the basic mechanism of message distribution (every sprocket is a goroutine). Messages come into the system on the left and are processed by the main processing loop, which determines whether a message's ID matches a particular consumer and the message should thus be forwarded to it. Every consumer forwarder has an internal incoming queue, from which it takes messages to forward to the external consumer.
Congestion in a naive implementation
If a consumer, say consumer 1, slows down, its incoming channel fills up over time. Once that channel is full, it blocks the main message processing loop. A buffered channel only postpones the problem: it blocks in the same way as soon as its buffer is full.
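To make the congestion concrete, here is a minimal sketch (not Loggregator code; `tryForward` and `countAccepted` are hypothetical names chosen for illustration). It uses a non-blocking send to detect the exact point at which a plain `ch <- msg` in the main loop would start to block because nobody is reading from the consumer's queue.

```go
package main

import "fmt"

// tryForward attempts a non-blocking send and reports whether the message
// fit into the queue. A plain `ch <- msg` would block the caller instead
// of returning false.
func tryForward(ch chan<- int, msg int) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

// countAccepted offers n messages to a queue with the given capacity that
// no consumer reads from, and returns how many were accepted before the
// queue filled up.
func countAccepted(capacity, n int) int {
	queue := make(chan int, capacity) // stands in for a stalled consumer's incoming queue
	accepted := 0
	for i := 0; i < n; i++ {
		if tryForward(queue, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	// With a buffer of 3 and a stalled consumer, the 4th plain send
	// would block the main processing loop.
	fmt.Println(countAccepted(3, 10)) // prints 3
}
```

A larger buffer only raises the number of messages accepted before the loop stalls; it does not remove the stall.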
A channel-based ring buffer solution
Channels and goroutines to the rescue!
The idea is simple: connect two buffered channels through one goroutine that forwards messages from the incoming channel to the outgoing channel. Whenever a new message cannot be placed on the outgoing channel, take one message out of the outgoing channel (that is, the oldest message in the buffer), drop it, and place the new message in the newly freed slot.
In the following code snippet we use int as our message type.
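package main

import "fmt"

// RingBuffer forwards messages from inputChannel to outputChannel and
// drops the oldest buffered message when outputChannel is full.
type RingBuffer struct {
	inputChannel  <-chan int
	outputChannel chan int
}

func NewRingBuffer(inputChannel <-chan int, outputChannel chan int) *RingBuffer {
	return &RingBuffer{inputChannel, outputChannel}
}

// Run forwards messages until inputChannel is closed, then closes
// outputChannel.
func (r *RingBuffer) Run() {
	for v := range r.inputChannel {
		select {
		case r.outputChannel <- v:
		default:
			// Buffer full: drop the oldest message. The receive is
			// non-blocking because the consumer may have drained the
			// channel since the failed send above; a plain receive
			// could then block this goroutine forever.
			select {
			case <-r.outputChannel:
			default:
			}
			r.outputChannel <- v
		}
	}
	close(r.outputChannel)
}

func main() {
	in := make(chan int)
	out := make(chan int, 5)
	rb := NewRingBuffer(in, out)
	go rb.Run()

	for i := 0; i < 10; i++ {
		in <- i
	}
	close(in)

	for res := range out {
		fmt.Println(res)
	}
}

// Prints:
// 4
// 5
// 6
// 7
// 8
// 9
//
// The exact output depends on goroutine scheduling: main starts draining
// out while Run may still be handling 9, so a run can also drop 4 and
// print 5 through 9.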
Plugging in this “channel struct” means the sender never blocks; the pair of channels simply behaves like a ring buffer. That is, slower consumers might lose (their oldest) messages, but will never be able to block the main message processing loop.
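The non-blocking behavior can be checked with a small sketch in which no consumer reads at all while the producer is sending. The names `forward` and `lastKept` are hypothetical, and `forward` is a function-style variant of the forwarding loop that drains with a non-blocking receive as a defensive assumption. The sketch assumes a ring size of at least 1.

```go
package main

import "fmt"

// forward copies messages from in to out. When out is full it drops the
// oldest buffered message to make room. It closes out once in is closed.
func forward(in <-chan int, out chan int) {
	for v := range in {
		select {
		case out <- v:
		default:
			// Non-blocking drain: a consumer may have emptied out already.
			select {
			case <-out:
			default:
			}
			out <- v
		}
	}
	close(out)
}

// lastKept pushes the values 0..n-1 through a ring of the given size while
// nothing reads from it, and returns the values that survived.
// Assumes size >= 1; with an unbuffered out the forwarder would wait for
// a consumer.
func lastKept(n, size int) []int {
	in := make(chan int)
	out := make(chan int, size)
	done := make(chan struct{})
	go func() {
		forward(in, out)
		close(done)
	}()

	for i := 0; i < n; i++ {
		in <- i // completes even though nobody consumes from out
	}
	close(in)
	<-done // wait until the forwarder has handled every message

	var kept []int
	for v := range out {
		kept = append(kept, v)
	}
	return kept
}

func main() {
	fmt.Println(lastKept(100, 5)) // prints [95 96 97 98 99]
}
```

Because the consumer only starts reading after the forwarder has finished, the result here is deterministic: the producer completes all 100 sends and only the 5 newest messages remain.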
Here is this idea at work in the Loggregator server source code.
Other solutions
A few packages are available that implement ring buffers in a more classic way by using slices and moving pointers: e.g., container/ring and gringo.
The problem with these implementations is that they need locking to be used concurrently. In the case of container/ring, proper locking needs to be ensured by the user of the package. In the case of gringo, you will see extensive locking throughout the package when looking at the source code.
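For comparison, here is a rough sketch of what user-side locking around container/ring might look like. `LockedRing`, `NewLockedRing`, `Push`, and `Snapshot` are hypothetical names introduced for this illustration and are not part of the standard library or of gringo; only `ring.New`, `Value`, `Next`, and `Do` come from container/ring.

```go
package main

import (
	"container/ring"
	"fmt"
	"sync"
)

// LockedRing is a hypothetical wrapper that guards a container/ring with
// a mutex so that several goroutines can use it safely.
type LockedRing struct {
	mu sync.Mutex
	r  *ring.Ring // always points at the slot that will be overwritten next
}

func NewLockedRing(size int) *LockedRing {
	return &LockedRing{r: ring.New(size)}
}

// Push stores v in the current slot, overwriting the oldest value once
// the ring is full, and advances to the next slot.
func (l *LockedRing) Push(v int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.r.Value = v
	l.r = l.r.Next()
}

// Snapshot returns the stored values, oldest first, skipping empty slots.
func (l *LockedRing) Snapshot() []int {
	l.mu.Lock()
	defer l.mu.Unlock()
	var vals []int
	l.r.Do(func(x any) {
		if x != nil {
			vals = append(vals, x.(int))
		}
	})
	return vals
}

func main() {
	lr := NewLockedRing(5)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			lr.Push(v) // every access has to go through the mutex
		}(i)
	}
	wg.Wait()
	fmt.Println(len(lr.Snapshot())) // prints 5
}
```

Every reader and writer contends for the same mutex here, whereas the channel-based version leaves synchronization to the channels themselves.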