Go Concurrency: Goroutines, Worker Pools and Throttling Made Simple

Goroutines, WaitGroups, Workers.

Go’s concurrency model is simple and very powerful. Here are the main use cases and how to handle them.

Setting the stage

For the sake of these examples, here are the structs we’ll be using:

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

Launch and forget

Launch-and-forget is by far the easiest form of concurrency in Go.

Simply prefix the function call you want to run asynchronously with the go keyword, and that’s all.

e.g.

go doSomething()

Alternatively, you can wrap a block of code with go func() { and }():

go func() {
    // do multiple things
}()

Keep in mind that if your program exits before your goroutine finishes, the goroutine will be cut short:

func main() {
    go func() {
        time.Sleep(time.Second * 3)
        println("World") // <------ this will never execute because the program will have already exited!
    }()
    println("Hello")
}

If you want to prevent that from happening, you might have to resort to a WaitGroup (I’ll talk about this later) or a channel to tell the program to wait until everything is done:

func main() {
    finished := make(chan bool)
    go func() {
        time.Sleep(time.Second * 3)
        println("World")
        finished <- true
    }()
    println("Hello")
    <-finished
}

If you’re curious about what finished := make(chan bool), finished <- true and <-finished do:

finished := make(chan bool) creates a channel that carries boolean values, finished <- true sends the value true on the channel finished, and <-finished blocks until a value is received from the channel finished.

Note that the result does not change whether you send true or false. In fact, you could’ve even used string instead of bool. The crucial part is that the program will wait for the channel to receive something before moving on.

Technically, wg.Wait() is just like having one finished channel for each job.

In fact, channels are very powerful. For instance, the following:

func main() {
    worldChannel := make(chan string)
    dearChannel := make(chan string)
    go func() {
        time.Sleep(time.Second * 3)
        worldChannel <- "world"
    }()
    go func() {
        time.Sleep(time.Second * 2)
        dearChannel <- "dear"
    }()
    println("Hello", <- dearChannel, <- worldChannel)
}

would output Hello dear world in ~3 seconds, because the bottleneck is the first goroutine, which takes 3 seconds to execute; by the time it finishes, the second goroutine will already have sent its data through its own channel.

Anyway, I digress.

Launch and return

In order to return the result of multiple jobs, we need to wait until all jobs are done before returning.

To do that, we’ll use sync.WaitGroup, which lets us register the number of jobs that need to be completed and keep track of when they’re all done.

Furthermore, unlike the previous examples, we need to keep track of the results, which means multiple goroutines would be appending to the same slice concurrently, and that is a data race. To prevent that, we’ll use sync.RWMutex to serialize access to the slice (a plain sync.Mutex would also do here, since we never take read locks). Note that there are many alternatives to this, but this is the easiest method.

Because we want to return the result of each job, our function signature will look something like this:

func launchAndReturn(jobs []Job) []JobResult {

We’ll instantiate our slice of results, RWMutex as well as WaitGroup:

    var (
        results      []JobResult
        resultsMutex = sync.RWMutex{}
        wg           sync.WaitGroup
    )

Add the number of jobs to the WaitGroup:

    wg.Add(len(jobs))

And now that we’ve set everything up, we can iterate over each job, and start a new goroutine for each entry.

    for _, job := range jobs {
        go func(job Job) {

To make sure that the job is marked as done at the end of each goroutine, we’ll defer wg.Done() right now. This is important, because if wg.Done() is not called as many times as the count passed to the earlier wg.Add(len(jobs)), the later call to wg.Wait() will block forever (we’ll get to this).

            defer wg.Done()

Finally, we can execute the job:

            jobResult := doSomething(job)

Now that we’ve executed the job, we have to store the result: first, lock the mutex; second, append the result; and finally, unlock the mutex:

            resultsMutex.Lock()
            results = append(results, jobResult)
            resultsMutex.Unlock()

And now we can end our goroutine. Note that we pass job as an argument instead of capturing the loop variable directly: before Go 1.22, every iteration shared the same loop variable, so capturing it in a closure was a classic source of bugs.

        }(job)
    }

The last step is to wait for the WaitGroup to complete by using wg.Wait(), and then we can return the results.

    wg.Wait()
    return results
}

