Writing a semaphore in Go

Published 09.01.2023 • Last modified 21.01.2024

Compared to Javascript, Python and other single threaded languages, Go takes a very different approach to

I was having difficulties understanding how concurrent Go programs are supposed to work. Coming from single-threaded event-driven languages like Dart and Javascript, it was not easy wrapping my head around things like mutexes or channels. I recently found the Little Book of Semaphores, which is a free ebook that teaches methods for synchronization in concurrent programs. It does this with a primitive called a “semaphore”. I highly recommend reading chapters 2 and 3 of the book to understand semaphores.

First I will explain the definition of a semaphore and write a working semaphore with Go channels and other primitives. Then I will show you how to implement other synchronization primitives in the sync package: Mutex, Cond, WaitGroup and RWMutex.


Basically, a semaphore should have these three functions:

type Semaphore struct {}

func NewSemaphore(value int) *Semaphore {}

func (sem *Semaphore) Wait() {}
func (sem *Semaphore) Signal() {}

A positive value represents the number of threads that can pass without blocking. If the value is negative, it represents the number of waiting threads.

Even though the semaphore has an integer value, it isn’t available to callers. This means, that:

The book refers to threads, which are identical to Go’s goroutines in the context of synchronization. I will use the terms goroutine and thread interchangeably.

Before writing a semaphore myself, I took a look at the golang.org/x/sync implementation of a weighted semaphore. Weighted means that you can assign smaller or bigger “units” of work to different tasks. I used this implementation as a base for my own. Since I’m trying to imitate the semaphores found in the book, I won’t need weights, and the implementation can be much simpler. Here is the first part:

type Semaphore struct {
	value   int
	mutex   *sync.Mutex
	waiters []chan struct{}
}

func NewSemaphore(value int) *Semaphore {
	return &Semaphore{
		value:   value,
		mutex:   &sync.Mutex{},
	}
}

There are three fields: value holds the current value. It is protected by the mutex, which guarantees that only one thread can access the value at once. The third field is a slice of channels sending the empty struct{} value. It’s purpose is to keep track of all of the blocking channels. With these fields in place, let’s implement the Wait() first.

func (sem *Semaphore) Wait() {
	sem.mutex.Lock()

	sem.value--
	if sem.value >= 0 {
		sem.mutex.Unlock()
		return
	}

	w := make(chan struct{})
	sem.waiters = append(sem.waiters, w)
	
	sem.mutex.Unlock()
	<-w
}

First we lock the mutex so that we can decrement the value safely. If the value is positive or zero, we can return early. Otherwise we create a new channel and wait block the thread. The placement of the mutex.Unlock() is very important here: we need to unlock it before using the channel so that any other calls to the semaphore may succeed. With that in place, let’s look at the Signal() function:

func (sem *Semaphore) Signal() {
	sem.mutex.Lock()

	if sem.value < 0 {
		w := sem.waiters[0]
		sem.waiters = sem.waiters[1:]
		close(w)
	}

	sem.value++
	sem.mutex.Unlock()
}

If the value is negative, we pop the first element of the slice into a variable w. We can then close the channel w as closing a channel will also unblock the receiving end.

Because we are popping the first waiter from the slice, Signal() will always wake up the thread that was waiting the longest. This ensures that none of the threads will be subject to thread starvation, or block indefinitely while others get unblocked.

That was it, we now have a fully working semaphore! We can try it out with the “Multiplex” example from the book.

Multiplex #


func main() {
	multiplex := NewSemaphore(2)

	// spawn 4 goroutines
	for i := 0; i < 4; i++ {
		go work(multiplex, i)
	}
	// wait for all of the goroutines to finish
	time.Sleep(2 * time.Second)
}

func work(multiplex *Semaphore, worker int) {
	multiplex.Wait()

	fmt.Println(worker, "->")

	time.Sleep(500 * time.Millisecond)

	fmt.Println(worker, "<-")

	multiplex.Signal()
}

In this example, we want to limit the number of simultaneous workers. We can do this by setting the semaphore’s initial value to 2. The first and second worker will call Wait(), but since the value is 2, both of them will pass. Every time a worker finishes, it will call Signal(), letting the next one through. This means that there is never more than two threads working at once.

With 4 goroutines, the output looks like this:

3 ->
0 ->
3 <-
1 ->
0 <-
2 ->
1 <-
2 <-

As you can see from the little arrow visualization, only two goroutines are allowed to be working at the same time. Every time a worker finishes, it signals to let another go through.

If set the semaphore’s initial value to 1, only one gets to go through at a time. That is also how a sync.Mutex works!

mutex := NewSemaphore(1)

mutex.Wait() // sync.Mutex.Lock()
mutex.Signal() // sync.Mutex.Unlock()

This is already working, but we can make it simpler. If you think about it, a Go channel is pretty similar to how a semaphore is supposed to work. Many goroutines can wait to receive a value from the channel at the same time. We also don’t need to store any additional information about the waiter. We can turn the []chan struct{} into a chan struct{}.

Here is the new version:

type Semaphore struct {
	value   int
	mutex   *sync.Mutex
	waiters chan struct{}
}

func NewSemaphore(value int) *Semaphore {
	return &Semaphore{
		value:   value,
		mutex:   &sync.Mutex{},
		waiters: make(chan struct{}),
	}
}

func (sem *Semaphore) Signal() {
	sem.mutex.Lock()

	if sem.value < 0 {
		select {
		case sem.waiters <- struct{}{}:
		default:
			panic("semaphore: trying to release nonexistent waiter")
		}
	}

	sem.value++
	sem.mutex.Unlock()
}

