diff options
-rw-r--r-- | compiler/channel.go | 37 | ||||
-rw-r--r-- | compiler/testdata/channel.ll | 52 | ||||
-rw-r--r-- | compiler/testdata/goroutine-cortex-m-qemu-tasks.ll | 4 | ||||
-rw-r--r-- | compiler/testdata/goroutine-wasm-asyncify.ll | 4 | ||||
-rw-r--r-- | src/runtime/chan.go | 848 |
5 files changed, 383 insertions, 562 deletions
diff --git a/compiler/channel.go b/compiler/channel.go index 9969835e8..7e867c278 100644 --- a/compiler/channel.go +++ b/compiler/channel.go @@ -41,17 +41,17 @@ func (b *builder) createChanSend(instr *ssa.Send) { b.CreateStore(chanValue, valueAlloca) } - // Allocate blockedlist buffer. - channelBlockedList := b.getLLVMRuntimeType("channelBlockedList") - channelBlockedListAlloca, channelBlockedListAllocaSize := b.createTemporaryAlloca(channelBlockedList, "chan.blockedList") + // Allocate buffer for the channel operation. + channelOp := b.getLLVMRuntimeType("channelOp") + channelOpAlloca, channelOpAllocaSize := b.createTemporaryAlloca(channelOp, "chan.op") // Do the send. - b.createRuntimeCall("chanSend", []llvm.Value{ch, valueAlloca, channelBlockedListAlloca}, "") + b.createRuntimeCall("chanSend", []llvm.Value{ch, valueAlloca, channelOpAlloca}, "") // End the lifetime of the allocas. // This also works around a bug in CoroSplit, at least in LLVM 8: // https://bugs.llvm.org/show_bug.cgi?id=41742 - b.emitLifetimeEnd(channelBlockedListAlloca, channelBlockedListAllocaSize) + b.emitLifetimeEnd(channelOpAlloca, channelOpAllocaSize) if !isZeroSize { b.emitLifetimeEnd(valueAlloca, valueAllocaSize) } @@ -72,12 +72,12 @@ func (b *builder) createChanRecv(unop *ssa.UnOp) llvm.Value { valueAlloca, valueAllocaSize = b.createTemporaryAlloca(valueType, "chan.value") } - // Allocate blockedlist buffer. - channelBlockedList := b.getLLVMRuntimeType("channelBlockedList") - channelBlockedListAlloca, channelBlockedListAllocaSize := b.createTemporaryAlloca(channelBlockedList, "chan.blockedList") + // Allocate buffer for the channel operation. + channelOp := b.getLLVMRuntimeType("channelOp") + channelOpAlloca, channelOpAllocaSize := b.createTemporaryAlloca(channelOp, "chan.op") // Do the receive. - commaOk := b.createRuntimeCall("chanRecv", []llvm.Value{ch, valueAlloca, channelBlockedListAlloca}, "") + commaOk := b.createRuntimeCall("chanRecv", []llvm.Value{ch, valueAlloca, channelOpAlloca}, "") var received llvm.Value if isZeroSize { received = llvm.ConstNull(valueType) @@ -85,7 +85,7 @@ func (b *builder) createChanRecv(unop *ssa.UnOp) llvm.Value { received = b.CreateLoad(valueType, valueAlloca, "chan.received") b.emitLifetimeEnd(valueAlloca, valueAllocaSize) } - b.emitLifetimeEnd(channelBlockedListAlloca, channelBlockedListAllocaSize) + b.emitLifetimeEnd(channelOpAlloca, channelOpAllocaSize) if unop.CommaOk { tuple := llvm.Undef(b.ctx.StructType([]llvm.Type{valueType, b.ctx.Int1Type()}, false)) @@ -198,10 +198,10 @@ func (b *builder) createSelect(expr *ssa.Select) llvm.Value { if expr.Blocking { // Stack-allocate operation structures. // If these were simply created as a slice, they would heap-allocate. - chBlockAllocaType := llvm.ArrayType(b.getLLVMRuntimeType("channelBlockedList"), len(selectStates)) - chBlockAlloca, chBlockSize := b.createTemporaryAlloca(chBlockAllocaType, "select.block.alloca") - chBlockLen := llvm.ConstInt(b.uintptrType, uint64(len(selectStates)), false) - chBlockPtr := b.CreateGEP(chBlockAllocaType, chBlockAlloca, []llvm.Value{ + opsAllocaType := llvm.ArrayType(b.getLLVMRuntimeType("channelOp"), len(selectStates)) + opsAlloca, opsSize := b.createTemporaryAlloca(opsAllocaType, "select.block.alloca") + opsLen := llvm.ConstInt(b.uintptrType, uint64(len(selectStates)), false) + opsPtr := b.CreateGEP(opsAllocaType, opsAlloca, []llvm.Value{ llvm.ConstInt(b.ctx.Int32Type(), 0, false), llvm.ConstInt(b.ctx.Int32Type(), 0, false), }, "select.block") @@ -209,15 +209,18 @@ func (b *builder) createSelect(expr *ssa.Select) llvm.Value { results = b.createRuntimeCall("chanSelect", []llvm.Value{ recvbuf, statesPtr, statesLen, statesLen, // []chanSelectState - chBlockPtr, chBlockLen, chBlockLen, // []channelBlockList + opsPtr, opsLen, opsLen, // []channelOp }, "select.result") // Terminate the lifetime of the operation structures. - b.emitLifetimeEnd(chBlockAlloca, chBlockSize) + b.emitLifetimeEnd(opsAlloca, opsSize) } else { - results = b.createRuntimeCall("tryChanSelect", []llvm.Value{ + opsPtr := llvm.ConstNull(b.dataPtrType) + opsLen := llvm.ConstInt(b.uintptrType, 0, false) + results = b.createRuntimeCall("chanSelect", []llvm.Value{ recvbuf, statesPtr, statesLen, statesLen, // []chanSelectState + opsPtr, opsLen, opsLen, // []channelOp (nil slice) }, "select.result") } diff --git a/compiler/testdata/channel.ll b/compiler/testdata/channel.ll index 65e18dea8..68982d051 100644 --- a/compiler/testdata/channel.ll +++ b/compiler/testdata/channel.ll @@ -3,7 +3,7 @@ source_filename = "channel.go" target datalayout = "e-m:e-p:32:32-p10:8:8-p20:8:8-i64:64-n32:64-S128-ni:1:10:20" target triple = "wasm32-unknown-wasi" -%runtime.channelBlockedList = type { ptr, ptr, ptr, { ptr, i32, i32 } } +%runtime.channelOp = type { ptr, ptr, i32, ptr } %runtime.chanSelectState = type { ptr, ptr } ; Function Attrs: allockind("alloc,zeroed") allocsize(0) @@ -18,15 +18,15 @@ entry: } ; Function Attrs: nounwind -define hidden void @main.chanIntSend(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 { +define hidden void @main.chanIntSend(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 { entry: - %chan.blockedList = alloca %runtime.channelBlockedList, align 8 + %chan.op = alloca %runtime.channelOp, align 8 %chan.value = alloca i32, align 4 call void @llvm.lifetime.start.p0(i64 4, ptr nonnull %chan.value) store i32 3, ptr %chan.value, align 4 - call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList) - call void @runtime.chanSend(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.blockedList, ptr undef) #4 - call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList) + call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op) + call void @runtime.chanSend(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.op, ptr undef) #4 + call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op) call void @llvm.lifetime.end.p0(i64 4, ptr nonnull %chan.value) ret void } @@ -34,48 +34,48 @@ entry: ; Function Attrs: nocallback nofree nosync nounwind willreturn memory(argmem: readwrite) declare void @llvm.lifetime.start.p0(i64 immarg, ptr nocapture) #3 -declare void @runtime.chanSend(ptr dereferenceable_or_null(32), ptr, ptr dereferenceable_or_null(24), ptr) #1 +declare void @runtime.chanSend(ptr dereferenceable_or_null(36), ptr, ptr dereferenceable_or_null(16), ptr) #1 ; Function Attrs: nocallback nofree nosync nounwind willreturn memory(argmem: readwrite) declare void @llvm.lifetime.end.p0(i64 immarg, ptr nocapture) #3 ; Function Attrs: nounwind -define hidden void @main.chanIntRecv(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 { +define hidden void @main.chanIntRecv(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 { entry: - %chan.blockedList = alloca %runtime.channelBlockedList, align 8 + %chan.op = alloca %runtime.channelOp, align 8 %chan.value = alloca i32, align 4 call void @llvm.lifetime.start.p0(i64 4, ptr nonnull %chan.value) - call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList) - %0 = call i1 @runtime.chanRecv(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.blockedList, ptr undef) #4 + call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op) + %0 = call i1 @runtime.chanRecv(ptr %ch, ptr nonnull %chan.value, ptr nonnull %chan.op, ptr undef) #4 call void @llvm.lifetime.end.p0(i64 4, ptr nonnull %chan.value) - call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList) + call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op) ret void } -declare i1 @runtime.chanRecv(ptr dereferenceable_or_null(32), ptr, ptr dereferenceable_or_null(24), ptr) #1 +declare i1 @runtime.chanRecv(ptr dereferenceable_or_null(36), ptr, ptr dereferenceable_or_null(16), ptr) #1 ; Function Attrs: nounwind -define hidden void @main.chanZeroSend(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 { +define hidden void @main.chanZeroSend(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 { entry: - %chan.blockedList = alloca %runtime.channelBlockedList, align 8 - call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList) - call void @runtime.chanSend(ptr %ch, ptr null, ptr nonnull %chan.blockedList, ptr undef) #4 - call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList) + %chan.op = alloca %runtime.channelOp, align 8 + call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op) + call void @runtime.chanSend(ptr %ch, ptr null, ptr nonnull %chan.op, ptr undef) #4 + call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op) ret void } ; Function Attrs: nounwind -define hidden void @main.chanZeroRecv(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 { +define hidden void @main.chanZeroRecv(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 { entry: - %chan.blockedList = alloca %runtime.channelBlockedList, align 8 - call void @llvm.lifetime.start.p0(i64 24, ptr nonnull %chan.blockedList) - %0 = call i1 @runtime.chanRecv(ptr %ch, ptr null, ptr nonnull %chan.blockedList, ptr undef) #4 - call void @llvm.lifetime.end.p0(i64 24, ptr nonnull %chan.blockedList) + %chan.op = alloca %runtime.channelOp, align 8 + call void @llvm.lifetime.start.p0(i64 16, ptr nonnull %chan.op) + %0 = call i1 @runtime.chanRecv(ptr %ch, ptr null, ptr nonnull %chan.op, ptr undef) #4 + call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %chan.op) ret void } ; Function Attrs: nounwind -define hidden void @main.selectZeroRecv(ptr dereferenceable_or_null(32) %ch1, ptr dereferenceable_or_null(32) %ch2, ptr %context) unnamed_addr #2 { +define hidden void @main.selectZeroRecv(ptr dereferenceable_or_null(36) %ch1, ptr dereferenceable_or_null(36) %ch2, ptr %context) unnamed_addr #2 { entry: %select.states.alloca = alloca [2 x %runtime.chanSelectState], align 8 %select.send.value = alloca i32, align 4 @@ -88,7 +88,7 @@ entry: store ptr %ch2, ptr %0, align 4 %.repack3 = getelementptr inbounds [2 x %runtime.chanSelectState], ptr %select.states.alloca, i32 0, i32 1, i32 1 store ptr null, ptr %.repack3, align 4 - %select.result = call { i32, i1 } @runtime.tryChanSelect(ptr undef, ptr nonnull %select.states.alloca, i32 2, i32 2, ptr undef) #4 + %select.result = call { i32, i1 } @runtime.chanSelect(ptr undef, ptr nonnull %select.states.alloca, i32 2, i32 2, ptr null, i32 0, i32 0, ptr undef) #4 call void @llvm.lifetime.end.p0(i64 16, ptr nonnull %select.states.alloca) %1 = extractvalue { i32, i1 } %select.result, 0 %2 = icmp eq i32 %1, 0 @@ -105,7 +105,7 @@ select.body: ; preds = %select.next br label %select.done } -declare { i32, i1 } @runtime.tryChanSelect(ptr, ptr, i32, i32, ptr) #1 +declare { i32, i1 } @runtime.chanSelect(ptr, ptr, i32, i32, ptr, i32, i32, ptr) #1 attributes #0 = { allockind("alloc,zeroed") allocsize(0) "alloc-family"="runtime.alloc" "target-features"="+bulk-memory,+mutable-globals,+nontrapping-fptoint,+sign-ext" } attributes #1 = { "target-features"="+bulk-memory,+mutable-globals,+nontrapping-fptoint,+sign-ext" } diff --git a/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll b/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll index f149f3a0c..a57bb20f3 100644 --- a/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll +++ b/compiler/testdata/goroutine-cortex-m-qemu-tasks.ll @@ -135,13 +135,13 @@ entry: declare i32 @runtime.sliceCopy(ptr nocapture writeonly, ptr nocapture readonly, i32, i32, i32, ptr) #2 ; Function Attrs: nounwind -define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #1 { +define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #1 { entry: call void @runtime.chanClose(ptr %ch, ptr undef) #9 ret void } -declare void @runtime.chanClose(ptr dereferenceable_or_null(32), ptr) #2 +declare void @runtime.chanClose(ptr dereferenceable_or_null(36), ptr) #2 ; Function Attrs: nounwind define hidden void @main.startInterfaceMethod(ptr %itf.typecode, ptr %itf.value, ptr %context) unnamed_addr #1 { diff --git a/compiler/testdata/goroutine-wasm-asyncify.ll b/compiler/testdata/goroutine-wasm-asyncify.ll index 699b9f205..c4af76037 100644 --- a/compiler/testdata/goroutine-wasm-asyncify.ll +++ b/compiler/testdata/goroutine-wasm-asyncify.ll @@ -144,13 +144,13 @@ entry: declare i32 @runtime.sliceCopy(ptr nocapture writeonly, ptr nocapture readonly, i32, i32, i32, ptr) #1 ; Function Attrs: nounwind -define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(32) %ch, ptr %context) unnamed_addr #2 { +define hidden void @main.closeBuiltinGoroutine(ptr dereferenceable_or_null(36) %ch, ptr %context) unnamed_addr #2 { entry: call void @runtime.chanClose(ptr %ch, ptr undef) #9 ret void } -declare void @runtime.chanClose(ptr dereferenceable_or_null(32), ptr) #1 +declare void @runtime.chanClose(ptr dereferenceable_or_null(36), ptr) #1 ; Function Attrs: nounwind define hidden void @main.startInterfaceMethod(ptr %itf.typecode, ptr %itf.value, ptr %context) unnamed_addr #2 { diff --git a/src/runtime/chan.go b/src/runtime/chan.go index 1f0d7ced8..c62685700 100644 --- a/src/runtime/chan.go +++ b/src/runtime/chan.go @@ -1,27 +1,45 @@ package runtime // This file implements the 'chan' type and send/receive/select operations. - -// A channel can be in one of the following states: -// empty: -// No goroutine is waiting on a send or receive operation. The 'blocked' -// member is nil. -// recv: -// A goroutine tries to receive from the channel. This goroutine is stored -// in the 'blocked' member. -// send: -// The reverse of send. A goroutine tries to send to the channel. This -// goroutine is stored in the 'blocked' member. -// closed: -// The channel is closed. Sends will panic, receives will get a zero value -// plus optionally the indication that the channel is zero (with the -// comma-ok value in the task). // -// A send/recv transmission is completed by copying from the data element of the -// sending task to the data element of the receiving task, and setting -// the 'comma-ok' value to true. -// A receive operation on a closed channel is completed by zeroing the data -// element of the receiving task and setting the 'comma-ok' value to false. +// Every channel has a list of senders and a list of receivers, and possibly a +// queue. There is no 'channel state', the state is inferred from the available +// senders/receivers and values in the buffer. +// +// - A sender will first try to send the value to a waiting receiver if there is +// one, but only if there is nothing in the queue (to keep the values flowing +// in the correct order). If it can't, it will add the value in the queue and +// possibly wait as a sender if there's no space available. +// - A receiver will first try to read a value from the queue, but if there is +// none it will try to read from a sender in the list. It will block if it +// can't proceed. +// +// State is kept in various ways: +// +// - The sender value is stored in the sender 'channelOp', which is really a +// queue entry. This works for both senders and select operations: a select +// operation has a separate value to send for each case. +// - The receiver value is stored inside Task.Ptr. This works for receivers, and +// importantly also works for select which has a single buffer for every +// receive operation. +// - The `Task.Data` value stores how the channel operation proceeded. For +// normal send/receive operations, it starts at chanOperationWaiting and then +// is changed to chanOperationOk or chanOperationClosed depending on whether +// the send/receive proceeded normally or because it was closed. For a select +// operation, it also stores the 'case' index in the upper bits (zero for +// non-select operations) so that the select operation knows which case did +// proceed. +// The value is at the same time also a way that goroutines can be the first +// (and only) goroutine to 'take' a channel operation to change it from +// 'waiting' to any other value. This is important for the select statement +// because multiple goroutines could try to let different channels in the +// select statement proceed at the same time. By using Task.Data, only a +// single channel operation in the select statement can proceed. +// - It is possible for the channel queues to contain already-processed senders +// or receivers. This can happen when the select statement managed to proceed +// but the goroutine doing the select has not yet cleaned up the stale queue +// entries before returning. This should therefore only happen for a short +// period. import ( "internal/task" @@ -29,490 +47,283 @@ import ( "unsafe" ) -func chanDebug(ch *channel) { - if schedulerDebug { - if ch.bufSize > 0 { - println("--- channel update:", ch, ch.state.String(), ch.bufSize, ch.bufUsed) - } else { - println("--- channel update:", ch, ch.state.String()) - } - } +// The runtime implementation of the Go 'chan' type. +type channel struct { + closed bool + elementSize uintptr + bufCap uintptr // 'cap' + bufLen uintptr // 'len' + bufHead uintptr + bufTail uintptr + senders chanQueue + receivers chanQueue + buf unsafe.Pointer } -// channelBlockedList is a list of channel operations on a specific channel which are currently blocked. -type channelBlockedList struct { - // next is a pointer to the next blocked channel operation on the same channel. - next *channelBlockedList - - // t is the task associated with this channel operation. - // If this channel operation is not part of a select, then the pointer field of the state holds the data buffer. - // If this channel operation is part of a select, then the pointer field of the state holds the receive buffer. - // If this channel operation is a receive, then the data field should be set to zero when resuming due to channel closure. - t *task.Task - - // s is a pointer to the channel select state corresponding to this operation. - // This will be nil if and only if this channel operation is not part of a select statement. - // If this is a send operation, then the send buffer can be found in this select state. - s *chanSelectState - - // allSelectOps is a slice containing all of the channel operations involved with this select statement. - // Before resuming the task, all other channel operations on this select statement should be canceled by removing them from their corresponding lists. - allSelectOps []channelBlockedList +const ( + chanOperationWaiting = 0b00 // waiting for a send/receive operation to continue + chanOperationOk = 0b01 // successfully sent or received (not closed) + chanOperationClosed = 0b10 // channel was closed, the value has been zeroed + chanOperationMask = 0b11 +) + +type chanQueue struct { + first *channelOp } -// remove takes the current list of blocked channel operations and removes the specified operation. -// This returns the resulting list, or nil if the resulting list is empty. -// A nil receiver is treated as an empty list. -func (b *channelBlockedList) remove(old *channelBlockedList) *channelBlockedList { - if b == old { - return b.next - } - c := b - for ; c != nil && c.next != old; c = c.next { - } - if c != nil { - c.next = old.next - } - return b +// Pus the next channel operation to the queue. All appropriate fields must have +// been initialized already. +// This function must be called with interrupts disabled. +func (q *chanQueue) push(node *channelOp) { + node.next = q.first + q.first = node } -// detach removes all other channel operations that are part of the same select statement. -// If the input is not part of a select statement, this is a no-op. -// This must be called before resuming any task blocked on a channel operation in order to ensure that it is not placed on the runqueue twice. -func (b *channelBlockedList) detach() { - if b.allSelectOps == nil { - // nothing to do - return - } - for i, v := range b.allSelectOps { - // cancel all other channel operations that are part of this select statement - switch { - case &b.allSelectOps[i] == b: - // This entry is the one that was already detached. - continue - case v.t == nil: - // This entry is not used (nil channel). - continue +// Pop the next waiting channel from the queue. Channels that are no longer +// waiting (for example, when they're part of a select operation) will be +// skipped. +// This function must be called with interrupts disabled. +func (q *chanQueue) pop(chanOp uint64) *channelOp { + for { + if q.first == nil { + return nil } - v.s.ch.blocked = v.s.ch.blocked.remove(&b.allSelectOps[i]) - if v.s.ch.blocked == nil { - if v.s.value == nil { - // recv operation - if v.s.ch.state != chanStateClosed { - v.s.ch.state = chanStateEmpty - } - } else { - // send operation - if v.s.ch.bufUsed == 0 { - // unbuffered channel - v.s.ch.state = chanStateEmpty - } else { - // buffered channel - v.s.ch.state = chanStateBuf - } - } + + // Pop next from the queue. + popped := q.first + q.first = q.first.next + + // The new value for the 'data' field will be a combination of the + // channel operation and the select index. (The select index is 0 for + // non-select channel operations). + newDataValue := chanOp | uint64(popped.index<<2) + + // Try to be the first to proceed with this goroutine. + if popped.task.Data == chanOperationWaiting { + popped.task.Data = newDataValue + return popped } - chanDebug(v.s.ch) } } -type channel struct { - elementSize uintptr // the size of one value in this channel - bufSize uintptr // size of buffer (in elements) - state chanState - blocked *channelBlockedList - bufHead uintptr // head index of buffer (next push index) - bufTail uintptr // tail index of buffer (next pop index) - bufUsed uintptr // number of elements currently in buffer - buf unsafe.Pointer // pointer to first element of buffer +// Remove the given to-be-removed node from the queue if it is part of the +// queue. If there are multiple, only one will be removed. +// This function must be called with interrupts disabled. +func (q *chanQueue) remove(remove *channelOp) { + n := &q.first + for *n != nil { + if *n == remove { + *n = (*n).next + return + } + n = &((*n).next) + } +} + +type channelOp struct { + next *channelOp + task *task.Task + index uintptr // select index, 0 for non-select operation + value unsafe.Pointer // if this is a sender, this is the value to send +} + +type chanSelectState struct { + ch *channel + value unsafe.Pointer } -// chanMake creates a new channel with the given element size and buffer length in number of elements. -// This is a compiler intrinsic. func chanMake(elementSize uintptr, bufSize uintptr) *channel { return &channel{ elementSize: elementSize, - bufSize: bufSize, + bufCap: bufSize, buf: alloc(elementSize*bufSize, nil), } } // Return the number of entries in this chan, called from the len builtin. // A nil chan is defined as having length 0. -// -//go:inline func chanLen(c *channel) int { if c == nil { return 0 } - return int(c.bufUsed) + return int(c.bufLen) } // Return the capacity of this chan, called from the cap builtin. // A nil chan is defined as having capacity 0. -// -//go:inline func chanCap(c *channel) int { if c == nil { return 0 } - return int(c.bufSize) + return int(c.bufCap) } -// resumeRX resumes the next receiver and returns the destination pointer. -// If the ok value is true, then the caller is expected to store a value into this pointer. -func (ch *channel) resumeRX(ok bool) unsafe.Pointer { - // pop a blocked goroutine off the stack - var b *channelBlockedList - b, ch.blocked = ch.blocked, ch.blocked.next - - // get destination pointer - dst := b.t.Ptr - - if !ok { - // the result value is zero - memzero(dst, ch.elementSize) - b.t.Data = 0 - } - - if b.s != nil { - // tell the select op which case resumed - b.t.Ptr = unsafe.Pointer(b.s) - - // detach associated operations - b.detach() - } - - scheduleTask(b.t) - - return dst -} - -// resumeTX resumes the next sender and returns the source pointer. -// The caller is expected to read from the value in this pointer before yielding. -func (ch *channel) resumeTX() unsafe.Pointer { - // pop a blocked goroutine off the stack - var b *channelBlockedList - b, ch.blocked = ch.blocked, ch.blocked.next - - // get source pointer - src := b.t.Ptr - - if b.s != nil { - // use state's source pointer - src = b.s.value - - // tell the select op which case resumed - b.t.Ptr = unsafe.Pointer(b.s) - - // detach associated operations - b.detach() - } - - scheduleTask(b.t) - - return src -} - -// push value to end of channel if space is available -// returns whether there was space for the value in the buffer -func (ch *channel) push(value unsafe.Pointer) bool { - // immediately return false if the channel is not buffered - if ch.bufSize == 0 { - return false - } - - // ensure space is available - if ch.bufUsed == ch.bufSize { - return false - } - - // copy value to buffer - memcpy( - unsafe.Add(ch.buf, // pointer to the base of the buffer + offset = pointer to destination element - ch.elementSize*ch.bufHead), // element size * equivalent slice index = offset - value, - ch.elementSize, - ) - - // update buffer state - ch.bufUsed++ +// Push the value to the channel buffer array, for a send operation. +// This function may only be called when interrupts are disabled and it is known +// there is space available in the buffer. +func (ch *channel) bufferPush(value unsafe.Pointer) { + elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize) + ch.bufLen++ ch.bufHead++ - if ch.bufHead == ch.bufSize { + if ch.bufHead == ch.bufCap { ch.bufHead = 0 } - return true + memcpy(elemAddr, value, ch.elementSize) } -// pop value from channel buffer if one is available -// returns whether a value was popped or not -// result is stored into value pointer -func (ch *channel) pop(value unsafe.Pointer) bool { - // channel is empty - if ch.bufUsed == 0 { - return false - } - - // compute address of source - addr := unsafe.Add(ch.buf, (ch.elementSize * ch.bufTail)) - - // copy value from buffer - memcpy( - value, - addr, - ch.elementSize, - ) - - // zero buffer element to allow garbage collection of value - memzero( - addr, - ch.elementSize, - ) - - // update buffer state - ch.bufUsed-- - - // move tail up +// Pop a value from the channel buffer and store it in the 'value' pointer, for +// a receive operation. +// This function may only be called when interrupts are disabled and it is known +// there is at least one value available in the buffer. +func (ch *channel) bufferPop(value unsafe.Pointer) { + elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize) + ch.bufLen-- ch.bufTail++ - if ch.bufTail == ch.bufSize { + if ch.bufTail == ch.bufCap { ch.bufTail = 0 } - return true + memcpy(value, elemAddr, ch.elementSize) + + // Zero the value to allow the GC to collect it. + memzero(elemAddr, ch.elementSize) } -// try to send a value to a channel, without actually blocking -// returns whether the value was sent -// will panic if channel is closed +// Try to proceed with this send operation without blocking, and return whether +// the send succeeded. Interrupts must be disabled when calling this function. func (ch *channel) trySend(value unsafe.Pointer) bool { - if ch == nil { - // send to nil channel blocks forever - // this is non-blocking, so just say no - return false + // To make sure we send values in the correct order, we can only send + // directly to a receiver when there are no values in the buffer. + + // Do not allow sending on a closed channel. + if ch.closed { + // Note: we cannot currently recover from this panic. + // There's some state in the select statement especially that would be + // corrupted if we allowed recovering from this panic. + runtimePanic("send on closed channel") } - i := interrupt.Disable() - - switch ch.state { - case chanStateEmpty, chanStateBuf: - // try to dump the value directly into the buffer - if ch.push(value) { - ch.state = chanStateBuf - interrupt.Restore(i) + // There is no value in the buffer and we have a receiver available. Copy + // the value directly into the receiver. + if ch.bufLen == 0 { + if receiver := ch.receivers.pop(chanOperationOk); receiver != nil { + memcpy(receiver.task.Ptr, value, ch.elementSize) + scheduleTask(receiver.task) return true } - interrupt.Restore(i) - return false - case chanStateRecv: - // unblock receiver - dst := ch.resumeRX(true) - - // copy value to receiver - memcpy(dst, value, ch.elementSize) - - // change state to empty if there are no more receivers - if ch.blocked == nil { - ch.state = chanStateEmpty - } + } - interrupt.Restore(i) + // If there is space in the buffer (if this is a buffered channel), we can + // store the value in the buffer and continue. + if ch.bufLen < ch.bufCap { + ch.bufferPush(value) return true - case chanStateSend: - // something else is already waiting to send - interrupt.Restore(i) - return false - case chanStateClosed: - interrupt.Restore(i) - runtimePanic("send on closed channel") - default: - interrupt.Restore(i) - runtimePanic("invalid channel state") } - - interrupt.Restore(i) return false } -// try to receive a value from a channel, without really blocking -// returns whether a value was received -// second return is the comma-ok value -func (ch *channel) tryRecv(value unsafe.Pointer) (bool, bool) { +func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) { if ch == nil { - // receive from nil channel blocks forever - // this is non-blocking, so just say no - return false, false + // A nil channel blocks forever. Do not schedule this goroutine again. + deadlock() } - i := interrupt.Disable() - - switch ch.state { - case chanStateBuf, chanStateSend: - // try to pop the value directly from the buffer - if ch.pop(value) { - // unblock next sender if applicable - if ch.blocked != nil { - src := ch.resumeTX() - - // push sender's value into buffer - ch.push(src) - - if ch.blocked == nil { - // last sender unblocked - update state - ch.state = chanStateBuf - } - } - - if ch.bufUsed == 0 { - // channel empty - update state - ch.state = chanStateEmpty - } + mask := interrupt.Disable() - interrupt.Restore(i) - return true, true - } else if ch.blocked != nil { - // unblock next sender if applicable - src := ch.resumeTX() + // See whether we can proceed immediately, and if so, return early. + if ch.trySend(value) { + interrupt.Restore(mask) + return + } - // copy sender's value - memcpy(value, src, ch.elementSize) + // Can't proceed. Add us to the list of senders and wait until we're awoken. + t := task.Current() + t.Data = chanOperationWaiting + op.task = t + op.index = 0 + op.value = value + ch.senders.push(op) + interrupt.Restore(mask) + + // Wait until this goroutine is resumed. + task.Pause() - if ch.blocked == nil { - // last sender unblocked - update state - ch.state = chanStateEmpty - } + // Check whether the sent happened normally (not because the channel was + // closed while sending). + if t.Data == chanOperationClosed { + // Oops, this channel was closed while sending! + runtimePanic("send on closed channel") + } +} - interrupt.Restore(i) - return true, true - } - interrupt.Restore(i) - return false, false - case chanStateRecv, chanStateEmpty: - // something else is already waiting to receive - interrupt.Restore(i) - return false, false - case chanStateClosed: - if ch.pop(value) { - interrupt.Restore(i) - return true, true +// Try to proceed with this receive operation without blocking, and return +// whether the receive operation succeeded. Interrupts must be disabled when +// calling this function. +func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) { + // To make sure we keep the values in the channel in the correct order, we + // first have to read values from the buffer before we can look at the + // senders. + + // If there is a value available in the buffer, we can pull it out and + // proceed immediately. + if ch.bufLen > 0 { + ch.bufferPop(value) + + // Check for the next sender available and push it to the buffer. + if sender := ch.senders.pop(chanOperationOk); sender != nil { + ch.bufferPush(sender.value) + scheduleTask(sender.task) } - // channel closed - nothing to receive + return true, true + } + + if ch.closed { + // Channel is closed, so proceed immediately. memzero(value, ch.elementSize) - interrupt.Restore(i) return true, false - default: - runtimePanic("invalid channel state") } - runtimePanic("unreachable") - return false, false -} - -type chanState uint8 - -const ( - chanStateEmpty chanState = iota // nothing in channel, no senders/receivers - chanStateRecv // nothing in channel, receivers waiting - chanStateSend // senders waiting, buffer full if present - chanStateBuf // buffer not empty, no senders waiting - chanStateClosed // channel closed -) - -func (s chanState) String() string { - switch s { - case chanStateEmpty: - return "empty" - case chanStateRecv: - return "recv" - case chanStateSend: - return "send" - case chanStateBuf: - return "buffered" - case chanStateClosed: - return "closed" - default: - return "invalid" + // If there is a sender, we can proceed with the channel operation + // immediately. + if sender := ch.senders.pop(chanOperationOk); sender != nil { + memcpy(value, sender.value, ch.elementSize) + scheduleTask(sender.task) + return true, true } -} -// chanSelectState is a single channel operation (send/recv) in a select -// statement. The value pointer is either nil (for receives) or points to the -// value to send (for sends). -type chanSelectState struct { - ch *channel - value unsafe.Pointer + return false, false } -// chanSend sends a single value over the channel. -// This operation will block unless a value is immediately available. -// May panic if the channel is closed. -func chanSend(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) { - i := interrupt.Disable() - - if ch.trySend(value) { - // value immediately sent - chanDebug(ch) - interrupt.Restore(i) - return - } - +func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool { if ch == nil { // A nil channel blocks forever. Do not schedule this goroutine again. - interrupt.Restore(i) deadlock() } - // wait for receiver - sender := task.Current() - ch.state = chanStateSend - sender.Ptr = value - *blockedlist = channelBlockedList{ - next: ch.blocked, - t: sender, - } - ch.blocked = blockedlist - chanDebug(ch) - interrupt.Restore(i) - task.Pause() - sender.Ptr = nil -} + mask := interrupt.Disable() -// chanRecv receives a single value over a channel. -// It blocks if there is no available value to receive. -// The received value is copied into the value pointer. -// Returns the comma-ok value. -func chanRecv(ch *channel, value unsafe.Pointer, blockedlist *channelBlockedList) bool { - i := interrupt.Disable() - - if rx, ok := ch.tryRecv(value); rx { - // value immediately available - chanDebug(ch) - interrupt.Restore(i) + if received, ok := ch.tryRecv(value); received { + interrupt.Restore(mask) return ok } - if ch == nil { - // A nil channel blocks forever. Do not schedule this goroutine again. - interrupt.Restore(i) - deadlock() - } - - // wait for a value - receiver := task.Current() - ch.state = chanStateRecv - receiver.Ptr, receiver.Data = value, 1 - *blockedlist = channelBlockedList{ - next: ch.blocked, - t: receiver, - } - ch.blocked = blockedlist - chanDebug(ch) - interrupt.Restore(i) + // We can't proceed, so we add ourselves to the list of receivers and wait + // until we're awoken. + t := task.Current() + t.Ptr = value + t.Data = chanOperationWaiting + op.task = t + op.index = 0 + ch.receivers.push(op) + interrupt.Restore(mask) + + // Wait until the goroutine is resumed. task.Pause() - ok := receiver.Data == 1 - receiver.Ptr, receiver.Data = nil, 0 - return ok + + // Return whether the receive happened from a closed channel. + return t.Data != chanOperationClosed } // chanClose closes the given channel. If this channel has a receiver or is @@ -522,128 +333,135 @@ func chanClose(ch *channel) { // Not allowed by the language spec. runtimePanic("close of nil channel") } - i := interrupt.Disable() - switch ch.state { - case chanStateClosed: + + mask := interrupt.Disable() + + if ch.closed { // Not allowed by the language spec. - interrupt.Restore(i) + interrupt.Restore(mask) runtimePanic("close of closed channel") - case chanStateSend: - // This panic should ideally on the sending side, not in this goroutine. - // But when a goroutine tries to send while the channel is being closed, - // that is clearly invalid: the send should have been completed already - // before the close. - interrupt.Restore(i) - runtimePanic("close channel during send") - case chanStateRecv: - // unblock all receivers with the zero value - ch.state = chanStateClosed - for ch.blocked != nil { - ch.resumeRX(false) + } + + // Proceed all receiving operations that are blocked. + for { + receiver := ch.receivers.pop(chanOperationClosed) + if receiver == nil { + // Processed all receivers. + break } - case chanStateEmpty, chanStateBuf: - // Easy case. No available sender or receiver. + + // Zero the value that the receiver is getting. + memzero(receiver.task.Ptr, ch.elementSize) + + // Wake up the receiving goroutine. + scheduleTask(receiver.task) } - ch.state = chanStateClosed - interrupt.Restore(i) - chanDebug(ch) -} -// chanSelect is the runtime implementation of the select statement. This is -// perhaps the most complicated statement in the Go spec. It returns the -// selected index and the 'comma-ok' value. -// -// TODO: do this in a round-robin fashion (as specified in the Go spec) instead -// of picking the first one that can proceed. -func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelBlockedList) (uintptr, bool) { - istate := interrupt.Disable() - - if selected, ok := tryChanSelect(recvbuf, states); selected != ^uintptr(0) { - // one channel was immediately ready - interrupt.Restore(istate) - return selected, ok + // Let all senders panic. + for { + sender := ch.senders.pop(chanOperationClosed) + if sender == nil { + break // processed all senders + } + + // Wake up the sender. + scheduleTask(sender.task) } - // construct blocked operations - for i, v := range states { - if v.ch == nil { - // A nil channel receive will never complete. - // A nil channel send would have panicked during tryChanSelect. - ops[i] = channelBlockedList{} + ch.closed = true + + interrupt.Restore(mask) +} + +// chanSelect implements blocking or non-blocking select operations. +// The 'ops' slice must be set if (and only if) this is a blocking select. +func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uintptr, bool) { + mask := interrupt.Disable() + + const selectNoIndex = ^uintptr(0) + selectIndex := selectNoIndex + selectOk := true + + // Iterate over each state, and see if it can proceed. + // TODO: start from a random index. + for i, state := range states { + if state.ch == nil { + // A nil channel blocks forever, so it won't take part of the select + // operation. continue } - ops[i] = channelBlockedList{ - next: v.ch.blocked, - t: task.Current(), - s: &states[i], - allSelectOps: ops, - } - v.ch.blocked = &ops[i] - if v.value == nil { - // recv - switch v.ch.state { - case chanStateEmpty: - v.ch.state = chanStateRecv - case chanStateRecv: - // already in correct state - default: - interrupt.Restore(istate) - runtimePanic("invalid channel state") + if state.value == nil { // chan receive + if received, ok := state.ch.tryRecv(recvbuf); received { + selectIndex = uintptr(i) + selectOk = ok + break } - } else { - // send - switch v.ch.state { - case chanStateEmpty: - v.ch.state = chanStateSend - case chanStateSend: - // already in correct state - case chanStateBuf: - // already in correct state - default: - interrupt.Restore(istate) - runtimePanic("invalid channel state") + } else { // chan send + if state.ch.trySend(state.value) { + selectIndex = uintptr(i) + break } } - chanDebug(v.ch) } - // expose rx buffer + // If this select can immediately proceed, or is a non-blocking select, + // return early. + blocking := len(ops) != 0 + if selectIndex != selectNoIndex || !blocking { + interrupt.Restore(mask) + return selectIndex, selectOk + } + + // The select is blocking and no channel operation can proceed, so things + // become more complicated. + // We add ourselves as a sender/receiver to every channel, and wait for the + // first one to complete. Only one will successfully complete, because + // senders and receivers will check t.Data for the state so that only one + // will be able to "take" this select operation. t := task.Current() t.Ptr = recvbuf - t.Data = 1 + t.Data = chanOperationWaiting + for i, state := range states { + if state.ch == nil { + continue + } + op := &ops[i] + op.task = t + op.index = uintptr(i) + if state.value == nil { // chan receive + state.ch.receivers.push(op) + } else { // chan send + op.value = state.value + state.ch.senders.push(op) + } + } - // wait for one case to fire - interrupt.Restore(istate) + // Now we wait until one of the send/receive operations can proceed. + interrupt.Restore(mask) task.Pause() - // figure out which one fired and return the ok value - return (uintptr(t.Ptr) - uintptr(unsafe.Pointer(&states[0]))) / unsafe.Sizeof(chanSelectState{}), t.Data != 0 -} - -// tryChanSelect is like chanSelect, but it does a non-blocking select operation. -func tryChanSelect(recvbuf unsafe.Pointer, states []chanSelectState) (uintptr, bool) { - istate := interrupt.Disable() + // Resumed, so one channel operation must have progressed. - // See whether we can receive from one of the channels. + // Make sure all channel ops are removed from the senders/receivers + // queue before we return and the memory of them becomes invalid. for i, state := range states { + if state.ch == nil { + continue + } + op := &ops[i] + mask := interrupt.Disable() if state.value == nil { - // A receive operation. - if rx, ok := state.ch.tryRecv(recvbuf); rx { - chanDebug(state.ch) - interrupt.Restore(istate) - return uintptr(i), ok - } + state.ch.receivers.remove(op) } else { - // A send operation: state.value is not nil. - if state.ch.trySend(state.value) { - chanDebug(state.ch) - interrupt.Restore(istate) - return uintptr(i), true - } + state.ch.senders.remove(op) } + interrupt.Restore(mask) } - interrupt.Restore(istate) - return ^uintptr(0), false + // Pull the return values out of t.Data (which contains two bitfields). + selectIndex = uintptr(t.Data) >> 2 + selectOk = t.Data&chanOperationMask != chanOperationClosed + + return selectIndex, selectOk } |