Samy Ghannad

How to separate generation and execution of work in golang?

I’m a hardcore fan of separation of concerns and have hugely benefited from applying this concept to most areas of my life, including software development projects.
A problem and its solution are much easier to understand when concerns aren’t mixed together. It’s easier to see the bigger picture, understand relationships, find root causes and adapt to change. The result is better management of resources, whether it’s human capital or time.
Imagine digging through a messy pile of unrelated details trying to find the root cause of a bug while customers are complaining and management is nervously asking for progress every 10 minutes. And you, as the person responsible for this matter, are going on a long-awaited vacation tomorrow. And there’s a big advertising campaign starting in 5 hours. And it’s raining hard and heavy, like very hard and heavy. And you have to pee.
Not a dream situation to wake up to, right? Even thinking about it gives me chills… again.
Yep, I’ve been in this situation. That’s a story for another time, but one thing is for sure: I don’t want you there.

There are many cases where one part of a program “produces” some form of data structure which is then “consumed” by another part. This is the perfect occasion to separate these two responsibilities.
One way to achieve this is by implementing the producer-consumer pattern. Remember that design patterns are, as their name implies, patterns: ways of thinking about a specific kind of situation, a mindset to carry with you and internalize as a software developer.
Implementing the producer-consumer pattern is simple in Go, or in any other language if you ask me.
Take this sample program for example:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
	"fmt"
)

func main() {
	done := make(chan struct{})
	tasks := TaskProducer(done)
	TaskConsumer(tasks)
	<-done
	fmt.Println("Done!")
}

func TaskProducer(done chan struct{}) <-chan int {
	TasksQueue := make(chan int)
	go func() {
		defer close(TasksQueue)
		defer close(done) // signals that every task was sent (not yet that it was printed)
		for i := 0; i < 10; i++ {
			TasksQueue <- i
		}
	}()
	return TasksQueue
}
func TaskConsumer(TasksQueue <-chan int) {
	go func() {
		for task := range TasksQueue {
			fmt.Println(task)
		}
	}()
}

It has three main parts: a task producer, which returns a channel that gets filled with tasks over time; a task consumer, which consumes tasks one by one; and the main function, which orchestrates the whole thing.

Task Producer

The TaskProducer function is responsible for producing tasks. Simple, right?
When it’s called, it first makes a channel called TasksQueue, then spawns a goroutine (a lightweight, concurrently running function managed by the Go runtime, not to be confused with an OS thread), and immediately returns the TasksQueue channel.
Note that if we didn’t fill the channel from a goroutine, we would deadlock right away. After making TasksQueue, the next step would be to fill it, and since the channel is unbuffered, the very first send would block until a receiver is ready to take the value. No receiver can exist yet, because the consumer only starts after TaskProducer returns, so the Go runtime would abort with “fatal error: all goroutines are asleep - deadlock!”.
By filling the channel from a goroutine, TaskProducer doesn’t have to wait for the filling to finish before it can return the channel.
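If you really wanted TaskProducer to fill the channel before returning, without a goroutine, the channel would have to be buffered with room for every task. The sketch below shows this. TaskProducerBuffered and its n parameter are hypothetical names and are not part of the program above. It only works because the number of tasks is known up front and fits in the buffer, which is why the goroutine version is the more general approach.

```go
package main

import "fmt"

// TaskProducerBuffered is a hypothetical variant of TaskProducer that fills
// the channel without a goroutine. It only avoids the deadlock because the
// buffer has room for all n tasks, so none of the sends below blocks.
func TaskProducerBuffered(n int) <-chan int {
	tasksQueue := make(chan int, n)
	for i := 0; i < n; i++ {
		tasksQueue <- i
	}
	close(tasksQueue) // buffered values can still be received after close
	return tasksQueue
}

func main() {
	for task := range TaskProducerBuffered(10) {
		fmt.Println(task)
	}
}
```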

There’s also a done channel whose element type is struct{}. struct{} is the empty struct: it has no fields, takes up no memory and can’t carry any data, which makes it the idiomatic element type for channels used only for signaling.
This channel is used purely for synchronization: it keeps main from returning before the producer goroutine has finished. As you can see on line 11, the <-done statement blocks main until the receive succeeds. And when is that? Exactly when we close the channel. Quoting directly from the Go docs:

After the last value has been received from a closed channel c, any receive from c will succeed without blocking, returning the zero value for the channel element.

- Golang Docs
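Here is a small sketch of both points: the empty struct, and the quoted rule about closed channels. drainClosed is a hypothetical helper. It uses a buffered int channel so you can see that values sent before close are still delivered. Only once the channel is drained does a receive return the zero value, with the comma-ok form reporting false.

```go
package main

import (
	"fmt"
	"unsafe"
)

// drainClosed puts one value into a buffered channel, closes it and then
// receives twice, to show what the quoted rule means in practice.
func drainClosed() (first int, firstOK bool, second int, secondOK bool) {
	nums := make(chan int, 1)
	nums <- 7
	close(nums)
	first, firstOK = <-nums   // values sent before close are still delivered
	second, secondOK = <-nums // drained: zero value, ok == false, no blocking
	return first, firstOK, second, secondOK
}

func main() {
	// struct{} has no fields and occupies zero bytes, so it carries no data.
	fmt.Println(unsafe.Sizeof(struct{}{})) // 0

	fmt.Println(drainClosed()) // 7 true 0 false

	done := make(chan struct{})
	close(done)
	<-done // does not block: done is closed
	<-done // neither does any later receive, so one close releases every waiter
	fmt.Println("done is closed")
}
```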
We could also define the done channel as a global variable and use it inside TaskProducer instead of passing it as an argument, but personally I’d rather be able to get a feel for how a function works just by looking at its signature.
With a global variable, it’s not obvious how synchronization is done unless you read the function body. Another reason is that I just don’t like global variables. I just don’t. They always end up being used by something that wasn’t supposed to use them. Unless there’s a concrete reason to use a global variable, I generally avoid it.
That’s all there is to TaskProducer.
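There is one subtlety in the program above. Because the producer closes done, main only waits until the last task has been handed to the consumer, not until it has been printed. main can therefore print "Done!" and return while the consumer goroutine is still printing the final task, so that task is not guaranteed to appear. Below is a minimal sketch of one possible fix, assuming you are willing to change the signatures: the consumer closes done once its range loop ends. The changed parameter lists and the processed slice are hypothetical additions. The slice exists only so the result can be checked.

```go
package main

import "fmt"

// TaskProducer only owns the tasks channel in this variant.
func TaskProducer() <-chan int {
	tasksQueue := make(chan int)
	go func() {
		defer close(tasksQueue) // ends the consumer's range loop
		for i := 0; i < 10; i++ {
			tasksQueue <- i
		}
	}()
	return tasksQueue
}

// TaskConsumer closes done only after it has processed every task.
// processed is a hypothetical sink; reading it after <-done is race-free
// because close(done) happens before the receive that observes it.
func TaskConsumer(tasksQueue <-chan int, done chan<- struct{}, processed *[]int) {
	go func() {
		defer close(done) // runs after the last task has been printed
		for task := range tasksQueue {
			fmt.Println(task)
			*processed = append(*processed, task)
		}
	}()
}

func main() {
	done := make(chan struct{})
	var processed []int
	TaskConsumer(TaskProducer(), done, &processed)
	<-done // now means "all tasks processed", not just "all tasks sent"
	fmt.Println("Done!", len(processed), "tasks processed")
}
```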

Task Consumer

TaskConsumer simply ranges over the channel it receives as an argument, inside a goroutine. Why a goroutine here? Because in main I want execution to continue immediately after calling TaskConsumer and then block on the <-done statement until done is closed.
Of course, it’s also possible to do this without a goroutine:

package main

import (
	"fmt"
)

func main() {
	done := make(chan struct{})
	tasks := TaskProducer(done)
	TaskConsumer(tasks)
	<-done
	fmt.Println("Done!")
}

func TaskProducer(done chan struct{}) <-chan int {
	TasksQueue := make(chan int)
	go func() {
		defer close(TasksQueue)
		defer close(done)
		for i := 0; i < 10; i++ {
			TasksQueue <- i
		}
	}()
	return TasksQueue
}

func TaskConsumer(TasksQueue <-chan int) {
	for task := range TasksQueue {
		fmt.Println(task)
	}
}

Without a goroutine, TaskConsumer blocks main until it has finished reading from the channel, i.e. until the channel is closed, so the done channel is no longer needed. But there’s a little catch here.
With this approach I can’t scale my consumer code beyond a single goroutine. I can’t have more than one consumer at a time, because each call to TaskConsumer would run sequentially in the main goroutine. More on that later.

Making it better

There’s one big problem with the way synchronization is done here. It’s not exactly related to the title, but I can’t just let it slip.
With the done channel doing the synchronization, one might ask: how would multiple producers work? Which one is going to signal the end of the operation? Obviously, a single producer can’t send a signal or close the channel without making sure the other producers have also finished, right?
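The following sketch shows why a single producer can’t simply close done when it finishes: closing a channel that is already closed panics at runtime. The hypothetical closeTwice function plays two producers that both close the same done channel. It recovers from the panic so the message can be printed. Similarly, sending on a closed channel also panics, so a producer that closed a shared tasks channel early would crash any producer still sending.

```go
package main

import "fmt"

// closeTwice simulates two producers that each close a shared done channel
// when they finish. It recovers from the resulting panic and returns its message.
func closeTwice() (msg string) {
	done := make(chan struct{})
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	close(done) // first producer finishes: fine
	close(done) // second producer finishes: panics
	return "no panic"
}

func main() {
	fmt.Println(closeTwice()) // close of closed channel
}
```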
A better approach is to use a so-called wait group (sync.WaitGroup). I’ll leave reading up on the details to you, but it’s used like this:

package main

import (
	"fmt"
	"sync"
)

func main() {
	Dispatcher()
	fmt.Println("Done!")
}

func Dispatcher() {
	TasksQueue := make(chan string)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		runProducers(2, TasksQueue)
	}()
	go func() {
		defer wg.Done()
		runConsumers(3, TasksQueue)
	}()
	wg.Wait()

}

func runProducers(numberOfProducers int, TasksQueue chan string) {
	var wgProducers sync.WaitGroup
	for i := 0; i < numberOfProducers; i++ {
		name := fmt.Sprintf("Producer#%d", i)
		wgProducers.Add(1)
		go func() {
			defer wgProducers.Done()
			TaskProducer(name, TasksQueue)
		}()
	}
	wgProducers.Wait()
	close(TasksQueue) // safe: every producer has finished sending
}

func runConsumers(numberOfConsumers int, TasksQueue chan string) {
	var wgConsumers sync.WaitGroup

	for i := 0; i < numberOfConsumers; i++ {
		name := fmt.Sprintf("Consumer #%d", i)
		wgConsumers.Add(1)
		go func() {
			defer wgConsumers.Done()
			TaskConsumer(name, TasksQueue)
		}()
	}
	wgConsumers.Wait()
}

func TaskProducer(name string, TasksQueue chan<- string) {
	for i := 0; i < 10; i++ {
		TasksQueue <- fmt.Sprintf("%s-%d", name, i)
	}
}

func TaskConsumer(name string, TasksQueue <-chan string) {
	for task := range TasksQueue {
		fmt.Println(name, "read", task) // Println already separates operands with spaces
	}
}

Here I added three new functions: Dispatcher, which hides the details of running multiple producers and consumers from the top-level main function, and runProducers and runConsumers, which run multiple instances of TaskProducer and TaskConsumer, each in its own goroutine.
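For each producer or consumer, we add 1 to the respective wait group, and each call to Done() subtracts 1 from the wait group’s counter. Wait() blocks until the counter reaches 0. At that point every goroutine has called Done(), meaning they have all finished their job. This also answers the multiple-producers question: runProducers closes TasksQueue only after wgProducers.Wait() returns, so the channel is closed exactly once, after every producer has finished sending, and that close is what ends the consumers’ range loops.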
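Below is a stripped-down sketch of those WaitGroup semantics, separate from the producer/consumer code. runWorkers is a hypothetical helper. Each goroutine is counted with Add(1) before it starts and uncounted with Done() when it returns. Wait() only returns once the counter is back to zero. Add is called before the go statement, as in runProducers and runConsumers above. If Add were called inside the goroutine, Wait() could see a zero counter before that goroutine got a chance to run.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers is a hypothetical helper: it starts n goroutines and uses a
// sync.WaitGroup to wait until all of them have finished.
func runWorkers(n int) []int {
	results := make([]int, n) // one slot per goroutine, so no mutex is needed
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // counter + 1, before the goroutine starts
		go func(id int) {
			defer wg.Done() // counter - 1 when this goroutine returns
			results[id] = id * id
		}(i)
	}
	wg.Wait() // blocks until the counter is back to 0
	return results
}

func main() {
	fmt.Println(runWorkers(4)) // [0 1 4 9]
}
```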
In Dispatcher(), we run runProducers and runConsumers in two separate goroutines, then wait for both of them to do their jobs and return. Once Dispatcher() is done Wait()ing, fmt.Println("Done!") is executed and we’re finished!
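To check that every task is consumed exactly once, here’s a hypothetical variant of Dispatcher. countingDispatcher is my own name, not part of the program above, and it returns how many tasks each consumer handled. It also shows a slightly different arrangement that you may or may not prefer. A small goroutine waits on the producers’ wait group and then closes the queue, so the consumers can be waited on directly without an outer wait group.

```go
package main

import (
	"fmt"
	"sync"
)

// countingDispatcher is a hypothetical variant of Dispatcher that reports how
// many tasks each consumer received.
func countingDispatcher(producers, consumers, tasksPerProducer int) []int {
	tasksQueue := make(chan string)

	var wgProducers sync.WaitGroup
	for p := 0; p < producers; p++ {
		wgProducers.Add(1)
		go func(p int) {
			defer wgProducers.Done()
			for i := 0; i < tasksPerProducer; i++ {
				tasksQueue <- fmt.Sprintf("Producer#%d-%d", p, i)
			}
		}(p)
	}
	// Close the queue exactly once, after every producer has finished sending.
	go func() {
		wgProducers.Wait()
		close(tasksQueue)
	}()

	counts := make([]int, consumers) // one slot per consumer, no sharing
	var wgConsumers sync.WaitGroup
	for c := 0; c < consumers; c++ {
		wgConsumers.Add(1)
		go func(c int) {
			defer wgConsumers.Done()
			for range tasksQueue { // each task is received by exactly one consumer
				counts[c]++
			}
		}(c)
	}
	wgConsumers.Wait()
	return counts
}

func main() {
	counts := countingDispatcher(2, 3, 10)
	total := 0
	for _, n := range counts {
		total += n
	}
	fmt.Println(counts, total)
}
```

How the tasks are split among the three consumers depends on scheduling and changes from run to run, but the total is always 20 (2 producers with 10 tasks each).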