author    Hrishi Hiraskar <[email protected]>  2022-10-14 17:05:48 +0530
committer Ron Evans <[email protected]>  2022-10-18 07:50:31 +0200
commit    cad5b79a2daad45d7eaae9887d352f9457ca4990 (patch)
tree      e15381385d8d78729ecb6fa1622a177d4469207e /src/net
parent    d56c9f55335d61858dcfe2cdc66ec19ac56002fc (diff)
net: implement Pipe
Diffstat (limited to 'src/net')
-rw-r--r--   src/net/conn_test.go   478
-rw-r--r--   src/net/pipe.go        240
-rw-r--r--   src/net/pipe_test.go    48
3 files changed, 766 insertions, 0 deletions
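
For context, a minimal usage sketch (not part of this commit) of the behavior
the new Pipe provides: a synchronous, unbuffered rendezvous, where a Write on
one end blocks until a Read on the other end consumes the data.

    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        c1, c2 := net.Pipe()
        defer c1.Close()
        defer c2.Close()

        go func() {
            // Blocks until the Read below rendezvouses with it.
            c1.Write([]byte("ping"))
        }()

        buf := make([]byte, 4)
        n, err := c2.Read(buf)
        if err != nil {
            panic(err)
        }
        fmt.Printf("got %q\n", buf[:n]) // got "ping"
    }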
diff --git a/src/net/conn_test.go b/src/net/conn_test.go
new file mode 100644
index 000000000..4e1ac28c4
--- /dev/null
+++ b/src/net/conn_test.go
@@ -0,0 +1,478 @@
+// The following is copied from the official x/net implementation.
+// Source: https://cs.opensource.google/go/x/net/+/f15817d1:nettest/conntest.go
+// Changes from the original file:
+// - Some variables are pulled in from the nettest/nettest.go file.
+// - The implementation of the checkForTimeoutError() function is changed to
+//   match the error returned by this Pipe implementation.
+
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "bytes"
+ "encoding/binary"
+ "io"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "runtime"
+ "sync"
+ "testing"
+ "time"
+)
+
+// The following variables are copied from the nettest/nettest.go file.
+var (
+ aLongTimeAgo = time.Unix(233431200, 0)
+ neverTimeout = time.Time{}
+)
+
+// MakePipe creates a connection between two endpoints and returns the pair
+// as c1 and c2, such that anything written to c1 is read by c2 and vice-versa.
+// The stop function closes all resources, including c1, c2, and the underlying
+// Listener (if there is one), and should not be nil.
+type MakePipe func() (c1, c2 Conn, stop func(), err error)
+
+// testConn tests that a Conn implementation properly satisfies the interface.
+// The tests should not produce any false positives, but may experience
+// false negatives. Thus, some issues may only be detected when the test is
+// run multiple times. For maximal effectiveness, run the tests under the
+// race detector.
+func testConn(t *testing.T, mp MakePipe) {
+ t.Run("BasicIO", func(t *testing.T) { timeoutWrapper(t, mp, testBasicIO) })
+ t.Run("PingPong", func(t *testing.T) { timeoutWrapper(t, mp, testPingPong) })
+ t.Run("RacyRead", func(t *testing.T) { timeoutWrapper(t, mp, testRacyRead) })
+ t.Run("RacyWrite", func(t *testing.T) { timeoutWrapper(t, mp, testRacyWrite) })
+ t.Run("ReadTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testReadTimeout) })
+ t.Run("WriteTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testWriteTimeout) })
+ t.Run("PastTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testPastTimeout) })
+ t.Run("PresentTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testPresentTimeout) })
+ t.Run("FutureTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testFutureTimeout) })
+ t.Run("CloseTimeout", func(t *testing.T) { timeoutWrapper(t, mp, testCloseTimeout) })
+ t.Run("ConcurrentMethods", func(t *testing.T) { timeoutWrapper(t, mp, testConcurrentMethods) })
+}
+
+type connTester func(t *testing.T, c1, c2 Conn)
+
+func timeoutWrapper(t *testing.T, mp MakePipe, f connTester) {
+ t.Helper()
+ c1, c2, stop, err := mp()
+ if err != nil {
+ t.Fatalf("unable to make pipe: %v", err)
+ }
+ var once sync.Once
+ defer once.Do(func() { stop() })
+ timer := time.AfterFunc(time.Minute, func() {
+ once.Do(func() {
+ t.Error("test timed out; terminating pipe")
+ stop()
+ })
+ })
+ defer timer.Stop()
+ f(t, c1, c2)
+}
+
+// testBasicIO tests that the data sent on c1 is properly received on c2.
+func testBasicIO(t *testing.T, c1, c2 Conn) {
+ want := make([]byte, 1<<20)
+ rand.New(rand.NewSource(0)).Read(want)
+
+ dataCh := make(chan []byte)
+ go func() {
+ rd := bytes.NewReader(want)
+ if err := chunkedCopy(c1, rd); err != nil {
+ t.Errorf("unexpected c1.Write error: %v", err)
+ }
+ if err := c1.Close(); err != nil {
+ t.Errorf("unexpected c1.Close error: %v", err)
+ }
+ }()
+
+ go func() {
+ wr := new(bytes.Buffer)
+ if err := chunkedCopy(wr, c2); err != nil {
+ t.Errorf("unexpected c2.Read error: %v", err)
+ }
+ if err := c2.Close(); err != nil {
+ t.Errorf("unexpected c2.Close error: %v", err)
+ }
+ dataCh <- wr.Bytes()
+ }()
+
+ if got := <-dataCh; !bytes.Equal(got, want) {
+ t.Error("transmitted data differs")
+ }
+}
+
+// testPingPong tests that the two endpoints can synchronously send data to
+// each other in a typical request-response pattern.
+func testPingPong(t *testing.T, c1, c2 Conn) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ pingPonger := func(c Conn) {
+ defer wg.Done()
+ buf := make([]byte, 8)
+ var prev uint64
+ for {
+ if _, err := io.ReadFull(c, buf); err != nil {
+ if err == io.EOF {
+ break
+ }
+ t.Errorf("unexpected Read error: %v", err)
+ }
+
+ v := binary.LittleEndian.Uint64(buf)
+ binary.LittleEndian.PutUint64(buf, v+1)
+ if prev != 0 && prev+2 != v {
+ t.Errorf("mismatching value: got %d, want %d", v, prev+2)
+ }
+ prev = v
+ if v == 1000 {
+ break
+ }
+
+ if _, err := c.Write(buf); err != nil {
+ t.Errorf("unexpected Write error: %v", err)
+ break
+ }
+ }
+ if err := c.Close(); err != nil {
+ t.Errorf("unexpected Close error: %v", err)
+ }
+ }
+
+ wg.Add(2)
+ go pingPonger(c1)
+ go pingPonger(c2)
+
+ // Start off the chain reaction.
+ if _, err := c1.Write(make([]byte, 8)); err != nil {
+ t.Errorf("unexpected c1.Write error: %v", err)
+ }
+}
+
+// testRacyRead tests that it is safe to mutate the input Read buffer
+// immediately after cancelation has occurred.
+func testRacyRead(t *testing.T, c1, c2 Conn) {
+ go chunkedCopy(c2, rand.New(rand.NewSource(0)))
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ c1.SetReadDeadline(time.Now().Add(time.Millisecond))
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ b1 := make([]byte, 1024)
+ b2 := make([]byte, 1024)
+ for j := 0; j < 100; j++ {
+ _, err := c1.Read(b1)
+ copy(b1, b2) // Mutate b1 to trigger potential race
+ if err != nil {
+ checkForTimeoutError(t, err)
+ c1.SetReadDeadline(time.Now().Add(time.Millisecond))
+ }
+ }
+ }()
+ }
+}
+
+// testRacyWrite tests that it is safe to mutate the input Write buffer
+// immediately after cancelation has occurred.
+func testRacyWrite(t *testing.T, c1, c2 Conn) {
+ go chunkedCopy(ioutil.Discard, c2)
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ c1.SetWriteDeadline(time.Now().Add(time.Millisecond))
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ b1 := make([]byte, 1024)
+ b2 := make([]byte, 1024)
+ for j := 0; j < 100; j++ {
+ _, err := c1.Write(b1)
+ copy(b1, b2) // Mutate b1 to trigger potential race
+ if err != nil {
+ checkForTimeoutError(t, err)
+ c1.SetWriteDeadline(time.Now().Add(time.Millisecond))
+ }
+ }
+ }()
+ }
+}
+
+// testReadTimeout tests that Read timeouts do not affect Write.
+func testReadTimeout(t *testing.T, c1, c2 Conn) {
+ go chunkedCopy(ioutil.Discard, c2)
+
+ c1.SetReadDeadline(aLongTimeAgo)
+ _, err := c1.Read(make([]byte, 1024))
+ checkForTimeoutError(t, err)
+ if _, err := c1.Write(make([]byte, 1024)); err != nil {
+ t.Errorf("unexpected Write error: %v", err)
+ }
+}
+
+// testWriteTimeout tests that Write timeouts do not affect Read.
+func testWriteTimeout(t *testing.T, c1, c2 Conn) {
+ go chunkedCopy(c2, rand.New(rand.NewSource(0)))
+
+ c1.SetWriteDeadline(aLongTimeAgo)
+ _, err := c1.Write(make([]byte, 1024))
+ checkForTimeoutError(t, err)
+ if _, err := c1.Read(make([]byte, 1024)); err != nil {
+ t.Errorf("unexpected Read error: %v", err)
+ }
+}
+
+// testPastTimeout tests that a deadline set in the past immediately times out
+// Read and Write requests.
+func testPastTimeout(t *testing.T, c1, c2 Conn) {
+ go chunkedCopy(c2, c2)
+
+ testRoundtrip(t, c1)
+
+ c1.SetDeadline(aLongTimeAgo)
+ n, err := c1.Write(make([]byte, 1024))
+ if n != 0 {
+ t.Errorf("unexpected Write count: got %d, want 0", n)
+ }
+ checkForTimeoutError(t, err)
+ n, err = c1.Read(make([]byte, 1024))
+ if n != 0 {
+ t.Errorf("unexpected Read count: got %d, want 0", n)
+ }
+ checkForTimeoutError(t, err)
+
+ testRoundtrip(t, c1)
+}
+
+// testPresentTimeout tests that a past deadline set while there are pending
+// Read and Write operations immediately times out those operations.
+func testPresentTimeout(t *testing.T, c1, c2 Conn) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ wg.Add(3)
+
+ deadlineSet := make(chan bool, 1)
+ go func() {
+ defer wg.Done()
+ time.Sleep(100 * time.Millisecond)
+ deadlineSet <- true
+ c1.SetReadDeadline(aLongTimeAgo)
+ c1.SetWriteDeadline(aLongTimeAgo)
+ }()
+ go func() {
+ defer wg.Done()
+ n, err := c1.Read(make([]byte, 1024))
+ if n != 0 {
+ t.Errorf("unexpected Read count: got %d, want 0", n)
+ }
+ checkForTimeoutError(t, err)
+ if len(deadlineSet) == 0 {
+ t.Error("Read timed out before deadline is set")
+ }
+ }()
+ go func() {
+ defer wg.Done()
+ var err error
+ for err == nil {
+ _, err = c1.Write(make([]byte, 1024))
+ }
+ checkForTimeoutError(t, err)
+ if len(deadlineSet) == 0 {
+ t.Error("Write timed out before deadline is set")
+ }
+ }()
+}
+
+// testFutureTimeout tests that a future deadline will eventually time out
+// Read and Write operations.
+func testFutureTimeout(t *testing.T, c1, c2 Conn) {
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ c1.SetDeadline(time.Now().Add(100 * time.Millisecond))
+ go func() {
+ defer wg.Done()
+ _, err := c1.Read(make([]byte, 1024))
+ checkForTimeoutError(t, err)
+ }()
+ go func() {
+ defer wg.Done()
+ var err error
+ for err == nil {
+ _, err = c1.Write(make([]byte, 1024))
+ }
+ checkForTimeoutError(t, err)
+ }()
+ wg.Wait()
+
+ go chunkedCopy(c2, c2)
+ resyncConn(t, c1)
+ testRoundtrip(t, c1)
+}
+
+// testCloseTimeout tests that calling Close immediately times out pending
+// Read and Write operations.
+func testCloseTimeout(t *testing.T, c1, c2 Conn) {
+ go chunkedCopy(c2, c2)
+
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ wg.Add(3)
+
+ // Test for cancelation upon connection closure.
+ c1.SetDeadline(neverTimeout)
+ go func() {
+ defer wg.Done()
+ time.Sleep(100 * time.Millisecond)
+ c1.Close()
+ }()
+ go func() {
+ defer wg.Done()
+ var err error
+ buf := make([]byte, 1024)
+ for err == nil {
+ _, err = c1.Read(buf)
+ }
+ }()
+ go func() {
+ defer wg.Done()
+ var err error
+ buf := make([]byte, 1024)
+ for err == nil {
+ _, err = c1.Write(buf)
+ }
+ }()
+}
+
+// testConcurrentMethods tests that the methods of Conn can safely
+// be called concurrently.
+func testConcurrentMethods(t *testing.T, c1, c2 Conn) {
+ if runtime.GOOS == "plan9" {
+ t.Skip("skipping on plan9; see https://golang.org/issue/20489")
+ }
+ go chunkedCopy(c2, c2)
+
+ // The results of the calls may be nonsensical, but this should
+ // not trigger a race detector warning.
+ var wg sync.WaitGroup
+ for i := 0; i < 100; i++ {
+ wg.Add(7)
+ go func() {
+ defer wg.Done()
+ c1.Read(make([]byte, 1024))
+ }()
+ go func() {
+ defer wg.Done()
+ c1.Write(make([]byte, 1024))
+ }()
+ go func() {
+ defer wg.Done()
+ c1.SetDeadline(time.Now().Add(10 * time.Millisecond))
+ }()
+ go func() {
+ defer wg.Done()
+ c1.SetReadDeadline(aLongTimeAgo)
+ }()
+ go func() {
+ defer wg.Done()
+ c1.SetWriteDeadline(aLongTimeAgo)
+ }()
+ go func() {
+ defer wg.Done()
+ c1.LocalAddr()
+ }()
+ go func() {
+ defer wg.Done()
+ c1.RemoteAddr()
+ }()
+ }
+ wg.Wait() // At worst, the deadline is set 10ms into the future
+
+ resyncConn(t, c1)
+ testRoundtrip(t, c1)
+}
+
+// checkForTimeoutError checks that the error is a *OpError and that its
+// underlying Err is os.ErrDeadlineExceeded.
+func checkForTimeoutError(t *testing.T, err error) {
+ t.Helper()
+ operr, ok := err.(*OpError)
+ if !ok {
+ t.Errorf("got %T: %v, want OpError", err, err)
+ return
+ }
+ if operr.Err != os.ErrDeadlineExceeded {
+ t.Errorf("got %T: %v, want os.ErrDeadlineExceeded", err, err)
+ }
+}
+
+// testRoundtrip writes something into c and reads it back.
+// It assumes that everything written into c is echoed back to itself.
+func testRoundtrip(t *testing.T, c Conn) {
+ t.Helper()
+ if err := c.SetDeadline(neverTimeout); err != nil {
+ t.Errorf("roundtrip SetDeadline error: %v", err)
+ }
+
+ const s = "Hello, world!"
+ buf := []byte(s)
+ if _, err := c.Write(buf); err != nil {
+ t.Errorf("roundtrip Write error: %v", err)
+ }
+ if _, err := io.ReadFull(c, buf); err != nil {
+ t.Errorf("roundtrip Read error: %v", err)
+ }
+ if string(buf) != s {
+ t.Errorf("roundtrip data mismatch: got %q, want %q", buf, s)
+ }
+}
+
+// resyncConn resynchronizes the connection into a sane state.
+// It assumes that everything written into c is echoed back to itself.
+// It assumes that 0xff is not currently on the wire or in the read buffer.
+func resyncConn(t *testing.T, c Conn) {
+ t.Helper()
+ c.SetDeadline(neverTimeout)
+ errCh := make(chan error)
+ go func() {
+ _, err := c.Write([]byte{0xff})
+ errCh <- err
+ }()
+ buf := make([]byte, 1024)
+ for {
+ n, err := c.Read(buf)
+ if n > 0 && bytes.IndexByte(buf[:n], 0xff) == n-1 {
+ break
+ }
+ if err != nil {
+ t.Errorf("unexpected Read error: %v", err)
+ break
+ }
+ }
+ if err := <-errCh; err != nil {
+ t.Errorf("unexpected Write error: %v", err)
+ }
+}
+
+// chunkedCopy copies from r to w in fixed-width chunks to avoid
+// causing a Write that exceeds the maximum packet size for packet-based
+// connections like "unixpacket".
+// We assume that the maximum packet size is at least 1024.
+func chunkedCopy(w io.Writer, r io.Reader) error {
+ b := make([]byte, 1024)
+ _, err := io.CopyBuffer(struct{ io.Writer }{w}, struct{ io.Reader }{r}, b)
+ return err
+}
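
The modified checkForTimeoutError above expects a *OpError whose Err field is
os.ErrDeadlineExceeded. A sketch (not part of the diff) of what that error
shape looks like from a caller, using a deadline in the past much like
aLongTimeAgo:

    package main

    import (
        "fmt"
        "net"
        "os"
        "time"
    )

    func main() {
        c1, c2 := net.Pipe()
        defer c1.Close()
        defer c2.Close()

        // A deadline in the past makes the next Read time out immediately.
        c1.SetReadDeadline(time.Unix(233431200, 0))
        _, err := c1.Read(make([]byte, 1))

        // The same shape checkForTimeoutError asserts.
        if operr, ok := err.(*net.OpError); ok && operr.Err == os.ErrDeadlineExceeded {
            fmt.Println("timed out as expected:", operr)
        }
    }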
diff --git a/src/net/pipe.go b/src/net/pipe.go
new file mode 100644
index 000000000..02dd07cf9
--- /dev/null
+++ b/src/net/pipe.go
@@ -0,0 +1,240 @@
+// The following is copied from the official Go 1.19.2 implementation.
+
+// Copyright 2010 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "io"
+ "os"
+ "sync"
+ "time"
+)
+
+// pipeDeadline is an abstraction for handling timeouts.
+type pipeDeadline struct {
+ mu sync.Mutex // Guards timer and cancel
+ timer *time.Timer
+ cancel chan struct{} // Must be non-nil
+}
+
+func makePipeDeadline() pipeDeadline {
+ return pipeDeadline{cancel: make(chan struct{})}
+}
+
+// set sets the point in time when the deadline will time out.
+// A timeout event is signaled by closing the channel returned by waiter.
+// Once a timeout has occurred, the deadline can be refreshed by specifying a
+// t value in the future.
+//
+// A zero value for t prevents timeout.
+func (d *pipeDeadline) set(t time.Time) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ if d.timer != nil && !d.timer.Stop() {
+ <-d.cancel // Wait for the timer callback to finish and close cancel
+ }
+ d.timer = nil
+
+ // If the time is zero, then there is no deadline.
+ closed := isClosedChan(d.cancel)
+ if t.IsZero() {
+ if closed {
+ d.cancel = make(chan struct{})
+ }
+ return
+ }
+
+ // Time is in the future, so set up a timer to cancel at that point.
+ if dur := time.Until(t); dur > 0 {
+ if closed {
+ d.cancel = make(chan struct{})
+ }
+ d.timer = time.AfterFunc(dur, func() {
+ close(d.cancel)
+ })
+ return
+ }
+
+ // Time is in the past, so close immediately.
+ if !closed {
+ close(d.cancel)
+ }
+}
+
+// wait returns a channel that is closed when the deadline is exceeded.
+func (d *pipeDeadline) wait() chan struct{} {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ return d.cancel
+}
+
+func isClosedChan(c <-chan struct{}) bool {
+ select {
+ case <-c:
+ return true
+ default:
+ return false
+ }
+}
+
+type pipeAddr struct{}
+
+func (pipeAddr) Network() string { return "pipe" }
+func (pipeAddr) String() string { return "pipe" }
+
+type pipe struct {
+ wrMu sync.Mutex // Serialize Write operations
+
+ // Used by local Read to interact with remote Write.
+ // Successful receive on rdRx is always followed by send on rdTx.
+ rdRx <-chan []byte
+ rdTx chan<- int
+
+ // Used by local Write to interact with remote Read.
+ // Successful send on wrTx is always followed by receive on wrRx.
+ wrTx chan<- []byte
+ wrRx <-chan int
+
+ once sync.Once // Protects closing localDone
+ localDone chan struct{}
+ remoteDone <-chan struct{}
+
+ readDeadline pipeDeadline
+ writeDeadline pipeDeadline
+}
+
+// Pipe creates a synchronous, in-memory, full duplex
+// network connection; both ends implement the Conn interface.
+// Reads on one end are matched with writes on the other,
+// copying data directly between the two; there is no internal
+// buffering.
+func Pipe() (Conn, Conn) {
+ cb1 := make(chan []byte)
+ cb2 := make(chan []byte)
+ cn1 := make(chan int)
+ cn2 := make(chan int)
+ done1 := make(chan struct{})
+ done2 := make(chan struct{})
+
+ p1 := &pipe{
+ rdRx: cb1, rdTx: cn1,
+ wrTx: cb2, wrRx: cn2,
+ localDone: done1, remoteDone: done2,
+ readDeadline: makePipeDeadline(),
+ writeDeadline: makePipeDeadline(),
+ }
+ p2 := &pipe{
+ rdRx: cb2, rdTx: cn2,
+ wrTx: cb1, wrRx: cn1,
+ localDone: done2, remoteDone: done1,
+ readDeadline: makePipeDeadline(),
+ writeDeadline: makePipeDeadline(),
+ }
+ return p1, p2
+}
+
+func (*pipe) LocalAddr() Addr { return pipeAddr{} }
+func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
+
+func (p *pipe) Read(b []byte) (int, error) {
+ n, err := p.read(b)
+ if err != nil && err != io.EOF && err != io.ErrClosedPipe {
+ err = &OpError{Op: "read", Net: "pipe", Err: err}
+ }
+ return n, err
+}
+
+func (p *pipe) read(b []byte) (n int, err error) {
+ switch {
+ case isClosedChan(p.localDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.remoteDone):
+ return 0, io.EOF
+ case isClosedChan(p.readDeadline.wait()):
+ return 0, os.ErrDeadlineExceeded
+ }
+
+ select {
+ case bw := <-p.rdRx:
+ nr := copy(b, bw)
+ p.rdTx <- nr
+ return nr, nil
+ case <-p.localDone:
+ return 0, io.ErrClosedPipe
+ case <-p.remoteDone:
+ return 0, io.EOF
+ case <-p.readDeadline.wait():
+ return 0, os.ErrDeadlineExceeded
+ }
+}
+
+func (p *pipe) Write(b []byte) (int, error) {
+ n, err := p.write(b)
+ if err != nil && err != io.ErrClosedPipe {
+ err = &OpError{Op: "write", Net: "pipe", Err: err}
+ }
+ return n, err
+}
+
+func (p *pipe) write(b []byte) (n int, err error) {
+ switch {
+ case isClosedChan(p.localDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.remoteDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.writeDeadline.wait()):
+ return 0, os.ErrDeadlineExceeded
+ }
+
+ p.wrMu.Lock() // Ensure entirety of b is written together
+ defer p.wrMu.Unlock()
+ for once := true; once || len(b) > 0; once = false {
+ select {
+ case p.wrTx <- b:
+ nw := <-p.wrRx
+ b = b[nw:]
+ n += nw
+ case <-p.localDone:
+ return n, io.ErrClosedPipe
+ case <-p.remoteDone:
+ return n, io.ErrClosedPipe
+ case <-p.writeDeadline.wait():
+ return n, os.ErrDeadlineExceeded
+ }
+ }
+ return n, nil
+}
+
+func (p *pipe) SetDeadline(t time.Time) error {
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.readDeadline.set(t)
+ p.writeDeadline.set(t)
+ return nil
+}
+
+func (p *pipe) SetReadDeadline(t time.Time) error {
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.readDeadline.set(t)
+ return nil
+}
+
+func (p *pipe) SetWriteDeadline(t time.Time) error {
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.writeDeadline.set(t)
+ return nil
+}
+
+func (p *pipe) Close() error {
+ p.once.Do(func() { close(p.localDone) })
+ return nil
+}
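
The loop in pipe.write above re-offers the unconsumed remainder after each
rendezvous (b = b[nw:]), so one large Write can be satisfied by several
smaller Reads. A sketch (not part of the diff) of that behavior:

    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        c1, c2 := net.Pipe()
        defer c1.Close()
        defer c2.Close()

        done := make(chan struct{})
        go func() {
            defer close(done)
            // One 8-byte Write, matched by two 4-byte Reads below.
            n, err := c1.Write([]byte("abcdefgh"))
            fmt.Println("wrote", n, err) // wrote 8 <nil>
        }()

        buf := make([]byte, 4)
        for i := 0; i < 2; i++ {
            n, _ := c2.Read(buf)
            fmt.Printf("read %q\n", buf[:n]) // "abcd", then "efgh"
        }
        <-done
    }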
diff --git a/src/net/pipe_test.go b/src/net/pipe_test.go
new file mode 100644
index 000000000..7978fc6aa
--- /dev/null
+++ b/src/net/pipe_test.go
@@ -0,0 +1,48 @@
+// The following is copied from the official Go 1.19.2 implementation.
+
+// Copyright 2010 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "io"
+ "testing"
+ "time"
+)
+
+func TestPipe(t *testing.T) {
+ testConn(t, func() (c1, c2 Conn, stop func(), err error) {
+ c1, c2 = Pipe()
+ stop = func() {
+ c1.Close()
+ c2.Close()
+ }
+ return
+ })
+}
+
+func TestPipeCloseError(t *testing.T) {
+ c1, c2 := Pipe()
+ c1.Close()
+
+ if _, err := c1.Read(nil); err != io.ErrClosedPipe {
+ t.Errorf("c1.Read() = %v, want io.ErrClosedPipe", err)
+ }
+ if _, err := c1.Write(nil); err != io.ErrClosedPipe {
+ t.Errorf("c1.Write() = %v, want io.ErrClosedPipe", err)
+ }
+ if err := c1.SetDeadline(time.Time{}); err != io.ErrClosedPipe {
+ t.Errorf("c1.SetDeadline() = %v, want io.ErrClosedPipe", err)
+ }
+ if _, err := c2.Read(nil); err != io.EOF {
+ t.Errorf("c2.Read() = %v, want io.EOF", err)
+ }
+ if _, err := c2.Write(nil); err != io.ErrClosedPipe {
+ t.Errorf("c2.Write() = %v, want io.ErrClosedPipe", err)
+ }
+ if err := c2.SetDeadline(time.Time{}); err != io.ErrClosedPipe {
+ t.Errorf("c2.SetDeadline() = %v, want io.ErrClosedPipe", err)
+ }
+}
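
As the pipeDeadline.set comment notes, a timed-out pipe is not permanently
broken: setting a zero (or future) deadline swaps in a fresh cancel channel
and re-arms the connection. A sketch (not part of the diff):

    package main

    import (
        "fmt"
        "net"
        "time"
    )

    func main() {
        c1, c2 := net.Pipe()
        defer c1.Close()
        defer c2.Close()

        // Expire the read side immediately.
        c1.SetReadDeadline(time.Unix(1, 0))
        if _, err := c1.Read(make([]byte, 1)); err != nil {
            fmt.Println("timed out:", err)
        }

        // A zero deadline clears the timeout, so the next Read blocks
        // normally until the Write below rendezvouses with it.
        c1.SetReadDeadline(time.Time{})
        go c2.Write([]byte("x"))
        n, err := c1.Read(make([]byte, 1))
        fmt.Println(n, err) // 1 <nil>
    }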