func (sem *Semaphore) Wait() {
	sem.mutex.Lock()
	sem.value--

	if sem.value >= 0 {
		sem.mutex.Unlock()
		return
	}

	sem.mutex.Unlock()
	<-sem.waiters
}

There is one big difference between the implementations: the order of unblocking threads is non-deterministic. The slice worked in a first-in-first-out fashion, but like this playground demonstrates, the order of receiving channels is random. This is actually fine, because semaphores don’t have to be deterministic. Though it may lead to thread starvation, where a thread is never unblocked when others are. You can see in the playground that sometimes a goroutine receives two or more times in a row, which means that at least one goroutine never receives from the channel.

Because sending to a channel will block if there is no receiver, we need to include the select statement, which simply panics if sending to the channel would block. In practice, this could only happen when the initial value of the semaphore is negative (which is an error on the developer)

Barrier #

We can test this implementation with another example from the book, “Barrier”:

var count int

const numThreads = 5

func work(mutex, barrier *Semaphore, worker int) {
	// only let one at a time through here
	mutex.Wait()
	count = count + 1
	mutex.Signal()
	// the last thread will unlock the barrier
	if count == numThreads {
		barrier.Signal()
		fmt.Printf("worker %d unlocked the barrier\n", worker)
	}
	// the first 4 threads will wait here
	barrier.Wait()
	barrier.Signal()
	fmt.Println("done worker", worker)
}

func main() {
	mutex := NewSemaphore(1)
	barrier := NewSemaphore(0)

	// spawn 5 threads
	for i := 0; i < 5; i++ {
		go work(mutex, barrier, i)
	}

	// wait for all of the threads to finish execution
	time.Sleep(100 * time.Millisecond)
}

The mutex semaphore works like a mutex does. It allows the first four threads to go through the mutex one at at time, fail the comparison and wait at the barrier. Only the last thread will succeed and open the barrier, letting all of them pass through the barrier. The output will look something like this:

worker 2 unlocked the barrier
done worker 2
done worker 4
done worker 0
done worker 1
done worker 3

We can also run this with Go’s race detector. It will reveal a data race in our code:

==================
WARNING: DATA RACE
Write at 0x00000063c670 by goroutine 8:
  main.work()
      ~/semaphore/main.go:54 +0x72
  main.main.func1()
      ~/semaphore/main.go:72 +0x58

Previous read at 0x00000063c670 by goroutine 7:
  main.work()
      ~/semaphore/main.go:57 +0x97
  main.main.func1()
      ~/semaphore/main.go:72 +0x58

Goroutine 8 (running) created at:
  main.main()
      ~/semaphore/main.go:72 +0x20c

Goroutine 7 (running) created at:
  main.main()
      ~/semaphore/main.go:72 +0x20c
==================
done worker 1
done worker 0
worker 4 unlocked the barrier
done worker 2
done worker 3
done worker 4
Found 1 data race(s)
exit status 66

I was struggling to understand why this was happening until I actually looked at where it happened:

func work(mutex, barrier *Semaphore, worker int) {
	mutex.Wait()
	count = count + 1
	mutex.Signal()

	if count == numThreads {
		barrier.Signal()
		fmt.Printf("worker %d unlocked the barrier\n", worker)
	}

	barrier.Wait()
	barrier.Signal()
	fmt.Println("done worker", worker)
}

The data race is only possible between these two lines. This may seem like an error, but it’s actually fine. Only the last comparison to numThreads counts, and in that case the count will always be correct. We could include the if statement in the mutex, but it would be slightly less efficient.

WaitGroup #

Writing a sync.WaitGroup using only semaphores is fairly simple. Instead of using one wait in the main goroutine, we have to use multiple.

func main() {
	a := NewSemaphore(0)

	count := 5

	for i := 0; i < count; i++ {
		go work(a, i)
	}

	for i := 0; i < count; i++ {
		a.Wait()
	}
	fmt.Println("end")
}

func work(a *Semaphore, worker int) {
	// sleep from 0 to 1000 milliseconds
	time.Sleep(time.Millisecond * time.Duration(rand.Int63n(1000)))
	fmt.Println("finish", worker)
	a.Signal()
}

For every call to Signal(), there is a call to Wait() on the main thread. The main goroutine unblocks after every worker has finished their work.

Once again, the output will look as expected:

$ go build && time ./semaphore
finish 2
finish 4
finish 0
finish 1
finish 3
end

real    0m0.982s
user    0m0.008s
sys     0m0.000s

Cond #

sync.Cond has the ability to broadcast events to any number of goroutines. It is useful when you want to wake up any number of goroutines at once.

func main() {
	cond, reply := NewSemaphore(0), NewSemaphore(0)

	count := 5

	for i := 0; i < count; i++ {
		go work(cond, reply, i)
	}

	time.Sleep(500 * time.Millisecond)

	for i := 0; i < count; i++ {
		cond.Signal()
	}
	fmt.Println("sent all")

	for i := 0; i < count; i++ {
		reply.Wait()
	}

	fmt.Println("all messages received")
}

func work(cond, reply *Semaphore, worker int) {
	cond.Wait()

	fmt.Printf("worker #%d woke up \n", worker)

	reply.Signal()
}

If we know that there are N amount of listeners, we can signal N times to wake all of them up.

Output:

< 500 ms pause >
sent all
worker #0 woke up
worker #1 woke up
worker #2 woke up
worker #3 woke up
worker #4 woke up
all messages received

RWMutex #

Under construction…