aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--compiler/channel.go37
-rw-r--r--compiler/testdata/channel.ll52
-rw-r--r--compiler/testdata/goroutine-cortex-m-qemu-tasks.ll4
-rw-r--r--compiler/testdata/goroutine-wasm-asyncify.ll4
-rw-r--r--src/runtime/chan.go848
5 files changed, 383 insertions, 562 deletions
diff --git a/compiler/channel.go b/compiler/channel.go
index 9969835e8..7e867c278 100644
--- a/compiler/channel.go
+++ b/compiler/channel.go
@@ -41,17 +41,17 @@ func (b *builder) createChanSend(instr *ssa.Send) {
b.CreateStore(chanValue, valueAlloca)
}
- // Allocate blockedlist buffer.
- channelBlockedList := b.getLLVMRuntimeType("channelBlockedList")
- channelBlockedListAlloca, channelBlockedListAllocaSize := b.createTemporaryAlloca(channelBlockedList, "chan.blockedList")
+ // Allocate buffer for the channel operation.
+ channelOp := b.getLLVMRuntimeType("channelOp")
+ channelOpAlloca, channelOpAllocaSize := b.createTemporaryAlloca(channelOp, "chan.op")
// Do the send.
- b.createRuntimeCall("chanSend", []llvm.Value{ch, valueAlloca, channelBlockedListAlloca}, "")
+ b.createRuntimeCall("chanSend", []llvm.Value{ch, valueAlloca, channelOpAlloca}, "")
// End the lifetime of the allocas.
// This also works around a bug in CoroSplit, at least in LLVM 8:
// https://bugs.llvm.org/show_bug.cgi?id=41742
- b.emitLifetimeEnd(channelBlockedListAlloca, channelBlockedListAllocaSize)
+ b.emitLifetimeEnd(channelOpAlloca, channelOpAllocaSize)
if !isZeroSize {
b.emitLifetimeEnd(valueAlloca, valueAllocaSize)
}
@@ -72,12 +72,12 @@ func (b *builder) createChanRecv(unop *ssa.UnOp) llvm.Value {
valueAlloca, valueAllocaSize = b.createTemporaryAlloca(valueType, "chan.value")
}
- // Allocate blockedlist buffer.
- channelBlockedList := b.getLLVMRuntimeType("channelBlockedList")
- channelBlockedListAlloca, channelBlockedListAllocaSize := b.createTemporaryAlloca(channelBlockedList, "chan.blockedList")
+ // Allocate buffer for the channel operation.
+ channelOp := b.getLLVMRuntimeType("channelOp")
+ channelOpAlloca, channelOpAllocaSize := b.createTemporaryAlloca(channelOp, "chan.op")
// Do the receive.
- commaOk := b.createRuntimeCall("chanRecv", []llvm.Value{ch, valueAlloca, channelBlockedListAlloca}, "")
+ commaOk := b.createRuntimeCall("chanRecv", []llvm.Value{ch, valueAlloca, channelOpAlloca}, "")
var received llvm.Value
if isZeroSize {
received = llvm.ConstNull(valueType)
@@ -85,7 +85,7 @@ func (b *builder) createChanRecv(unop *ssa.UnOp) llvm.Value {
received = b.CreateLoad(valueType, valueAlloca, "chan.received")
b.emitLifetimeEnd(valueAlloca, valueAllocaSize)
}
- b.emitLifetimeEnd(channelBlockedListAlloca, channelBlockedListAllocaSize)
+ b.emitLifetimeEnd(channelOpAlloca, channelOpAllocaSize)
if unop.CommaOk {
tuple := llvm.Undef(b.ctx.StructType([]llvm.Type{valueType, b.ctx.Int1Type()}, false))
@@ -198,10 +198,10 @@ func (b *builder) createSelect(expr *ssa.Select) llvm.Value {
if expr.Blocking {
// Stack-allocate operation structures.
// If these were simply created as a slice, they would heap-allocate.
- chBlockAllocaType := llvm.ArrayType(b.getLLVMRuntimeType("channelBlockedList"), len(selectStates))
- chBlockAlloca, chBlockSize := b.createTemporaryAlloca(chBlockAllocaType, "select.block.alloca")
- chBlockLen := llvm.ConstInt(b.uintptrType, uint64(len(selectStates)), false)
- chBlockPtr := b.CreateGEP(chBlockAllocaType, chBlockAlloca, []llvm.Value{
+ opsAllocaType := llvm.ArrayType(b.getLLVMRuntimeType("channelOp"), len(selectStates))
+ opsAlloca, opsSize := b.createTemporaryAlloca(opsAllocaType, "select.block.alloca")
+ opsLen := llvm.ConstInt(b.uintptrType, uint64(len(selectStates)), false)
+ opsPtr := b.CreateGEP(opsAllocaType, opsAlloca, []llvm.Value{
llvm.ConstInt(b.ctx.Int32Type(), 0, false),
llvm.ConstInt(b.ctx.Int32Type(), 0, false),
}, "select.block")
@@ -209,15 +209,18 @@ func (b *builder) createSelect(expr *ssa.Select) llvm.Value {
results = b.createRuntimeCall("chanSelect", []llvm.Value{
recvbuf,
statesPtr, statesLen, statesLen, // []chanSelectState
- chBlockPtr, chBlockLen, chBlockLen, // []channelBlockList
+ opsPtr, opsLen, opsLen, // []channelOp
}, "select.result")
// Terminate the lifetime of the operation structures.
- b.emitLifetimeEnd(chBlockAlloca, chBlockSize)
+ b.emitLifetimeEnd(opsAlloca, opsSize)
} else {
- results = b.createRuntimeCall("tryChanSelect", []llvm.Value{
+ opsPtr := llvm.ConstNull(b.dataPtrType)
+ opsLen := llvm.ConstInt(b.uintptrType, 0, false)
+ results = b.createRuntimeCall("chanSelect", []llvm.Value{
recvbuf,
statesPtr, statesLen, statesLen, // []chanSelectState
+ opsPtr, opsLen, opsLen, // []channelOp (nil slice)
}, "select.result")
}
diff --git a/compiler/testdata/channel.ll b/compiler/testdata/channel.ll
index 65e18dea8..68982d051 100644
--- a/compiler/testdata/channel.ll
+++ b/compiler/testdata/channel.ll
@@ -3,7 +3,7 @@ source_filename = "channel.go"
target datalayout = "e-m:e-p:32:32-p10:8:8-p20:8:8-i64:64-n32:64-S128-ni:1:10:20"
target triple = "wasm32-unknown-wasi"
-%runtime.channelBlockedList = type { ptr, ptr, ptr, { ptr, i32, i32 } }
+%runtime.channelOp = type { ptr, ptr, i32, ptr }
%runtime.chanSelectState = type { ptr, ptr }
; Function Attrs: allockind("alloc,zeroed") allocsize(0)
@@ -18,15 +18,15 @@ entry:
}
; Function Attrs: nounwind
-define hidden void @main.chanIntSend(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 {
+define hidden void @main.chanIntSend(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 {
entry:
- %chan.blockedList = alloca %runtime.channelBlockedList, align 8
+ %chan.op = alloca %runtime.channelOp, align 8
%chan.value = alloca i32, align 4
call void @llvm.lifetime.start.p0(i64 4, ptr nonnull %chan.value)
store i32 3, ptr %chan.value, align 4
- call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList)
- call void @runtime.chanSend(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.blockedList, ptr undef) #4
- call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList)
+ call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op)
+ call void @runtime.chanSend(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.op, ptr undef) #4
+ call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op)
call void @llvm.lifetime.end.p0(i64 4, ptr nonnull %chan.value)
ret void
}
@@ -34,48 +34,48 @@ entry:
; Function Attrs: nocallback nofree nosync nounwind willreturn memory(argmem: readwrite)
declare void @llvm.lifetime.start.p0(i64 immarg, ptr nocapture) #3
-declare void @runtime.chanSend(ptr dereferenceable_or_null(32), ptr, ptr dereferenceable_or_null(24), ptr) #1
+declare void @runtime.chanSend(ptr dereferenceable_or_null(36), ptr, ptr dereferenceable_or_null(16), ptr) #1
; Function Attrs: nocallback nofree nosync nounwind willreturn memory(argmem: readwrite)
declare void @llvm.lifetime.end.p0(i64 immarg, ptr nocapture) #3
; Function Attrs: nounwind
-define hidden void @main.chanIntRecv(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 {
+define hidden void @main.chanIntRecv(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 {
entry:
- %chan.blockedList = alloca %runtime.channelBlockedList, align 8
+ %chan.op = alloca %runtime.channelOp, align 8
%chan.value = alloca i32, align 4
call void @llvm.lifetime.start.p0(i64 4, ptr nonnull %chan.value)
- call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList)
- %0 = call i1 @runtime.chanRecv(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.blockedList, ptr undef) #4
+ call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op)
+ %0 = call i1 @runtime.chanRecv(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.op, ptr undef) #4
call void @llvm.lifetime.end.p0(i64 4, ptr nonnull %chan.value)
- call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList)
+ call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op)
ret void
}
-declare i1 @runtime.chanRecv(ptr dereferenceable_or_null(32), ptr, ptr dereferenceable_or_null(24), ptr) #1
+declare i1 @runtime.chanRecv(ptr dereferenceable_or_null(36), ptr, ptr dereferenceable_or_null(16), ptr) #1
; Function Attrs: nounwind
-define hidden void @main.chanZeroSend(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 {
+define hidden void @main.chanZeroSend(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 {
entry:
- %chan.blockedList = alloca %runtime.channelBlockedList, align 8
- call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList)
- call void @runtime.chanSend(ptr %ch, ptr null, ptr nonnull %chan.blockedList, ptr undef) #4
- call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList)
+ %chan.op = alloca %runtime.channelOp, align 8
+ call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op)
+ call void @runtime.chanSend(ptr %ch, ptr null, ptr nonnull %chan.op, ptr undef) #4
+ call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op)
ret void
}
; Function Attrs: nounwind
-define hidden void @main.chanZeroRecv(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 {
+define hidden void @main.chanZeroRecv(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 {
entry:
- %chan.blockedList = alloca %runtime.channelBlockedList, align 8
- call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList)
- %0 = call i1 @runtime.chanRecv(ptr %ch, ptr null, ptr nonnull %chan.blockedList, ptr undef) #4
- call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList)
+ %chan.op = alloca %runtime.channelOp, align 8
+ call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op)
+ %0 = call i1 @runtime.chanRecv(ptr %ch, ptr null, ptr nonnull %chan.op, ptr undef) #4
+ call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op)
ret void
}
; Function Attrs: nounwind
-define hidden void @main.selectZeroRecv(ptr dereferenceable_or_null(32) %ch1, ptr dereferenceable_or_null(32) %ch2, ptr %context) unnamed_addr #2 {
+define hidden void @main.selectZeroRecv(ptr dereferenceable_or_null(36) %ch1, ptr dereferenceable_or_null(36) %ch2, ptr %context) unnamed_addr #2 {
entry:
%select.states.alloca = alloca [2 x %runtime.chanSelectState], align 8
%select.send.value = alloca i32, align 4
@@ -88,7 +88,7 @@ entry:
store ptr %ch2, ptr %0, align 4
%.repack3 = getelementptr inbounds [2 x %runtime.chanSelectState], ptr %select.states.alloca, i32 0, i32 1, i32 1
store ptr null, ptr %.repack3, align 4
- %select.result = call { i32, i1 } @runtime.tryChanSelect(ptr undef, ptr nonnull %select.states.alloca, i32 2, i32 2, ptr undef) #4
+ %select.result = call { i32, i1 } @runtime.chanSelect(ptr undef, ptr nonnull %select.states.alloca, i32 2, i32 2, ptr null, i32 0, i32 0, ptr undef) #4
call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %select.states.alloca)
%1 = extractvalue { i32, i1 } %select.result, 0
%2 = icmp eq i32 %1, 0
@@ -105,7 +105,7 @@ select.body: ; preds = %select.next
br label %select.done
}
-declare { i32, i1 } @runtime.tryChanSelect(ptr, ptr, i32, i32, ptr) #1
+declare { i32, i1 } @runtime.chanSelect(ptr, ptr, i32, i32, ptr, i32, i32, ptr) #1
attributes #0 = { allockind("alloc,zeroed") allocsize(0) "alloc-family"="runtime.alloc" "target-features"="+bulk-memory,+mutable-globals,+nontrapping-fptoint,+sign-ext" }
attributes #1 = { "target-features"="+bulk-memory,+mutable-globals,+nontrapping-fptoint,+sign-ext" }
diff --git a/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll b/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll
index f149f3a0c..a57bb20f3 100644
--- a/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll
+++ b/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll
@@ -135,13 +135,13 @@ entry:
declare i32 @runtime.sliceCopy(ptr nocapture writeonly, ptr nocapture readonly, i32, i32, i32, ptr) #2
; Function Attrs: nounwind
-define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #1 {
+define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #1 {
entry:
call void @runtime.chanClose(ptr %ch, ptr undef) #9
ret void
}
-declare void @runtime.chanClose(ptr dereferenceable_or_null(32), ptr) #2
+declare void @runtime.chanClose(ptr dereferenceable_or_null(36), ptr) #2
; Function Attrs: nounwind
define hidden void @main.startInterfaceMethod(ptr %itf.typecode, ptr %itf.value, ptr %context) unnamed_addr #1 {
diff --git a/compiler/testdata/goroutine-wasm-asyncify.ll b/compiler/testdata/goroutine-wasm-asyncify.ll
index 699b9f205..c4af76037 100644
--- a/compiler/testdata/goroutine-wasm-asyncify.ll
+++ b/compiler/testdata/goroutine-wasm-asyncify.ll
@@ -144,13 +144,13 @@ entry:
declare i32 @runtime.sliceCopy(ptr nocapture writeonly, ptr nocapture readonly, i32, i32, i32, ptr) #1
; Function Attrs: nounwind
-define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 {
+define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 {
entry:
call void @runtime.chanClose(ptr %ch, ptr undef) #9
ret void
}
-declare void @runtime.chanClose(ptr dereferenceable_or_null(32), ptr) #1
+declare void @runtime.chanClose(ptr dereferenceable_or_null(36), ptr) #1
; Function Attrs: nounwind
define hidden void @main.startInterfaceMethod(ptr %itf.typecode, ptr %itf.value, ptr %context) unnamed_addr #2 {
diff --git a/src/runtime/chan.go b/src/runtime/chan.go
index 1f0d7ced8..c62685700 100644
--- a/src/runtime/chan.go
+++ b/src/runtime/chan.go
@@ -1,27 +1,45 @@
package runtime
// This file implements the 'chan' type and send/receive/select operations.
-
-// A channel can be in one of the following states:
-// empty:
-// No goroutine is waiting on a send or receive operation. The 'blocked'
-// member is nil.
-// recv:
-// A goroutine tries to receive from the channel. This goroutine is stored
-// in the 'blocked' member.
-// send:
-// The reverse of send. A goroutine tries to send to the channel. This
-// goroutine is stored in the 'blocked' member.
-// closed:
-// The channel is closed. Sends will panic, receives will get a zero value
-// plus optionally the indication that the channel is zero (with the
-// comma-ok value in the task).
//
-// A send/recv transmission is completed by copying from the data element of the
-// sending task to the data element of the receiving task, and setting
-// the 'comma-ok' value to true.
-// A receive operation on a closed channel is completed by zeroing the data
-// element of the receiving task and setting the 'comma-ok' value to false.
+// Every channel has a list of senders and a list of receivers, and possibly a
+// queue. There is no 'channel state', the state is inferred from the available
+// senders/receivers and values in the buffer.
+//
+// - A sender will first try to send the value to a waiting receiver if there is
+// one, but only if there is nothing in the queue (to keep the values flowing
+// in the correct order). If it can't, it will add the value in the queue and
+// possibly wait as a sender if there's no space available.
+// - A receiver will first try to read a value from the queue, but if there is
+// none it will try to read from a sender in the list. It will block if it
+// can't proceed.
+//
+// State is kept in various ways:
+//
+// - The sender value is stored in the sender 'channelOp', which is really a
+// queue entry. This works for both senders and select operations: a select
+// operation has a separate value to send for each case.
+// - The receiver value is stored inside Task.Ptr. This works for receivers, and
+// importantly also works for select which has a single buffer for every
+// receive operation.
+// - The `Task.Data` value stores how the channel operation proceeded. For
+// normal send/receive operations, it starts at chanOperationWaiting and then
+// is changed to chanOperationOk or chanOperationClosed depending on whether
+// the send/receive proceeded normally or because it was closed. For a select
+// operation, it also stores the 'case' index in the upper bits (zero for
+// non-select operations) so that the select operation knows which case did
+// proceed.
+// The value is at the same time also a way that goroutines can be the first
+// (and only) goroutine to 'take' a channel operation to change it from
+// 'waiting' to any other value. This is important for the select statement
+// because multiple goroutines could try to let different channels in the
+// select statement proceed at the same time. By using Task.Data, only a
+// single channel operation in the select statement can proceed.
+// - It is possible for the channel queues to contain already-processed senders
+// or receivers. This can happen when the select statement managed to proceed
+// but the goroutine doing the select has not yet cleaned up the stale queue
+// entries before returning. This should therefore only happen for a short
+// period.
import (
"internal/task"
@@ -29,490 +47,283 @@ import (
"unsafe"
)
-func chanDebug(ch *channel) {
- if schedulerDebug {
- if ch.bufSize > 0 {
- println("--- channel update:", ch, ch.state.String(), ch.bufSize, ch.bufUsed)
- } else {
- println("--- channel update:", ch, ch.state.String())
- }
- }
+// The runtime implementation of the Go 'chan' type.
+type channel struct {
+ closed bool
+ elementSize uintptr
+ bufCap uintptr // 'cap'
+ bufLen uintptr // 'len'
+ bufHead uintptr
+ bufTail uintptr
+ senders chanQueue
+ receivers chanQueue
+ buf unsafe.Pointer
}
-// channelBlockedList is a list of channel operations on a specific channel which are currently blocked.
-type channelBlockedList struct {
- // next is a pointer to the next blocked channel operation on the same channel.
- next *channelBlockedList
-
- // t is the task associated with this channel operation.
- // If this channel operation is not part of a select, then the pointer field of the state holds the data buffer.
- // If this channel operation is part of a select, then the pointer field of the state holds the receive buffer.
- // If this channel operation is a receive, then the data field should be set to zero when resuming due to channel closure.
- t *task.Task
-
- // s is a pointer to the channel select state corresponding to this operation.
- // This will be nil if and only if this channel operation is not part of a select statement.
- // If this is a send operation, then the send buffer can be found in this select state.
- s *chanSelectState
-
- // allSelectOps is a slice containing all of the channel operations involved with this select statement.
- // Before resuming the task, all other channel operations on this select statement should be canceled by removing them from their corresponding lists.
- allSelectOps []channelBlockedList
+const (
+ chanOperationWaiting = 0b00 // waiting for a send/receive operation to continue
+ chanOperationOk = 0b01 // successfully sent or received (not closed)
+ chanOperationClosed = 0b10 // channel was closed, the value has been zeroed
+ chanOperationMask = 0b11
+)
+
+type chanQueue struct {
+ first *channelOp
}
-// remove takes the current list of blocked channel operations and removes the specified operation.
-// This returns the resulting list, or nil if the resulting list is empty.
-// A nil receiver is treated as an empty list.
-func (b *channelBlockedList) remove(old *channelBlockedList) *channelBlockedList {
- if b == old {
- return b.next
- }
- c := b
- for ; c != nil && c.next != old; c = c.next {
- }
- if c != nil {
- c.next = old.next
- }
- return b
+// Pus the next channel operation to the queue. All appropriate fields must have
+// been initialized already.
+// This function must be called with interrupts disabled.
+func (q *chanQueue) push(node *channelOp) {
+ node.next = q.first
+ q.first = node
}
-// detach removes all other channel operations that are part of the same select statement.
-// If the input is not part of a select statement, this is a no-op.
-// This must be called before resuming any task blocked on a channel operation in order to ensure that it is not placed on the runqueue twice.
-func (b *channelBlockedList) detach() {
- if b.allSelectOps == nil {
- // nothing to do
- return
- }
- for i, v := range b.allSelectOps {
- // cancel all other channel operations that are part of this select statement
- switch {
- case &b.allSelectOps[i] == b:
- // This entry is the one that was already detached.
- continue
- case v.t == nil:
- // This entry is not used (nil channel).
- continue
+// Pop the next waiting channel from the queue. Channels that are no longer
+// waiting (for example, when they're part of a select operation) will be
+// skipped.
+// This function must be called with interrupts disabled.
+func (q *chanQueue) pop(chanOp uint64) *channelOp {
+ for {
+ if q.first == nil {
+ return nil
}
- v.s.ch.blocked = v.s.ch.blocked.remove(&b.allSelectOps[i])
- if v.s.ch.blocked == nil {
- if v.s.value == nil {
- // recv operation
- if v.s.ch.state != chanStateClosed {
- v.s.ch.state = chanStateEmpty
- }
- } else {
- // send operation
- if v.s.ch.bufUsed == 0 {
- // unbuffered channel
- v.s.ch.state = chanStateEmpty
- } else {
- // buffered channel
- v.s.ch.state = chanStateBuf
- }
- }
+
+ // Pop next from the queue.
+ popped := q.first
+ q.first = q.first.next
+
+ // The new value for the 'data' field will be a combination of the
+ // channel operation and the select index. (The select index is 0 for
+ // non-select channel operations).
+ newDataValue := chanOp | uint64(popped.index<<2)
+
+ // Try to be the first to proceed with this goroutine.
+ if popped.task.Data == chanOperationWaiting {
+ popped.task.Data = newDataValue
+ return popped
}
- chanDebug(v.s.ch)
}
}
-type channel struct {
- elementSize uintptr // the size of one value in this channel
- bufSize uintptr // size of buffer (in elements)
- state chanState
- blocked *channelBlockedList
- bufHead uintptr // head index of buffer (next push index)
- bufTail uintptr // tail index of buffer (next pop index)
- bufUsed uintptr // number of elements currently in buffer
- buf unsafe.Pointer // pointer to first element of buffer
+// Remove the given to-be-removed node from the queue if it is part of the
+// queue. If there are multiple, only one will be removed.
+// This function must be called with interrupts disabled.
+func (q *chanQueue) remove(remove *channelOp) {
+ n := &q.first
+ for *n != nil {
+ if *n == remove {
+ *n = (*n).next
+ return
+ }
+ n = &((*n).next)
+ }
+}
+
+type channelOp struct {
+ next *channelOp
+ task *task.Task
+ index uintptr // select index, 0 for non-select operation
+ value unsafe.Pointer // if this is a sender, this is the value to send
+}
+
+type chanSelectState struct {
+ ch *channel
+ value unsafe.Pointer
}
-// chanMake creates a new channel with the given element size and buffer length in number of elements.
-// This is a compiler intrinsic.
func chanMake(elementSize uintptr, bufSize uintptr) *channel {
return &channel{
elementSize: elementSize,
- bufSize: bufSize,
+ bufCap: bufSize,
buf: alloc(elementSize*bufSize, nil),
}
}
// Return the number of entries in this chan, called from the len builtin.
// A nil chan is defined as having length 0.
-//
-//go:inline
func chanLen(c *channel) int {
if c == nil {
return 0
}
- return int(c.bufUsed)
+ return int(c.bufLen)
}
// Return the capacity of this chan, called from the cap builtin.
// A nil chan is defined as having capacity 0.
-//
-//go:inline
func chanCap(c *channel) int {
if c == nil {
return 0
}
- return int(c.bufSize)
+ return int(c.bufCap)
}
-// resumeRX resumes the next receiver and returns the destination pointer.
-// If the ok value is true, then the caller is expected to store a value into this pointer.
-func (ch *channel) resumeRX(ok bool) unsafe.Pointer {
- // pop a blocked goroutine off the stack
- var b *channelBlockedList
- b, ch.blocked = ch.blocked, ch.blocked.next
-
- // get destination pointer
- dst := b.t.Ptr
-
- if !ok {
- // the result value is zero
- memzero(dst, ch.elementSize)
- b.t.Data = 0
- }
-
- if b.s != nil {
- // tell the select op which case resumed
- b.t.Ptr = unsafe.Pointer(b.s)
-
- // detach associated operations
- b.detach()
- }
-
- scheduleTask(b.t)
-
- return dst
-}
-
-// resumeTX resumes the next sender and returns the source pointer.
-// The caller is expected to read from the value in this pointer before yielding.
-func (ch *channel) resumeTX() unsafe.Pointer {
- // pop a blocked goroutine off the stack
- var b *channelBlockedList
- b, ch.blocked = ch.blocked, ch.blocked.next
-
- // get source pointer
- src := b.t.Ptr
-
- if b.s != nil {
- // use state's source pointer
- src = b.s.value
-
- // tell the select op which case resumed
- b.t.Ptr = unsafe.Pointer(b.s)
-
- // detach associated operations
- b.detach()
- }
-
- scheduleTask(b.t)
-
- return src
-}
-
-// push value to end of channel if space is available
-// returns whether there was space for the value in the buffer
-func (ch *channel) push(value unsafe.Pointer) bool {
- // immediately return false if the channel is not buffered
- if ch.bufSize == 0 {
- return false
- }
-
- // ensure space is available
- if ch.bufUsed == ch.bufSize {
- return false
- }
-
- // copy value to buffer
- memcpy(
- unsafe.Add(ch.buf, // pointer to the base of the buffer + offset = pointer to destination element
- ch.elementSize*ch.bufHead), // element size * equivalent slice index = offset
- value,
- ch.elementSize,
- )
-
- // update buffer state
- ch.bufUsed++
+// Push the value to the channel buffer array, for a send operation.
+// This function may only be called when interrupts are disabled and it is known
+// there is space available in the buffer.
+func (ch *channel) bufferPush(value unsafe.Pointer) {
+ elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
+ ch.bufLen++
ch.bufHead++
- if ch.bufHead == ch.bufSize {
+ if ch.bufHead == ch.bufCap {
ch.bufHead = 0
}
- return true
+ memcpy(elemAddr, value, ch.elementSize)
}
-// pop value from channel buffer if one is available
-// returns whether a value was popped or not
-// result is stored into value pointer
-func (ch *channel) pop(value unsafe.Pointer) bool {
- // channel is empty
- if ch.bufUsed == 0 {
- return false
- }
-
- // compute address of source
- addr := unsafe.Add(ch.buf, (ch.elementSize * ch.bufTail))
-
- // copy value from buffer
- memcpy(
- value,
- addr,
- ch.elementSize,
- )
-
- // zero buffer element to allow garbage collection of value
- memzero(
- addr,
- ch.elementSize,
- )
-
- // update buffer state
- ch.bufUsed--
-
- // move tail up
+// Pop a value from the channel buffer and store it in the 'value' pointer, for
+// a receive operation.
+// This function may only be called when interrupts are disabled and it is known
+// there is at least one value available in the buffer.
+func (ch *channel) bufferPop(value unsafe.Pointer) {
+ elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
+ ch.bufLen--
ch.bufTail++
- if ch.bufTail == ch.bufSize {
+ if ch.bufTail == ch.bufCap {
ch.bufTail = 0
}
- return true
+ memcpy(value, elemAddr, ch.elementSize)
+
+ // Zero the value to allow the GC to collect it.
+ memzero(elemAddr, ch.elementSize)
}
-// try to send a value to a channel, without actually blocking
-// returns whether the value was sent
-// will panic if channel is closed
+// Try to proceed with this send operation without blocking, and return whether
+// the send succeeded. Interrupts must be disabled when calling this function.
func (ch *channel) trySend(value unsafe.Pointer) bool {
- if ch == nil {
- // send to nil channel blocks forever
- // this is non-blocking, so just say no
- return false
+ // To make sure we send values in the correct order, we can only send
+ // directly to a receiver when there are no values in the buffer.
+
+ // Do not allow sending on a closed channel.
+ if ch.closed {
+ // Note: we cannot currently recover from this panic.
+ // There's some state in the select statement especially that would be
+ // corrupted if we allowed recovering from this panic.
+ runtimePanic("send on closed channel")
}
- i := interrupt.Disable()
-
- switch ch.state {
- case chanStateEmpty, chanStateBuf:
- // try to dump the value directly into the buffer
- if ch.push(value) {
- ch.state = chanStateBuf
- interrupt.Restore(i)
+ // There is no value in the buffer and we have a receiver available. Copy
+ // the value directly into the receiver.
+ if ch.bufLen == 0 {
+ if receiver := ch.receivers.pop(chanOperationOk); receiver != nil {
+ memcpy(receiver.task.Ptr, value, ch.elementSize)
+ scheduleTask(receiver.task)
return true
}
- interrupt.Restore(i)
- return false
- case chanStateRecv:
- // unblock receiver
- dst := ch.resumeRX(true)
-
- // copy value to receiver
- memcpy(dst, value, ch.elementSize)
-
- // change state to empty if there are no more receivers
- if ch.blocked == nil {
- ch.state = chanStateEmpty
- }
+ }
- interrupt.Restore(i)
+ // If there is space in the buffer (if this is a buffered channel), we can
+ // store the value in the buffer and continue.
+ if ch.bufLen < ch.bufCap {
+ ch.bufferPush(value)
return true
- case chanStateSend:
- // something else is already waiting to send
- interrupt.Restore(i)
- return false
- case chanStateClosed:
- interrupt.Restore(i)
- runtimePanic("send on closed channel")
- default:
- interrupt.Restore(i)
- runtimePanic("invalid channel state")
}
-
- interrupt.Restore(i)
return false
}
-// try to receive a value from a channel, without really blocking
-// returns whether a value was received
-// second return is the comma-ok value
-func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) {
+func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
if ch == nil {
- // receive from nil channel blocks forever
- // this is non-blocking, so just say no
- return false, false
+ // A nil channel blocks forever. Do not schedule this goroutine again.
+ deadlock()
}
- i := interrupt.Disable()
-
- switch ch.state {
- case chanStateBuf, chanStateSend:
- // try to pop the value directly from the buffer
- if ch.pop(value) {
- // unblock next sender if applicable
- if ch.blocked != nil {
- src := ch.resumeTX()
-
- // push sender's value into buffer
- ch.push(src)
-
- if ch.blocked == nil {
- // last sender unblocked - update state
- ch.state = chanStateBuf
- }
- }
-
- if ch.bufUsed == 0 {
- // channel empty - update state
- ch.state = chanStateEmpty
- }
+ mask := interrupt.Disable()
- interrupt.Restore(i)
- return true, true
- } else if ch.blocked != nil {
- // unblock next sender if applicable
- src := ch.resumeTX()
+ // See whether we can proceed immediately, and if so, return early.
+ if ch.trySend(value) {
+ interrupt.Restore(mask)
+ return
+ }
- // copy sender's value
- memcpy(value, src, ch.elementSize)
+ // Can't proceed. Add us to the list of senders and wait until we're awoken.
+ t := task.Current()
+ t.Data = chanOperationWaiting
+ op.task = t
+ op.index = 0
+ op.value = value
+ ch.senders.push(op)
+ interrupt.Restore(mask)
+
+ // Wait until this goroutine is resumed.
+ task.Pause()
- if ch.blocked == nil {
- // last sender unblocked - update state
- ch.state = chanStateEmpty
- }
+ // Check whether the sent happened normally (not because the channel was
+ // closed while sending).
+ if t.Data == chanOperationClosed {
+ // Oops, this channel was closed while sending!
+ runtimePanic("send on closed channel")
+ }
+}
- interrupt.Restore(i)
- return true, true
- }
- interrupt.Restore(i)
- return false, false
- case chanStateRecv, chanStateEmpty:
- // something else is already waiting to receive
- interrupt.Restore(i)
- return false, false
- case chanStateClosed:
- if ch.pop(value) {
- interrupt.Restore(i)
- return true, true
+// Try to proceed with this receive operation without blocking, and return
+// whether the receive operation succeeded. Interrupts must be disabled when
+// calling this function.
+func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
+ // To make sure we keep the values in the channel in the correct order, we
+ // first have to read values from the buffer before we can look at the
+ // senders.
+
+ // If there is a value available in the buffer, we can pull it out and
+ // proceed immediately.
+ if ch.bufLen > 0 {
+ ch.bufferPop(value)
+
+ // Check for the next sender available and push it to the buffer.
+ if sender := ch.senders.pop(chanOperationOk); sender != nil {
+ ch.bufferPush(sender.value)
+ scheduleTask(sender.task)
}
- // channel closed - nothing to receive
+ return true, true
+ }
+
+ if ch.closed {
+ // Channel is closed, so proceed immediately.
memzero(value, ch.elementSize)
- interrupt.Restore(i)
return true, false
- default:
- runtimePanic("invalid channel state")
}
- runtimePanic("unreachable")
- return false, false
-}
-
-type chanState uint8
-
-const (
- chanStateEmpty chanState = iota // nothing in channel, no senders/receivers
- chanStateRecv // nothing in channel, receivers waiting
- chanStateSend // senders waiting, buffer full if present
- chanStateBuf // buffer not empty, no senders waiting
- chanStateClosed // channel closed
-)
-
-func (s chanState) String() string {
- switch s {
- case chanStateEmpty:
- return "empty"
- case chanStateRecv:
- return "recv"
- case chanStateSend:
- return "send"
- case chanStateBuf:
- return "buffered"
- case chanStateClosed:
- return "closed"
- default:
- return "invalid"
+ // If there is a sender, we can proceed with the channel operation
+ // immediately.
+ if sender := ch.senders.pop(chanOperationOk); sender != nil {
+ memcpy(value, sender.value, ch.elementSize)
+ scheduleTask(sender.task)
+ return true, true
}
-}
-// chanSelectState is a single channel operation (send/recv) in a select
-// statement. The value pointer is either nil (for receives) or points to the
-// value to send (for sends).
-type chanSelectState struct {
- ch *channel
- value unsafe.Pointer
+ return false, false
}
-// chanSend sends a single value over the channel.
-// This operation will block unless a value is immediately available.
-// May panic if the channel is closed.
-func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) {
- i := interrupt.Disable()
-
- if ch.trySend(value) {
- // value immediately sent
- chanDebug(ch)
- interrupt.Restore(i)
- return
- }
-
+func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
if ch == nil {
// A nil channel blocks forever. Do not schedule this goroutine again.
- interrupt.Restore(i)
deadlock()
}
- // wait for receiver
- sender := task.Current()
- ch.state = chanStateSend
- sender.Ptr = value
- *blockedlist = channelBlockedList{
- next: ch.blocked,
- t: sender,
- }
- ch.blocked = blockedlist
- chanDebug(ch)
- interrupt.Restore(i)
- task.Pause()
- sender.Ptr = nil
-}
+ mask := interrupt.Disable()
-// chanRecv receives a single value over a channel.
-// It blocks if there is no available value to receive.
-// The received value is copied into the value pointer.
-// Returns the comma-ok value.
-func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool {
- i := interrupt.Disable()
-
- if rx, ok := ch.tryRecv(value); rx {
- // value immediately available
- chanDebug(ch)
- interrupt.Restore(i)
+ if received, ok := ch.tryRecv(value); received {
+ interrupt.Restore(mask)
return ok
}
- if ch == nil {
- // A nil channel blocks forever. Do not schedule this goroutine again.
- interrupt.Restore(i)
- deadlock()
- }
-
- // wait for a value
- receiver := task.Current()
- ch.state = chanStateRecv
- receiver.Ptr, receiver.Data = value, 1
- *blockedlist = channelBlockedList{
- next: ch.blocked,
- t: receiver,
- }
- ch.blocked = blockedlist
- chanDebug(ch)
- interrupt.Restore(i)
+ // We can't proceed, so we add ourselves to the list of receivers and wait
+ // until we're awoken.
+ t := task.Current()
+ t.Ptr = value
+ t.Data = chanOperationWaiting
+ op.task = t
+ op.index = 0
+ ch.receivers.push(op)
+ interrupt.Restore(mask)
+
+ // Wait until the goroutine is resumed.
task.Pause()
- ok := receiver.Data == 1
- receiver.Ptr, receiver.Data = nil, 0
- return ok
+
+ // Return whether the receive happened from a closed channel.
+ return t.Data != chanOperationClosed
}
// chanClose closes the given channel. If this channel has a receiver or is
@@ -522,128 +333,135 @@ func chanClose(ch *channel) {
// Not allowed by the language spec.
runtimePanic("close of nil channel")
}
- i := interrupt.Disable()
- switch ch.state {
- case chanStateClosed:
+
+ mask := interrupt.Disable()
+
+ if ch.closed {
// Not allowed by the language spec.
- interrupt.Restore(i)
+ interrupt.Restore(mask)
runtimePanic("close of closed channel")
- case chanStateSend:
- // This panic should ideally on the sending side, not in this goroutine.
- // But when a goroutine tries to send while the channel is being closed,
- // that is clearly invalid: the send should have been completed already
- // before the close.
- interrupt.Restore(i)
- runtimePanic("close channel during send")
- case chanStateRecv:
- // unblock all receivers with the zero value
- ch.state = chanStateClosed
- for ch.blocked != nil {
- ch.resumeRX(false)
+ }
+
+ // Proceed all receiving operations that are blocked.
+ for {
+ receiver := ch.receivers.pop(chanOperationClosed)
+ if receiver == nil {
+ // Processed all receivers.
+ break
}
- case chanStateEmpty, chanStateBuf:
- // Easy case. No available sender or receiver.
+
+ // Zero the value that the receiver is getting.
+ memzero(receiver.task.Ptr, ch.elementSize)
+
+ // Wake up the receiving goroutine.
+ scheduleTask(receiver.task)
}
- ch.state = chanStateClosed
- interrupt.Restore(i)
- chanDebug(ch)
-}
-// chanSelect is the runtime implementation of the select statement. This is
-// perhaps the most complicated statement in the Go spec. It returns the
-// selected index and the 'comma-ok' value.
-//
-// TODO: do this in a round-robin fashion (as specified in the Go spec) instead
-// of picking the first one that can proceed.
-func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) {
- istate := interrupt.Disable()
-
- if selected, ok := tryChanSelect(recvbuf, states); selected != ^uintptr(0) {
- // one channel was immediately ready
- interrupt.Restore(istate)
- return selected, ok
+ // Let all senders panic.
+ for {
+ sender := ch.senders.pop(chanOperationClosed)
+ if sender == nil {
+ break // processed all senders
+ }
+
+ // Wake up the sender.
+ scheduleTask(sender.task)
}
- // construct blocked operations
- for i, v := range states {
- if v.ch == nil {
- // A nil channel receive will never complete.
- // A nil channel send would have panicked during tryChanSelect.
- ops[i] = channelBlockedList{}
+ ch.closed = true
+
+ interrupt.Restore(mask)
+}
+
+// chanSelect implements blocking or non-blocking select operations.
+// The 'ops' slice must be set if (and only if) this is a blocking select.
+func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uintptr, bool) {
+ mask := interrupt.Disable()
+
+ const selectNoIndex = ^uintptr(0)
+ selectIndex := selectNoIndex
+ selectOk := true
+
+ // Iterate over each state, and see if it can proceed.
+ // TODO: start from a random index.
+ for i, state := range states {
+ if state.ch == nil {
+ // A nil channel blocks forever, so it won't take part of the select
+ // operation.
continue
}
- ops[i] = channelBlockedList{
- next: v.ch.blocked,
- t: task.Current(),
- s: &states[i],
- allSelectOps: ops,
- }
- v.ch.blocked = &ops[i]
- if v.value == nil {
- // recv
- switch v.ch.state {
- case chanStateEmpty:
- v.ch.state = chanStateRecv
- case chanStateRecv:
- // already in correct state
- default:
- interrupt.Restore(istate)
- runtimePanic("invalid channel state")
+ if state.value == nil { // chan receive
+ if received, ok := state.ch.tryRecv(recvbuf); received {
+ selectIndex = uintptr(i)
+ selectOk = ok
+ break
}
- } else {
- // send
- switch v.ch.state {
- case chanStateEmpty:
- v.ch.state = chanStateSend
- case chanStateSend:
- // already in correct state
- case chanStateBuf:
- // already in correct state
- default:
- interrupt.Restore(istate)
- runtimePanic("invalid channel state")
+ } else { // chan send
+ if state.ch.trySend(state.value) {
+ selectIndex = uintptr(i)
+ break
}
}
- chanDebug(v.ch)
}
- // expose rx buffer
+ // If this select can immediately proceed, or is a non-blocking select,
+ // return early.
+ blocking := len(ops) != 0
+ if selectIndex != selectNoIndex || !blocking {
+ interrupt.Restore(mask)
+ return selectIndex, selectOk
+ }
+
+ // The select is blocking and no channel operation can proceed, so things
+ // become more complicated.
+ // We add ourselves as a sender/receiver to every channel, and wait for the
+ // first one to complete. Only one will successfully complete, because
+ // senders and receivers will check t.Data for the state so that only one
+ // will be able to "take" this select operation.
t := task.Current()
t.Ptr = recvbuf
- t.Data = 1
+ t.Data = chanOperationWaiting
+ for i, state := range states {
+ if state.ch == nil {
+ continue
+ }
+ op := &ops[i]
+ op.task = t
+ op.index = uintptr(i)
+ if state.value == nil { // chan receive
+ state.ch.receivers.push(op)
+ } else { // chan send
+ op.value = state.value
+ state.ch.senders.push(op)
+ }
+ }
- // wait for one case to fire
- interrupt.Restore(istate)
+ // Now we wait until one of the send/receive operations can proceed.
+ interrupt.Restore(mask)
task.Pause()
- // figure out which one fired and return the ok value
- return (uintptr(t.Ptr) - uintptr(unsafe.Pointer(&states[0]))) / unsafe.Sizeof(chanSelectState{}), t.Data != 0
-}
-
-// tryChanSelect is like chanSelect, but it does a non-blocking select operation.
-func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) {
- istate := interrupt.Disable()
+ // Resumed, so one channel operation must have progressed.
- // See whether we can receive from one of the channels.
+ // Make sure all channel ops are removed from the senders/receivers
+ // queue before we return and the memory of them becomes invalid.
for i, state := range states {
+ if state.ch == nil {
+ continue
+ }
+ op := &ops[i]
+ mask := interrupt.Disable()
if state.value == nil {
- // A receive operation.
- if rx, ok := state.ch.tryRecv(recvbuf); rx {
- chanDebug(state.ch)
- interrupt.Restore(istate)
- return uintptr(i), ok
- }
+ state.ch.receivers.remove(op)
} else {
- // A send operation: state.value is not nil.
- if state.ch.trySend(state.value) {
- chanDebug(state.ch)
- interrupt.Restore(istate)
- return uintptr(i), true
- }
+ state.ch.senders.remove(op)
}
+ interrupt.Restore(mask)
}
- interrupt.Restore(istate)
- return ^uintptr(0), false
+ // Pull the return values out of t.Data (which contains two bitfields).
+ selectIndex = uintptr(t.Data) >> 2
+ selectOk = t.Data&chanOperationMask != chanOperationClosed
+
+ return selectIndex, selectOk
}