That’s all! This will concurrently execute each job, and wait for all of them to complete. Here’s the final result:

package main

import (
    "fmt"
    "sync"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

func main() {
    var jobs []Job
    for i := 0; i < 5; i++ {
        jobs = append(jobs, Job{Id: i})
    }
    jobResults := launchAndReturn(jobs)
    fmt.Println(jobResults)
}

func launchAndReturn(jobs []Job) []JobResult {
    var (
        results      []JobResult
        resultsMutex = sync.RWMutex{}
        wg           sync.WaitGroup
    )
    wg.Add(len(jobs))

    for _, job := range jobs {
        go func(job Job) {
            defer wg.Done()
            jobResult := doSomething(job)
            resultsMutex.Lock()
            results = append(results, jobResult)
            resultsMutex.Unlock()
        }(job)
    }

    wg.Wait()
    return results
}

func doSomething(job Job) JobResult {
    fmt.Printf("Running job #%d\n", job.Id)
    return JobResult{Output: "Success"}
}

Output (the order varies between runs, since the jobs run concurrently):

Running job #0
Running job #4
Running job #2
Running job #3
Running job #1
[{Success} {Success} {Success} {Success} {Success}]

Launch, throttle and forget

There are two kinds of throttling: limiting the maximum number of concurrent workers, and limiting the number of jobs executed per unit of time.

Because time-based throttling will be covered in a later section, I’ll take care of the former case here: a maximum number of concurrent workers.

There are a lot of ways to throttle, so I’ll be mentioning my favorite, which also happens to be the simplest: running each worker on its own goroutine, with every worker listening to the same channel for new jobs.

As I just mentioned, the worker (which is just a function) will be launched on its own goroutine and will listen to the channel for new jobs:

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    for job := range jobChannel {
        doSomething(id, job)
    }
}

You might be wondering, “Why is there a for loop on a channel when it’s not even a slice?”

Ranging over a channel means you’re listening to the channel, and this listening continues until the channel is closed and drained. Note that multiple workers will be listening to the same channel, but that’s not a problem: Go channels are made for exactly this kind of producer-consumer scenario, and each value sent is received by exactly one worker.

We’ll start by creating some fake jobs.

func main() {
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

Now, we’ll create our WaitGroup as well as a constant holding the desired number of workers:

    const NumberOfWorkers = 10
    var wg sync.WaitGroup

Because each worker will run on its own goroutine, we need to set our WaitGroup to wait for the number of workers, as opposed to the previous use case, where we passed the total number of jobs instead.

    wg.Add(NumberOfWorkers)
    jobChannel := make(chan Job)

Now that our channel is ready, we can start the workers. For now, they’ll just sit here and wait, since we haven’t sent the jobs to the channel yet.

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

We send the jobs to the channel here, where they’re picked up by our workers. Note that a channel behaves like a shared queue: each value sent is received by exactly one reader, so two workers reading from the same channel will never get the same job.

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

Since we’ve already sent every job to the channel, we can close the channel and wait for the workers to finish.

    close(jobChannel)
    wg.Wait()
}

Final result:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

const NumberOfWorkers = 10

func main() {
    start := time.Now()
    // Create fake jobs for testing purposes
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    var (
        wg         sync.WaitGroup
        jobChannel = make(chan Job)
    )
    wg.Add(NumberOfWorkers)

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    fmt.Printf("Took %s\n", time.Since(start))
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    for job := range jobChannel {
        doSomething(id, job)
    }
}

func doSomething(workerId int, job Job) JobResult {
    fmt.Printf("Worker #%d Running job #%d\n", workerId, job.Id)
    time.Sleep(1 * time.Second)
    return JobResult{Output: "Success"}
}

Launch, throttle and return

