diff options
-rw-r--r-- | src/internal/task/task.go | 7 | ||||
-rw-r--r-- | src/runtime/chan.go | 120 | ||||
-rw-r--r-- | src/runtime/scheduler_cooperative.go | 8 | ||||
-rw-r--r-- | src/runtime/scheduler_none.go | 3 |
4 files changed, 108 insertions, 30 deletions
diff --git a/src/internal/task/task.go b/src/internal/task/task.go index 587c67526..546f5ba11 100644 --- a/src/internal/task/task.go +++ b/src/internal/task/task.go @@ -27,7 +27,7 @@ type Task struct { } // DataUint32 returns the Data field as a uint32. The value is only valid after -// setting it through SetDataUint32. +// setting it through SetDataUint32 or by storing to it using DataAtomicUint32. func (t *Task) DataUint32() uint32 { return *(*uint32)(unsafe.Pointer(&t.Data)) } @@ -38,6 +38,11 @@ func (t *Task) SetDataUint32(val uint32) { *(*uint32)(unsafe.Pointer(&t.Data)) = val } +// DataAtomicUint32 returns the Data field as an atomic-if-needed Uint32 value. +func (t *Task) DataAtomicUint32() *Uint32 { + return (*Uint32)(unsafe.Pointer(&t.Data)) +} + // getGoroutineStackSize is a compiler intrinsic that returns the stack size for // the given function and falls back to the default stack size. It is replaced // with a load from a special section just before codegen. diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 0d0cbf06a..e437798b0 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -30,11 +30,12 @@ package runtime // non-select operations) so that the select operation knows which case did // proceed. // The value is at the same time also a way that goroutines can be the first -// (and only) goroutine to 'take' a channel operation to change it from -// 'waiting' to any other value. This is important for the select statement -// because multiple goroutines could try to let different channels in the -// select statement proceed at the same time. By using Task.Data, only a -// single channel operation in the select statement can proceed. +// (and only) goroutine to 'take' a channel operation using an atomic CAS +// operation to change it from 'waiting' to any other value. This is important +// for the select statement because multiple goroutines could try to let +// different channels in the select statement proceed at the same time. By +// using Task.Data, only a single channel operation in the select statement +// can proceed. // - It is possible for the channel queues to contain already-processed senders // or receivers. This can happen when the select statement managed to proceed // but the goroutine doing the select has not yet cleaned up the stale queue @@ -49,15 +50,17 @@ import ( // The runtime implementation of the Go 'chan' type. type channel struct { - closed bool - elementSize uintptr - bufCap uintptr // 'cap' - bufLen uintptr // 'len' - bufHead uintptr - bufTail uintptr - senders chanQueue - receivers chanQueue - buf unsafe.Pointer + closed bool + selectLocked bool + elementSize uintptr + bufCap uintptr // 'cap' + bufLen uintptr // 'len' + bufHead uintptr + bufTail uintptr + senders chanQueue + receivers chanQueue + lock task.PMutex + buf unsafe.Pointer } const ( @@ -73,7 +76,8 @@ type chanQueue struct { // Pus the next channel operation to the queue. All appropriate fields must have // been initialized already. -// This function must be called with interrupts disabled. +// This function must be called with interrupts disabled and the channel lock +// held. func (q *chanQueue) push(node *channelOp) { node.next = q.first q.first = node @@ -99,8 +103,8 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp { newDataValue := chanOp | popped.index<<2 // Try to be the first to proceed with this goroutine. - if popped.task.DataUint32() == chanOperationWaiting { - popped.task.SetDataUint32(newDataValue) + swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue) + if swapped { return popped } } @@ -108,7 +112,8 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp { // Remove the given to-be-removed node from the queue if it is part of the // queue. If there are multiple, only one will be removed. -// This function must be called with interrupts disabled. +// This function must be called with interrupts disabled and the channel lock +// held. func (q *chanQueue) remove(remove *channelOp) { n := &q.first for *n != nil { @@ -159,8 +164,8 @@ func chanCap(c *channel) int { } // Push the value to the channel buffer array, for a send operation. -// This function may only be called when interrupts are disabled and it is known -// there is space available in the buffer. +// This function may only be called when interrupts are disabled, the channel is +// locked and it is known there is space available in the buffer. func (ch *channel) bufferPush(value unsafe.Pointer) { elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize) ch.bufLen++ @@ -174,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) { // Pop a value from the channel buffer and store it in the 'value' pointer, for // a receive operation. -// This function may only be called when interrupts are disabled and it is known -// there is at least one value available in the buffer. +// This function may only be called when interrupts are disabled, the channel is +// locked and it is known there is at least one value available in the buffer. func (ch *channel) bufferPop(value unsafe.Pointer) { elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize) ch.bufLen-- @@ -191,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) { } // Try to proceed with this send operation without blocking, and return whether -// the send succeeded. Interrupts must be disabled when calling this function. +// the send succeeded. Interrupts must be disabled and the lock must be held +// when calling this function. func (ch *channel) trySend(value unsafe.Pointer) bool { // To make sure we send values in the correct order, we can only send // directly to a receiver when there are no values in the buffer. @@ -230,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) { } mask := interrupt.Disable() + ch.lock.Lock() // See whether we can proceed immediately, and if so, return early. if ch.trySend(value) { + ch.lock.Unlock() interrupt.Restore(mask) return } @@ -244,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) { op.index = 0 op.value = value ch.senders.push(op) + ch.lock.Unlock() interrupt.Restore(mask) // Wait until this goroutine is resumed. + // It might be resumed after Unlock() and before Pause(). In that case, + // because we use semaphores, the Pause() will continue immediately. task.Pause() // Check whether the sent happened normally (not because the channel was @@ -258,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) { } // Try to proceed with this receive operation without blocking, and return -// whether the receive operation succeeded. Interrupts must be disabled when -// calling this function. +// whether the receive operation succeeded. Interrupts must be disabled and the +// lock must be held when calling this function. func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) { // To make sure we keep the values in the channel in the correct order, we // first have to read values from the buffer before we can look at the @@ -303,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool { } mask := interrupt.Disable() + ch.lock.Lock() if received, ok := ch.tryRecv(value); received { + ch.lock.Unlock() interrupt.Restore(mask) return ok } @@ -317,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool { op.task = t op.index = 0 ch.receivers.push(op) + ch.lock.Unlock() interrupt.Restore(mask) // Wait until the goroutine is resumed. @@ -335,9 +349,11 @@ func chanClose(ch *channel) { } mask := interrupt.Disable() + ch.lock.Lock() if ch.closed { // Not allowed by the language spec. + ch.lock.Unlock() interrupt.Restore(mask) runtimePanic("close of closed channel") } @@ -370,14 +386,56 @@ func chanClose(ch *channel) { ch.closed = true + ch.lock.Unlock() interrupt.Restore(mask) } +// We currently use a global select lock to avoid deadlocks while locking each +// individual channel in the select. Without this global lock, two select +// operations that have a different order of the same channels could end up in a +// deadlock. This global lock is inefficient if there are many select operations +// happening in parallel, but gets the job done. +// +// If this becomes a performance issue, we can see how the Go runtime does this. +// I think it does this by sorting all states by channel address and then +// locking them in that order to avoid this deadlock. +var chanSelectLock task.PMutex + +// Lock all channels (taking care to skip duplicate channels). +func lockAllStates(states []chanSelectState) { + if !hasParallelism { + return + } + for _, state := range states { + if state.ch != nil && !state.ch.selectLocked { + state.ch.lock.Lock() + state.ch.selectLocked = true + } + } +} + +// Unlock all channels (taking care to skip duplicate channels). +func unlockAllStates(states []chanSelectState) { + if !hasParallelism { + return + } + for _, state := range states { + if state.ch != nil && state.ch.selectLocked { + state.ch.lock.Unlock() + state.ch.selectLocked = false + } + } +} + // chanSelect implements blocking or non-blocking select operations. // The 'ops' slice must be set if (and only if) this is a blocking select. func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) { mask := interrupt.Disable() + // Lock everything. + chanSelectLock.Lock() + lockAllStates(states) + const selectNoIndex = ^uint32(0) selectIndex := selectNoIndex selectOk := true @@ -409,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO // return early. blocking := len(ops) != 0 if selectIndex != selectNoIndex || !blocking { + unlockAllStates(states) + chanSelectLock.Unlock() interrupt.Restore(mask) return selectIndex, selectOk } @@ -417,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO // become more complicated. // We add ourselves as a sender/receiver to every channel, and wait for the // first one to complete. Only one will successfully complete, because - // senders and receivers will check t.Data for the state so that only one - // will be able to "take" this select operation. + // senders and receivers use a compare-and-exchange atomic operation on + // t.Data so that only one will be able to "take" this select operation. t := task.Current() t.Ptr = recvbuf t.SetDataUint32(chanOperationWaiting) @@ -438,6 +498,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO } // Now we wait until one of the send/receive operations can proceed. + unlockAllStates(states) + chanSelectLock.Unlock() interrupt.Restore(mask) task.Pause() @@ -445,6 +507,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO // Make sure all channel ops are removed from the senders/receivers // queue before we return and the memory of them becomes invalid. + chanSelectLock.Lock() + lockAllStates(states) for i, state := range states { if state.ch == nil { continue @@ -458,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO } interrupt.Restore(mask) } + unlockAllStates(states) + chanSelectLock.Unlock() // Pull the return values out of t.Data (which contains two bitfields). selectIndex = t.DataUint32() >> 2 diff --git a/src/runtime/scheduler_cooperative.go b/src/runtime/scheduler_cooperative.go index 5c4dfd5bf..91ba86409 100644 --- a/src/runtime/scheduler_cooperative.go +++ b/src/runtime/scheduler_cooperative.go @@ -21,6 +21,12 @@ import ( // queue a new scheduler invocation using setTimeout. const asyncScheduler = GOOS == "js" +const hasScheduler = true + +// Concurrency is not parallelism. While the cooperative scheduler has +// concurrency, it does not have parallelism. +const hasParallelism = false + // Queues used by the scheduler. var ( runqueue task.Queue @@ -248,5 +254,3 @@ func run() { }() scheduler(false) } - -const hasScheduler = true diff --git a/src/runtime/scheduler_none.go b/src/runtime/scheduler_none.go index 7775b360e..a5acfd430 100644 --- a/src/runtime/scheduler_none.go +++ b/src/runtime/scheduler_none.go @@ -6,6 +6,9 @@ import "internal/task" const hasScheduler = false +// No goroutines are allowed, so there's no parallelism anywhere. +const hasParallelism = false + // run is called by the program entry point to execute the go program. // With the "none" scheduler, init and the main function are invoked directly. func run() { |