aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAyke van Laethem <[email protected]>2024-10-31 11:20:09 +0100
committerRon Evans <[email protected]>2024-12-04 11:13:35 +0100
commit4aac3cd7b1ca59e339fcb0953e7641b8aac27cd2 (patch)
treec2a9c86787fda314f1875f6b026c7231948ca9ef
parent2588bf7fa047ad3d4fa02ef3e23eb21b1bb45677 (diff)
downloadtinygo-4aac3cd7b1ca59e339fcb0953e7641b8aac27cd2.tar.gz
tinygo-4aac3cd7b1ca59e339fcb0953e7641b8aac27cd2.zip
sync: implement WaitGroup using a futex
This prepares sync.WaitGroup for multithreading. Code size for the cooperative scheduler is nearly unchanged.
-rw-r--r--src/sync/waitgroup.go91
1 files changed, 61 insertions, 30 deletions
diff --git a/src/sync/waitgroup.go b/src/sync/waitgroup.go
index 72ef24c80..40306d932 100644
--- a/src/sync/waitgroup.go
+++ b/src/sync/waitgroup.go
@@ -3,35 +3,65 @@ package sync
import "internal/task"
type WaitGroup struct {
- counter uint
- waiters task.Stack
+ futex task.Futex
}
func (wg *WaitGroup) Add(delta int) {
- if delta > 0 {
- // Check for overflow.
- if uint(delta) > (^uint(0))-wg.counter {
- panic("sync: WaitGroup counter overflowed")
- }
+ switch {
+ case delta > 0:
+ // Delta is positive.
+ for {
+ // Check for overflow.
+ counter := wg.futex.Load()
+ if uint32(delta) > (^uint32(0))-counter {
+ panic("sync: WaitGroup counter overflowed")
+ }
- // Add to the counter.
- wg.counter += uint(delta)
- } else {
- // Check for underflow.
- if uint(-delta) > wg.counter {
- panic("sync: negative WaitGroup counter")
+ // Add to the counter.
+ if wg.futex.CompareAndSwap(counter, counter+uint32(delta)) {
+ // Successfully added.
+ return
+ }
}
+ default:
+ // Delta is negative (or zero).
+ for {
+ counter := wg.futex.Load()
- // Subtract from the counter.
- wg.counter -= uint(-delta)
+ // Check for underflow.
+ if uint32(-delta) > counter {
+ panic("sync: negative WaitGroup counter")
+ }
+
+ // Subtract from the counter.
+ if !wg.futex.CompareAndSwap(counter, counter-uint32(-delta)) {
+ // Could not swap, trying again.
+ continue
+ }
- // If the counter is zero, everything is done and the waiters should be resumed.
- // This code assumes that the waiters cannot wake up until after this function returns.
- // In the current implementation, this is always correct.
- if wg.counter == 0 {
- for t := wg.waiters.Pop(); t != nil; t = wg.waiters.Pop() {
- scheduleTask(t)
+ // If the counter is zero, everything is done and the waiters should
+ // be resumed.
+ // When there are multiple thread, there is a chance for the counter
+ // to go to zero, WakeAll to be called, and then the counter to be
+ // incremented again before a waiting goroutine has a chance to
+ // check the new (zero) value. However the last increment is
+ // explicitly given in the docs as something that should not be
+ // done:
+ //
+ // > Note that calls with a positive delta that occur when the
+ // > counter is zero must happen before a Wait.
+ //
+ // So we're fine here.
+ if counter-uint32(-delta) == 0 {
+ // TODO: this is not the most efficient implementation possible
+ // because we wake up all waiters unconditionally, even if there
+ // might be none. Though since the common usage is for this to
+ // be called with at least one waiter, it's probably fine.
+ wg.futex.WakeAll()
}
+
+ // Successfully swapped (and woken all waiting tasks if needed).
+ return
}
}
}
@@ -41,14 +71,15 @@ func (wg *WaitGroup) Done() {
}
func (wg *WaitGroup) Wait() {
- if wg.counter == 0 {
- // Everything already finished.
- return
- }
-
- // Push the current goroutine onto the waiter stack.
- wg.waiters.Push(task.Current())
+ for {
+ counter := wg.futex.Load()
+ if counter == 0 {
+ return // everything already finished
+ }
- // Pause until the waiters are awoken by Add/Done.
- task.Pause()
+ if wg.futex.Wait(counter) {
+ // Successfully woken by WakeAll (in wg.Add).
+ break
+ }
+ }
}