This use case is a mix of both previous sections, but you won’t need an RWMutex, because we’ll be reading the results synchronously, from a single goroutine.

Unlike before, our worker function now has a new parameter called jobResultChannel where we’ll send our results.

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job, jobResultChannel chan<- JobResult) {
    defer wg.Done()
    for job := range jobChannel {
        jobResultChannel <- doSomething(id, job)
    }
}

Nothing new here.

func main() {
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }
    const NumberOfWorkers = 10

Unlike previously, we’ll create another channel for the output as well as a slice to store our results. Note that jobResultChannel is buffered with a capacity of len(jobs): since nothing reads from it until after wg.Wait(), an unbuffered channel would cause the workers to block on their sends and the program to deadlock.

    var (
        wg               sync.WaitGroup
        jobChannel       = make(chan Job)
        jobResultChannel = make(chan JobResult, len(jobs))
        jobResults       []JobResult
    )
    wg.Add(NumberOfWorkers)

Start the workers and send the jobs to the channel which is being read by the workers.

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel, jobResultChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

Now that we’ve sent all jobs to the channel, we can close it. And once wg.Wait() returns, every worker has finished and sent its results, so it’s safe to close jobResultChannel too.

    close(jobChannel)
    wg.Wait()
    close(jobResultChannel)

Read all JobResults from the channel (this is synchronous), and then do whatever you want with the results.

    // Receive job results from workers
    for result := range jobResultChannel {
        jobResults = append(jobResults, result)
    }

    fmt.Println(jobResults)
}

Final result:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

const NumberOfWorkers = 10

func main() {
    start := time.Now()

    // Create fake jobs for testing purposes
    var jobs []Job
    for i := 0; i < 100; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    var wg sync.WaitGroup
    wg.Add(NumberOfWorkers)
    jobChannel := make(chan Job)
    jobResultChannel := make(chan JobResult, len(jobs))

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel, jobResultChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    close(jobResultChannel)

    var jobResults []JobResult
    // Receive job results from workers
    for result := range jobResultChannel {
        jobResults = append(jobResults, result)
    }

    fmt.Println(jobResults)
    fmt.Printf("Took %s", time.Since(start))
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job, jobResultChannel chan<- JobResult) {
    defer wg.Done()
    for job := range jobChannel {
        jobResultChannel <- doSomething(id, job)
    }
}

func doSomething(workerId int, job Job) JobResult {
    fmt.Printf("Worker #%d Running job #%d\n", workerId, job.Id)
    time.Sleep(500 * time.Millisecond)
    return JobResult{Output: "Success"}
}

Time-based throttling

Unlike the two previous examples, which throttle based only on the total number of workers, this one throttles based on both a maximum number of workers and a maximum number of executions per second.

By now, you get the hang of it, so we’ll go directly to the final result.

The main difference is that, for testing purposes, doSomething now uses a random execution time.

package main

import (
    "fmt"
    "math"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    Id int
}

type JobResult struct {
    Output string
}

const (
    NumberOfWorkers                    = 10
    MaximumNumberOfExecutionsPerSecond = 50
)

func main() {
    start := time.Now()
    // Create fake jobs for testing purposes
    var jobs []Job
    for i := 0; i < 500; i++ {
        jobs = append(jobs, Job{Id: i})
    }

    var (
        wg         sync.WaitGroup
        jobChannel = make(chan Job)
    )
    wg.Add(NumberOfWorkers)

    // Start the workers
    for i := 0; i < NumberOfWorkers; i++ {
        go worker(i, &wg, jobChannel)
    }

    // Send jobs to worker
    for _, job := range jobs {
        jobChannel <- job
    }

    close(jobChannel)
    wg.Wait()
    fmt.Printf("Took %s\n", time.Since(start))
}

