aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/runtime/chan.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/runtime/chan.go')
-rw-r--r--src/runtime/chan.go120
1 files changed, 93 insertions, 27 deletions
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index 0d0cbf06a..e437798b0 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -30,11 +30,12 @@ package runtime
// non-select operations) so that the select operation knows which case did
// proceed.
// The value is at the same time also a way that goroutines can be the first
-// (and only) goroutine to 'take' a channel operation to change it from
-// 'waiting' to any other value. This is important for the select statement
-// because multiple goroutines could try to let different channels in the
-// select statement proceed at the same time. By using Task.Data, only a
-// single channel operation in the select statement can proceed.
+// (and only) goroutine to 'take' a channel operation using an atomic CAS
+// operation to change it from 'waiting' to any other value. This is important
+// for the select statement because multiple goroutines could try to let
+// different channels in the select statement proceed at the same time. By
+// using Task.Data, only a single channel operation in the select statement
+// can proceed.
// - It is possible for the channel queues to contain already-processed senders
// or receivers. This can happen when the select statement managed to proceed
// but the goroutine doing the select has not yet cleaned up the stale queue
@@ -49,15 +50,17 @@ import (
// The runtime implementation of the Go 'chan' type.
type channel struct {
- closed bool
- elementSize uintptr
- bufCap uintptr // 'cap'
- bufLen uintptr // 'len'
- bufHead uintptr
- bufTail uintptr
- senders chanQueue
- receivers chanQueue
- buf unsafe.Pointer
+ closed bool
+ selectLocked bool
+ elementSize uintptr
+ bufCap uintptr // 'cap'
+ bufLen uintptr // 'len'
+ bufHead uintptr
+ bufTail uintptr
+ senders chanQueue
+ receivers chanQueue
+ lock task.PMutex
+ buf unsafe.Pointer
}
const (
@@ -73,7 +76,8 @@ type chanQueue struct {
// Pus the next channel operation to the queue. All appropriate fields must have
// been initialized already.
-// This function must be called with interrupts disabled.
+// This function must be called with interrupts disabled and the channel lock
+// held.
func (q *chanQueue) push(node *channelOp) {
node.next = q.first
q.first = node
@@ -99,8 +103,8 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
newDataValue := chanOp | popped.index<<2
// Try to be the first to proceed with this goroutine.
- if popped.task.DataUint32() == chanOperationWaiting {
- popped.task.SetDataUint32(newDataValue)
+ swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue)
+ if swapped {
return popped
}
}
@@ -108,7 +112,8 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
// Remove the given to-be-removed node from the queue if it is part of the
// queue. If there are multiple, only one will be removed.
-// This function must be called with interrupts disabled.
+// This function must be called with interrupts disabled and the channel lock
+// held.
func (q *chanQueue) remove(remove *channelOp) {
n := &q.first
for *n != nil {
@@ -159,8 +164,8 @@ func chanCap(c *channel) int {
}
// Push the value to the channel buffer array, for a send operation.
-// This function may only be called when interrupts are disabled and it is known
-// there is space available in the buffer.
+// This function may only be called when interrupts are disabled, the channel is
+// locked and it is known there is space available in the buffer.
func (ch *channel) bufferPush(value unsafe.Pointer) {
elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
ch.bufLen++
@@ -174,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) {
// Pop a value from the channel buffer and store it in the 'value' pointer, for
// a receive operation.
-// This function may only be called when interrupts are disabled and it is known
-// there is at least one value available in the buffer.
+// This function may only be called when interrupts are disabled, the channel is
+// locked and it is known there is at least one value available in the buffer.
func (ch *channel) bufferPop(value unsafe.Pointer) {
elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
ch.bufLen--
@@ -191,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) {
}
// Try to proceed with this send operation without blocking, and return whether
-// the send succeeded. Interrupts must be disabled when calling this function.
+// the send succeeded. Interrupts must be disabled and the lock must be held
+// when calling this function.
func (ch *channel) trySend(value unsafe.Pointer) bool {
// To make sure we send values in the correct order, we can only send
// directly to a receiver when there are no values in the buffer.
@@ -230,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
}
mask := interrupt.Disable()
+ ch.lock.Lock()
// See whether we can proceed immediately, and if so, return early.
if ch.trySend(value) {
+ ch.lock.Unlock()
interrupt.Restore(mask)
return
}
@@ -244,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
op.index = 0
op.value = value
ch.senders.push(op)
+ ch.lock.Unlock()
interrupt.Restore(mask)
// Wait until this goroutine is resumed.
+ // It might be resumed after Unlock() and before Pause(). In that case,
+ // because we use semaphores, the Pause() will continue immediately.
task.Pause()
// Check whether the sent happened normally (not because the channel was
@@ -258,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
}
// Try to proceed with this receive operation without blocking, and return
-// whether the receive operation succeeded. Interrupts must be disabled when
-// calling this function.
+// whether the receive operation succeeded. Interrupts must be disabled and the
+// lock must be held when calling this function.
func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
// To make sure we keep the values in the channel in the correct order, we
// first have to read values from the buffer before we can look at the
@@ -303,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
}
mask := interrupt.Disable()
+ ch.lock.Lock()
if received, ok := ch.tryRecv(value); received {
+ ch.lock.Unlock()
interrupt.Restore(mask)
return ok
}
@@ -317,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
op.task = t
op.index = 0
ch.receivers.push(op)
+ ch.lock.Unlock()
interrupt.Restore(mask)
// Wait until the goroutine is resumed.
@@ -335,9 +349,11 @@ func chanClose(ch *channel) {
}
mask := interrupt.Disable()
+ ch.lock.Lock()
if ch.closed {
// Not allowed by the language spec.
+ ch.lock.Unlock()
interrupt.Restore(mask)
runtimePanic("close of closed channel")
}
@@ -370,14 +386,56 @@ func chanClose(ch *channel) {
ch.closed = true
+ ch.lock.Unlock()
interrupt.Restore(mask)
}
+// We currently use a global select lock to avoid deadlocks while locking each
+// individual channel in the select. Without this global lock, two select
+// operations that have a different order of the same channels could end up in a
+// deadlock. This global lock is inefficient if there are many select operations
+// happening in parallel, but gets the job done.
+//
+// If this becomes a performance issue, we can see how the Go runtime does this.
+// I think it does this by sorting all states by channel address and then
+// locking them in that order to avoid this deadlock.
+var chanSelectLock task.PMutex
+
+// Lock all channels (taking care to skip duplicate channels).
+func lockAllStates(states []chanSelectState) {
+ if !hasParallelism {
+ return
+ }
+ for _, state := range states {
+ if state.ch != nil && !state.ch.selectLocked {
+ state.ch.lock.Lock()
+ state.ch.selectLocked = true
+ }
+ }
+}
+
+// Unlock all channels (taking care to skip duplicate channels).
+func unlockAllStates(states []chanSelectState) {
+ if !hasParallelism {
+ return
+ }
+ for _, state := range states {
+ if state.ch != nil && state.ch.selectLocked {
+ state.ch.lock.Unlock()
+ state.ch.selectLocked = false
+ }
+ }
+}
+
// chanSelect implements blocking or non-blocking select operations.
// The 'ops' slice must be set if (and only if) this is a blocking select.
func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) {
mask := interrupt.Disable()
+ // Lock everything.
+ chanSelectLock.Lock()
+ lockAllStates(states)
+
const selectNoIndex = ^uint32(0)
selectIndex := selectNoIndex
selectOk := true
@@ -409,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
// return early.
blocking := len(ops) != 0
if selectIndex != selectNoIndex || !blocking {
+ unlockAllStates(states)
+ chanSelectLock.Unlock()
interrupt.Restore(mask)
return selectIndex, selectOk
}
@@ -417,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
// become more complicated.
// We add ourselves as a sender/receiver to every channel, and wait for the
// first one to complete. Only one will successfully complete, because
- // senders and receivers will check t.Data for the state so that only one
- // will be able to "take" this select operation.
+ // senders and receivers use a compare-and-exchange atomic operation on
+ // t.Data so that only one will be able to "take" this select operation.
t := task.Current()
t.Ptr = recvbuf
t.SetDataUint32(chanOperationWaiting)
@@ -438,6 +498,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
}
// Now we wait until one of the send/receive operations can proceed.
+ unlockAllStates(states)
+ chanSelectLock.Unlock()
interrupt.Restore(mask)
task.Pause()
@@ -445,6 +507,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
// Make sure all channel ops are removed from the senders/receivers
// queue before we return and the memory of them becomes invalid.
+ chanSelectLock.Lock()
+ lockAllStates(states)
for i, state := range states {
if state.ch == nil {
continue
@@ -458,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
}
interrupt.Restore(mask)
}
+ unlockAllStates(states)
+ chanSelectLock.Unlock()
// Pull the return values out of t.Data (which contains two bitfields).
selectIndex = t.DataUint32() >> 2