func worker(id int, wg *sync.WaitGroup, jobChannel <-chan Job) {
    defer wg.Done()
    lastExecutionTime := time.Now()
    // Each worker gets its share of the global rate: (50 executions/s) / (10 workers)
    // = 5 executions/s per worker, i.e. at least 1e9/5 nanoseconds between executions.
    minimumTimeBetweenEachExecution := time.Duration(math.Ceil(1e9 / (MaximumNumberOfExecutionsPerSecond / float64(NumberOfWorkers))))
    for job := range jobChannel {
        timeUntilNextExecution := minimumTimeBetweenEachExecution - time.Since(lastExecutionTime)
        if timeUntilNextExecution > 0 {
            fmt.Printf("Worker #%d backing off for %s\n", id, timeUntilNextExecution.String())
            time.Sleep(timeUntilNextExecution)
        } else {
            fmt.Printf("Worker #%d not backing off\n", id)
        }
        lastExecutionTime = time.Now()
        doSomething(id, job)
    }
}

func doSomething(workerId int, job Job) JobResult {
    simulatedExecutionTime := rand.Intn(1000)
    fmt.Printf("Worker #%d Running job #%d (simulatedExecutionTime=%dms)\n", workerId, job.Id, simulatedExecutionTime)
    time.Sleep(time.Duration(simulatedExecutionTime) * time.Millisecond)
    return JobResult{Output: "Success"}
}

What this does is divide the maximum number of executions per second by the number of workers, and throttle each worker to that per-worker rate. It’s not the most accurate way of doing it, since some workers may get several jobs with long execution times in a row while other workers may get several jobs with short execution times in a row, but it’s the easiest way of doing it and it’s generally good enough.

In other words, if you have NumberOfWorkers set to 2 and MaximumNumberOfExecutionsPerSecond set to 10, each worker is allowed 10 / 2 = 5 executions per second, which means each worker would be throttled to calling doSomething() at most once every 200ms.

Extra

Besides the patterns shown above, I have one more that I really like and occasionally use to greatly improve the performance of some applications.

Using channel to perform bulk operations

Go makes it really easy to create a goroutine, and it’s easy to pick up the habit of reaching for even more concurrency the moment you want to improve performance. But the truth is that a single worker performing bulk operations can be much better than several workers each performing one operation at a time; think of one batched database insert versus fifty individual inserts.

package main

import (
    "fmt"
    "time"
)

type Job struct {
    Id int
}

const MaxBulkSize = 50

func main() {
    jobChannel := make(chan Job, 1000)
    go worker(jobChannel)
    for i := 0; i < 50000; i++ {
        jobChannel <- Job{Id: i}
    }
    // Wait for the channel to be drained. Note that this only waits until
    // all jobs have been picked up by the worker, not until the final bulk
    // has finished processing.
    for len(jobChannel) != 0 {
        time.Sleep(100 * time.Millisecond)
    }
}

func worker(jobChannel <-chan Job) {
    var jobs []Job
    for {
        // Keep pulling jobs from the channel until the bulk is full
        // or the channel is empty.
        if len(jobChannel) > 0 && len(jobs) < MaxBulkSize {
            jobs = append(jobs, <-jobChannel)
            continue
        }
        // Process the bulk once it's full, or once the channel is empty
        // and there's at least one job pending.
        if (len(jobChannel) == 0 && len(jobs) > 0) || len(jobs) == MaxBulkSize {
            fmt.Printf("processing bulk of %d jobs\n", len(jobs))
            // clear the list of jobs that were just processed
            jobs = jobs[:0]
        }
        // No jobs in the channel? Back off.
        if len(jobChannel) == 0 {
            fmt.Println("Backing off")
            time.Sleep(500 * time.Millisecond)
        }
    }
}

What’s even more interesting is that using the job channel provides a form of rate limiting out of the box.

In fact, if the jobs came from HTTP requests, whose rate you don’t control (as opposed to a list of jobs, which you do), all you’d have to do is check whether the length of the channel has reached its capacity, and if it has, return 429 Too Many Requests.

e.g.:

if len(jobChannel) == cap(jobChannel) {
    // reached job channel capacity, return 429 here
    return
}
jobChannel <- Job{Id: i}