diff options
author | linquize <[email protected]> | 2019-07-20 03:29:49 +0800 |
---|---|---|
committer | Matt Holt <[email protected]> | 2019-07-19 13:29:49 -0600 |
commit | 0ba427a6f44edb5169e022e3cbc23f7b6de35a81 (patch) | |
tree | c998bd5517b50dd338bd0009089357edfbd98131 | |
parent | 7fab1b15c80835a06ed722a06fb0b1e31b1efe04 (diff) | |
download | caddy-0ba427a6f44edb5169e022e3cbc23f7b6de35a81.tar.gz caddy-0ba427a6f44edb5169e022e3cbc23f7b6de35a81.zip |
websocket: Enhancements, message types, and tests (#2359)
* websocket: Should reset respawn parameter when processing next config entry
* websocket: add message types: lines, text, binary
* websocket: Add unit test
* Add websocket sample files
-rw-r--r-- | caddyhttp/websocket/sample/Caddyfile | 45 | ||||
-rw-r--r-- | caddyhttp/websocket/sample/index.html | 407 | ||||
-rw-r--r-- | caddyhttp/websocket/setup.go | 47 | ||||
-rw-r--r-- | caddyhttp/websocket/websocket.go | 233 | ||||
-rw-r--r-- | caddyhttp/websocket/websocket_test.go | 300 |
5 files changed, 999 insertions, 33 deletions
diff --git a/caddyhttp/websocket/sample/Caddyfile b/caddyhttp/websocket/sample/Caddyfile new file mode 100644 index 000000000..c874df83f --- /dev/null +++ b/caddyhttp/websocket/sample/Caddyfile @@ -0,0 +1,45 @@ +:2015 { + websocket /cat-lines/ cat { + type lines + } + + websocket /cat-lines-32/ cat { + type lines + bufsize 32 + } + + websocket /cat-text/ cat { + type text + } + + websocket /cat-text-32/ cat { + type text + bufsize 32 + } + + websocket /cat-binary/ cat { + type binary + } + + websocket /cat-binary-32/ cat { + type binary + bufsize 32 + } + + websocket /curl-lines/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\" 2>&1'" { + type lines + } + + websocket /curl-text/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\" 2>&1'" { + type text + } + + websocket /curl-binary/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\"'" { + type binary + } + + websocket /curl-binary-32/ "sh -c 'read s; read url; if [ \"$s\" = \"true\" ]; then out=\"\"; else out=\"-o /dev/null\"; fi; curl -L $out \"$url\"'" { + type binary + bufsize 32 + } +} diff --git a/caddyhttp/websocket/sample/index.html b/caddyhttp/websocket/sample/index.html new file mode 100644 index 000000000..7d5f2bb24 --- /dev/null +++ b/caddyhttp/websocket/sample/index.html @@ -0,0 +1,407 @@ +<!DOCTYPE html> +<html lang="en"> +<head> +<meta charset="UTF-8"> +<title>Caddy WebSocket Test</title> +<meta name="viewport" content="width=device-width, initial-scale=1.0"> + +<style type="text/css"> +body, button { + font-family: Arial, Helvetica, sans-serif; +} + +.aim { + background-color: lightyellow; +} + +.console { + border: 1px solid black; + background-color: #EEEEEE; + color: #111111; + font-family: 'Courier New', Courier, monospace; +} +</style> + +<script type="text/javascript"> +function getWsUrl(path) { + let url = new URL(location.href); + url.protocol = url.protocol.replace('http', 'ws'); + url.pathname = path; + return url.toString(); +} + +function replaceCrCr(str) { + for (let i = str.indexOf('\r\n'); i >= 0; i = str.indexOf('\r\n')) + str = str.substr(0, i) + str.substr(i + 1); + let firstIndex = str.indexOf('\r'); + let lastIndex = str.lastIndexOf('\r'); + if (firstIndex < 0) + return str; + if (firstIndex !== lastIndex) + return str.substr(0, firstIndex) + str.substr(lastIndex + 1); + let nIndex = str.lastIndexOf('\n'); + if (nIndex >= 0) + return str.substr(0, nIndex + 1) + str.substr(lastIndex + 1); + return str; +} + +function writeConsole(cs, str) { + let data = replaceCrCr(str); + let rIndex = data.indexOf('\r') + if (rIndex >= 0) { + let oldText = cs.innerText; + let nIndex = oldText.lastIndexOf('\n'); + if (nIndex >= 0) + cs.innerText = oldText.substr(0, nIndex + 1) + data.substr(rIndex + 1); + else + cs.innerText = data.substr(rIndex + 1); + } else + cs.innerText += data; +} + +function createTest(buttonId, consoleId, path, callback) { + document.getElementById(buttonId).addEventListener('click', () => { + let ws = new WebSocket(getWsUrl(path)); + callback(ws, document.getElementById(consoleId)); + }); +} + +let arraybufferToHexString = ab => Array.from(new Uint8Array(ab)).map(num => num.toString(16).padStart(2, '0')).join(' '); + +function hexStringToUint8Array(str) { + let arr = []; + let i = 0; + for (let i = 0; i < str.length; i += 2) { + for (; i < str.length; i++) { + let b = str.charAt(i); + if ((b === ' ') || (b === '\t') || (b === '\r') || (b === '\n')) + continue; + break; + } + let c = str.charAt(i); + let d = str.charAt(i + 1); + if (!(((c >= '0') && (c <= '9')) || ((c >= 'A') && (c <= 'F')) || ((c >= 'a') && (c <= 'f'))) || + !(((d >= '0') && (d <= '9')) || ((d >= 'A') && (d <= 'F')) || ((d >= 'a') && (d <= 'f')))) + return null; + arr.push(parseInt(c + d, 16)); + } + return new Uint8Array(arr); +} + +function hexInputToUint8ArrayForEach(id, callback) { + let segments = document.getElementById(id).value.split('\n'); + for (let segment of segments) { + let arr = hexStringToUint8Array(segment); + if (arr) + callback(arr); + } +} + +function stringInputForEach(id, callback) { + let segments = document.getElementById(id).value.split('\n'); + segments.forEach(callback); +} + +document.addEventListener('DOMContentLoaded', () => { + createTest('test1', 'console1', 'cat-lines/', (ws, cs) => { + let send = 0; + let recv = 0; + ws.onopen = () => { + writeConsole(cs, '[WS OPEN]\n'); + stringInputForEach('input1', str => { + ws.send(str); + send++; + }); + }; + ws.onmessage = e => { + writeConsole(cs, e.data + '\n'); + recv++; + if (recv >= 3) { + writeConsole(cs, '[WS CLOSING]\n'); + ws.close(); + } + }; + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test2a', 'console2a', 'cat-lines-32/', (ws, cs) => { + let send = 0; + let recv = 0; + ws.onopen = () => { + if (document.getElementById('hex2a').checked) { + writeConsole(cs, '[WS OPEN] Note: hex -> binary conversion is done at client-side!\n'); + hexInputToUint8ArrayForEach('input2a', arr => { + ws.send(arr); + send++; + }); + } else { + writeConsole(cs, '[WS OPEN]\n'); + stringInputForEach('input2a', str => { + ws.send(str); + send++; + }); + } + }; + ws.onmessage = e => { + writeConsole(cs, e.data + '\n'); + recv++; + if (recv >= send) { + writeConsole(cs, '[WS CLOSING]\n'); + ws.close(); + } + }; + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test2b', 'console2b', 'cat-lines-32/', (ws, cs) => { + let send = 0; + let recv = 0; + ws.onopen = () => { + if (document.getElementById('hex2b').checked) { + writeConsole(cs, '[WS OPEN] Note: hex -> binary conversion is done at client-side!\n'); + hexInputToUint8ArrayForEach('input2b', arr => { + ws.send(arr); + send++; + }); + } else { + writeConsole(cs, '[WS OPEN]\n'); + stringInputForEach('input2b', str => { + ws.send(str); + send++; + }); + } + }; + ws.onmessage = e => { + writeConsole(cs, e.data + '\n'); + recv++; + if (recv >= send) { + writeConsole(cs, '[WS CLOSING]\n'); + ws.close(); + } + }; + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test3', 'console3', 'cat-text/', (ws, cs) => { + let send = 0; + let recv = ''; + ws.onopen = () => { + writeConsole(cs, '[WS OPEN]\n'); + stringInputForEach('input3', str => { + ws.send(str + '\n'); + send++; + }); + }; + ws.onmessage = e => { + writeConsole(cs, e.data); + recv += e.data; + let count = -1; + let pos = -1; + do { + count++; + pos = recv.indexOf('\n', pos + 1); + } while (pos >= 0); + if (count >= send) { + writeConsole(cs, '[WS CLOSING]\n'); + ws.close(); + } + }; + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test4', 'console4', 'cat-text-32/', (ws, cs) => { + let send = 0; + let recv = ''; + ws.onopen = () => { + writeConsole(cs, '[WS OPEN]\n'); + stringInputForEach('input4', str => { + ws.send(str + '\n'); + send++; + }); + }; + ws.onmessage = e => { + writeConsole(cs, e.data); + recv += e.data; + let count = -1; + let pos = -1; + do { + count++; + pos = recv.indexOf('\n', pos + 1); + } while (pos >= 0); + if (count >= send) { + writeConsole(cs, '[WS CLOSING]\n'); + ws.close(); + } + }; + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test5', 'console5', 'cat-binary/', (ws, cs) => { + let send = 0; + let recv = 0; + ws.binaryType = 'arraybuffer'; + ws.onopen = () => { + writeConsole(cs, '[WS OPEN] Note: hex <> binary conversion is done at client-side!\n'); + hexInputToUint8ArrayForEach('input5', arr => { + ws.send(arr); + send += arr.byteLength; + }); + }; + ws.onmessage = e => { + writeConsole(cs, arraybufferToHexString(e.data) + '\n'); + recv += e.data.byteLength; + if (recv >= send) { + writeConsole(cs, '[WS CLOSING]\n'); + ws.close(); + } + }; + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test6', 'console6', 'cat-binary-32/', (ws, cs) => { + let send = 0; + let recv = 0; + ws.binaryType = 'arraybuffer'; + ws.onopen = () => { + writeConsole(cs, '[WS OPEN] Note: hex <> binary conversion is done at client-side!\n'); + hexInputToUint8ArrayForEach('input6', arr => { + ws.send(arr); + send += arr.byteLength; + }); + }; + ws.onmessage = e => { + writeConsole(cs, arraybufferToHexString(e.data) + '\n'); + recv += e.data.byteLength; + if (recv >= send) { + writeConsole(cs, '[WS CLOSING]\n'); + ws.close(); + } + }; + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test7', 'console7', 'curl-lines/', (ws, cs) => { + ws.onopen = () => { + writeConsole(cs, '[WS OPEN]\n'); + ws.send(document.getElementById('output7').checked.toString()); + ws.send(document.getElementById('url7').value); + }; + ws.onmessage = e => writeConsole(cs, e.data + '\n'); + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test9', 'console9', 'curl-text/', (ws, cs) => { + ws.onopen = () => { + writeConsole(cs, '[WS OPEN]\n'); + ws.send(document.getElementById('output9').checked.toString() + '\n'); + ws.send(document.getElementById('url9').value + '\n'); + }; + ws.onmessage = e => writeConsole(cs, e.data); + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test11', 'console11', 'curl-binary/', (ws, cs) => { + ws.binaryType = 'arraybuffer'; + ws.onopen = () => { + writeConsole(cs, '[WS OPEN] Note: hex <> binary conversion is done at client-side!\n'); + ws.send(document.getElementById('output11').checked.toString() + '\n'); + ws.send(document.getElementById('url11').value + '\n'); + }; + ws.onmessage = e => writeConsole(cs, arraybufferToHexString(e.data) + '\n'); + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); + + createTest('test12', 'console12', 'curl-binary-32/', (ws, cs) => { + ws.binaryType = 'arraybuffer'; + ws.onopen = () => { + writeConsole(cs, '[WS OPEN] Note: hex <> binary conversion is done at client-side!\n'); + ws.send(document.getElementById('output12').checked.toString() + '\n'); + ws.send(document.getElementById('url12').value + '\n'); + }; + ws.onmessage = e => writeConsole(cs, arraybufferToHexString(e.data) + '\n'); + ws.onclose = e => writeConsole(cs, '[WS CLOSED] ' + e.reason + '\n'); + }); +}); + +</script> + +</head> + +<body> +<h3>Caddy WebSocket Test</h3> +<button id="test1">Test 1</button> cat, type=lines, +input=<textarea id="input1" cols="100" rows="3">first line +0123456789ABCDEF0123456789ABCD✔✘EFabcdef +ghijklmnopqrstuvwxyz</textarea> +<br><span class="aim">Aim: as in v0.11, default setting.</span> +<pre><div id="console1" class="console"></div></pre> + +<button id="test2a">Test 2A</button> cat, type=lines, bufsize=32, +input=<textarea id="input2a" cols="100" rows="3">first line +0123456789ABCDEF0123456789ABCDEFabcdef +ghijklmnopqrstuvwxyz</textarea> +<label for="hex2a">hex</label><input id="hex2a" type="checkbox"> +<br><span class="aim">Aim: Same input as Test 1, Line 2 is longer than bufsize, so server-side error occurred (bufio.Scanner: token too long).</span> +<pre><div id="console2a" class="console"></div></pre> + +<button id="test2b">Test 2B</button> cat, type=lines, bufsize=32, +input=<textarea id="input2b" cols="100" rows="3">30 31 32 33 34 35 36 37 38 39 41 42 43 44 45 46 30 31 32 33 34 35 36 37 38 39 41 E2 9C 94 46 +30 31 32 33 34 35 36 37 38 39 41 42 43 44 45 46 30 31 32 33 34 35 36 37 38 39 41 41 42 E2 9C +67 68 69 6A 6B 6C 6D 6E 6F 70 71 72 73 74 75 76 77 78 79 7A</textarea> +<label for="hex2b">hex</label><input id="hex2b" type="checkbox" checked> +<br><span class="aim">Aim: Line 2 ends with an incomplete UTF-8 sequence, so web browser drops connection automatically.</span> +<pre><div id="console2b" class="console"></div></pre> + +<button id="test3">Test 3</button> cat, type=text, +input=<textarea id="input3" cols="100" rows="3">first line +0123456789ABCDEF0123456789ABCD✔✘EFabcdef +ghijklmnopqrstuvwxyz</textarea> +<br><span class="aim">Aim: Same input as Test 1, but use type=text. Web browser receives Line 2 in a single websocket message.</span> +<pre><div id="console3" class="console"></div></pre> + +<button id="test4">Test 4</button> cat, type=text, bufsize=32, +input=<textarea id="input4" cols="100" rows="3">first line +0123456789ABCDEF0123456789ABCD✔✘EFabcdef +ghijklmnopqrstuvwxyz</textarea> +<br><span class="aim">Aim: Same input as Test 2A, but use type=text. Server-side buffer can handle the case that UTF-8 character cut into 2 halves.</span> +<pre><div id="console4" class="console"></div></pre> + +<button id="test5">Test 5</button> cat, type=binary, +input=<textarea id="input5" cols="100" rows="3">02 03 16 30 41 52 63 00 00 49 9D 00 2A 01 FA C6 +FF 37 64 77 D8 FF E1 FF CF 4C 5B 57 8A 00 0D 0A 20 41</textarea> +<label for="hex5">hex</label>✅ +<br><span class="aim">Aim: Test type=binary.</span> +<pre><div id="console5" class="console"></div></pre> + +<button id="test6">Test 6</button> cat, type=binary, bufsize=32, +input=<textarea id="input6" cols="100" rows="3">02 03 16 30 41 52 63 00 00 49 9D 00 2A 01 FA C6 +FF 37 64 77 D8 FF E1 FF CF 4C 5B 57 8A 00 0D 0A 20 41</textarea> +<label for="hex6">hex</label>✅ +<br><span class="aim">Aim: Same input as Test 5, Web browser receives 2 packets when exceeding server-side buffer size.</span> +<pre><div id="console6" class="console"></div></pre> + +<button id="test7">Test 7</button> curl, type=lines, +<label for="output7">stdout=</label><input id="output7" type="checkbox" checked>, +<label for="url7">url=</label><input id="url7" type="text" size="40" value="https://httpbin.org/delay/10"> +<br><span class="aim">Aim: as in v0.11, default setting. Cannot display CURL progress messages at real-time. Also, whitespaces are trimmed.</span> +<pre><div id="console7" class="console"></div></pre> + +<button id="test9">Test 9</button> curl, type=text, +<label for="output9">stdout=</label><input id="output9" type="checkbox" checked>, +<label for="url9">url=</label><input id="url9" type="text" size="40" value="https://httpbin.org/delay/10"> +<br><span class="aim">Aim: Display real-time CURL progress messages. Do not trim whitespaces.</span> +<pre><div id="console9" class="console"></div></pre> + +<button id="test11">Test 11</button> curl, type=binary, +<label for="output11">stdout=</label><input id="output11" type="checkbox" checked>, +<label for="url11">url=</label><input id="url11" type="text" size="40" value="https://httpbin.org/stream-bytes/8192"> +<br><span class="aim">Aim: Display CURL binary output.</span> +<pre><div id="console11" class="console"></div></pre> + +<button id="test12">Test 12</button> curl, type=binary, bufsize=32, +<label for="output12">stdout=</label><input id="output12" type="checkbox" checked>, +<label for="url12">url=</label><input id="url12" type="text" size="40" value="https://httpbin.org/stream-bytes/256"> +<br><span class="aim">Aim: Display CURL binary output. Web browser receives more packets when exceeding server-side buffer size.</span> +<pre><div id="console12" class="console"></div></pre> +</body> +</html> diff --git a/caddyhttp/websocket/setup.go b/caddyhttp/websocket/setup.go index bebd9b5b4..9a09efea0 100644 --- a/caddyhttp/websocket/setup.go +++ b/caddyhttp/websocket/setup.go @@ -15,6 +15,8 @@ package websocket import ( + "strconv" + "github.com/caddyserver/caddy" "github.com/caddyserver/caddy/caddyhttp/httpserver" ) @@ -45,21 +47,38 @@ func setup(c *caddy.Controller) error { func webSocketParse(c *caddy.Controller) ([]Config, error) { var websocks []Config - var respawn bool - optionalBlock := func() (hadBlock bool, err error) { - for c.NextBlock() { - hadBlock = true - if c.Val() == "respawn" { - respawn = true - } else { - return true, c.Err("Expected websocket configuration parameter in block") + for c.Next() { + var respawn bool + var wsType string + var bufSize int + + optionalBlock := func() (hadBlock bool, err error) { + for c.NextBlock() { + hadBlock = true + if c.Val() == "respawn" { + respawn = true + } else if c.Val() == "type" { + arg := c.RemainingArgs() + if len(arg) > 0 { + wsType = arg[0] + } + } else if c.Val() == "bufsize" { + arg := c.RemainingArgs() + if len(arg) > 0 { + var err error + bufSize, err = strconv.Atoi(arg[0]) + if (bufSize < 0) || (err != nil) { + bufSize = 0 + } + } + } else { + return true, c.Err("Expected websocket configuration parameter in block") + } } + return } - return - } - for c.Next() { var val, path, command string // Path or command; not sure which yet @@ -97,11 +116,17 @@ func webSocketParse(c *caddy.Controller) ([]Config, error) { return nil, err } + if wsType == "" { + wsType = "lines" + } + websocks = append(websocks, Config{ Path: path, Command: cmd, Arguments: args, Respawn: respawn, // TODO: This isn't used currently + Type: wsType, + BufSize: bufSize, }) } diff --git a/caddyhttp/websocket/websocket.go b/caddyhttp/websocket/websocket.go index 0b8021492..074623c6c 100644 --- a/caddyhttp/websocket/websocket.go +++ b/caddyhttp/websocket/websocket.go @@ -28,6 +28,7 @@ import ( "os/exec" "strings" "time" + "unicode/utf8" "github.com/caddyserver/caddy/caddyhttp/httpserver" "github.com/gorilla/websocket" @@ -76,6 +77,27 @@ type ( Command string Arguments []string Respawn bool // TODO: Not used, but parser supports it until we decide on it + Type string + BufSize int + } + + wsGetUpgrader interface { + GetUpgrader() wsUpgrader + } + + wsUpgrader interface { + Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (wsConn, error) + } + + wsConn interface { + Close() error + ReadMessage() (messageType int, p []byte, err error) + SetPongHandler(h func(appData string) error) + SetReadDeadline(t time.Time) error + SetReadLimit(limit int64) + SetWriteDeadline(t time.Time) error + WriteControl(messageType int, data []byte, deadline time.Time) error + WriteMessage(messageType int, data []byte) error } ) @@ -94,12 +116,19 @@ func (ws WebSocket) ServeHTTP(w http.ResponseWriter, r *http.Request) (int, erro // serveWS is used for setting and upgrading the HTTP connection to a websocket connection. // It also spawns the child process that is associated with matched HTTP path/url. func serveWS(w http.ResponseWriter, r *http.Request, config *Config) (int, error) { - upgrader := websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { return true }, + gu, castok := w.(wsGetUpgrader) + var u wsUpgrader + if gu != nil && castok { + u = gu.GetUpgrader() + } else { + u = &realWsUpgrader{o: &websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { return true }, + }} } - conn, err := upgrader.Upgrade(w, r, nil) + + conn, err := u.Upgrade(w, r, nil) if err != nil { // the connection has been "handled" -- WriteHeader was called with Upgrade, // so don't return an error status code; just return an error @@ -133,8 +162,8 @@ func serveWS(w http.ResponseWriter, r *http.Request, config *Config) (int, error } done := make(chan struct{}) - go pumpStdout(conn, stdout, done) - pumpStdin(conn, stdin) + go pumpStdout(conn, stdout, done, config) + pumpStdin(conn, stdin, config) _ = stdin.Close() // close stdin to end the process @@ -220,7 +249,7 @@ func buildEnv(cmdPath string, r *http.Request) (metavars []string, err error) { // pumpStdin handles reading data from the websocket connection and writing // it to stdin of the process. -func pumpStdin(conn *websocket.Conn, stdin io.WriteCloser) { +func pumpStdin(conn wsConn, stdin io.WriteCloser, config *Config) { // Setup our connection's websocket ping/pong handlers from our const values. defer conn.Close() conn.SetReadLimit(maxMessageSize) @@ -238,7 +267,10 @@ func pumpStdin(conn *websocket.Conn, stdin io.WriteCloser) { if err != nil { break } - message = append(message, '\n') + if config.Type == "lines" { + // no '\n' from client, so append '\n' to spawned process + message = append(message, '\n') + } if _, err := stdin.Write(message); err != nil { break } @@ -247,32 +279,102 @@ func pumpStdin(conn *websocket.Conn, stdin io.WriteCloser) { // pumpStdout handles reading data from stdout of the process and writing // it to websocket connection. -func pumpStdout(conn *websocket.Conn, stdout io.Reader, done chan struct{}) { +func pumpStdout(conn wsConn, stdout io.Reader, done chan struct{}, config *Config) { go pinger(conn, done) defer func() { _ = conn.Close() close(done) // make sure to close the pinger when we are done. }() - s := bufio.NewScanner(stdout) - for s.Scan() { - if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { - log.Println("[ERROR] failed to set write deadline: ", err) + if config.Type == "lines" { + // message must end with '\n' + s := bufio.NewScanner(stdout) + if config.BufSize > 0 { + s.Buffer(make([]byte, config.BufSize), config.BufSize) } - if err := conn.WriteMessage(websocket.TextMessage, bytes.TrimSpace(s.Bytes())); err != nil { - break + for s.Scan() { + if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + log.Println("[ERROR] failed to set write deadline: ", err) + } + if err := conn.WriteMessage(websocket.TextMessage, bytes.TrimSpace(s.Bytes())); err != nil { + break + } } - } - if s.Err() != nil { - err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, s.Err().Error()), time.Time{}) - if err != nil { - log.Println("[ERROR] WriteControl failed: ", err) + if s.Err() != nil { + err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, s.Err().Error()), time.Time{}) + if err != nil { + log.Println("[ERROR] WriteControl failed: ", err) + } + } + } else if config.Type == "text" { + // handle UTF-8 text message, newline is not required + r := bufio.NewReader(stdout) + var err1 error + var len int + remainBuf := make([]byte, utf8.UTFMax) + remainLen := 0 + bufSize := config.BufSize + if bufSize <= 0 { + bufSize = 2048 + } + for { + out := make([]byte, bufSize) + copy(out[:remainLen], remainBuf[:remainLen]) + len, err1 = r.Read(out[remainLen:]) + if err1 != nil { + break + } + len += remainLen + remainLen = findIncompleteRuneLength(out, len) + if remainLen > 0 { + remainBuf = out[len-remainLen : len] + } + if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + log.Println("[ERROR] failed to set write deadline: ", err) + } + if err := conn.WriteMessage(websocket.TextMessage, out[0:len-remainLen]); err != nil { + break + } + } + if err1 != nil { + err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, err1.Error()), time.Time{}) + if err != nil { + log.Println("[ERROR] WriteControl failed: ", err) + } + } + } else if config.Type == "binary" { + // treat message as binary data + r := bufio.NewReader(stdout) + var err1 error + var len int + bufSize := config.BufSize + if bufSize <= 0 { + bufSize = 2048 + } + for { + out := make([]byte, bufSize) + len, err1 = r.Read(out) + if err1 != nil { + break + } + if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { + log.Println("[ERROR] failed to set write deadline: ", err) + } + if err := conn.WriteMessage(websocket.BinaryMessage, out[0:len]); err != nil { + break + } + } + if err1 != nil { + err := conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseGoingAway, err1.Error()), time.Time{}) + if err != nil { + log.Println("[ERROR] WriteControl failed: ", err) + } } } } // pinger simulates the websocket to keep it alive with ping messages. -func pinger(conn *websocket.Conn, done chan struct{}) { +func pinger(conn wsConn, done chan struct{}) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() @@ -291,3 +393,90 @@ func pinger(conn *websocket.Conn, done chan struct{}) { } } } + +type realWsUpgrader struct { + o *websocket.Upgrader +} + +type realWsConn struct { + o *websocket.Conn +} + +func (u *realWsUpgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (wsConn, error) { + a, b := u.o.Upgrade(w, r, responseHeader) + return &realWsConn{o: a}, b +} + +func (c *realWsConn) Close() error { + return c.o.Close() +} +func (c *realWsConn) ReadMessage() (messageType int, p []byte, err error) { + return c.o.ReadMessage() +} +func (c *realWsConn) SetPongHandler(h func(appData string) error) { + c.o.SetPongHandler(h) +} +func (c *realWsConn) SetReadDeadline(t time.Time) error { + return c.o.SetReadDeadline(t) +} +func (c *realWsConn) SetReadLimit(limit int64) { + c.o.SetReadLimit(limit) +} +func (c *realWsConn) SetWriteDeadline(t time.Time) error { + return c.o.SetWriteDeadline(t) +} +func (c *realWsConn) WriteControl(messageType int, data []byte, deadline time.Time) error { + return c.o.WriteControl(messageType, data, deadline) +} +func (c *realWsConn) WriteMessage(messageType int, data []byte) error { + return c.o.WriteMessage(messageType, data) +} + +func findIncompleteRuneLength(p []byte, length int) int { + if length == 0 { + return 0 + } + if rune(p[length-1]) < utf8.RuneSelf { + // ASCII 7-bit always complete + return 0 + } + + lowest := length - utf8.UTFMax + if lowest < 0 { + lowest = 0 + } + for start := length - 1; start >= lowest; start-- { + if (p[start] >> 5) == 0x06 { + // 2-byte utf-8 start byte + if length-start >= 2 { + // enough bytes + return 0 + } + // 1 byte outstanding + return 1 + } + + if (p[start] >> 4) == 0x0E { + // 3-byte utf-8 start byte + if length-start >= 3 { + // enough bytes + return 0 + } + // some bytes outstanding + return length - start + } + + if (p[start] >> 3) == 0x1E { + // 4-byte utf-8 start byte + if length-start >= 4 { + // enough bytes + return 0 + } + // some bytes outstanding + return length - start + } + } + + // No utf-8 start byte + return 0 +} diff --git a/caddyhttp/websocket/websocket_test.go b/caddyhttp/websocket/websocket_test.go index c4bb08cf7..274b0a57b 100644 --- a/caddyhttp/websocket/websocket_test.go +++ b/caddyhttp/websocket/websocket_test.go @@ -15,8 +15,16 @@ package websocket import ( + "bytes" + "errors" + "fmt" "net/http" + "net/http/httptest" + "strings" "testing" + "time" + + "github.com/gorilla/websocket" ) func TestBuildEnv(t *testing.T) { @@ -34,3 +42,295 @@ func TestBuildEnv(t *testing.T) { t.Fatalf("Expected non-empty environment; got %#v", env) } } + +func TestWebSocketCatOneLineLines(t *testing.T) { + r := httptest.NewRequest("GET", "/cat", nil) + p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "lines"}}} + readCount := 0 + waitClose := make(chan bool) + inputStr := "123456" + expectedStr := inputStr + outputStr := "" + conn := &dummyWsConn{ + close: func() {}, + readMessage: func() (messageType int, buf []byte, err error) { + rc := readCount + readCount++ + if rc == 0 { + return websocket.TextMessage, []byte(inputStr), nil + } + <-waitClose + return websocket.CloseMessage, nil, errors.New("EOF") + }, + writeControl: func(messageType int, buf []byte) {}, + writeMessage: func(messageType int, buf []byte) { + outputStr += string(buf) + waitClose <- true + }, + } + w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}} + p.ServeHTTP(w, r) + if outputStr != expectedStr { + t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr) + } +} + +func TestWebSocketCatTwoLinesLines(t *testing.T) { + r := httptest.NewRequest("GET", "/cat", nil) + p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "lines"}}} + readCount := 0 + waitClose := make(chan bool) + inputStr1 := "Hello World!" + inputStr2 := "This is golang." + expectedStr := inputStr1 + inputStr2 + outputStr := "" + outputCount := 0 + conn := &dummyWsConn{ + close: func() {}, + readMessage: func() (messageType int, buf []byte, err error) { + rc := readCount + readCount++ + if rc == 0 { + return websocket.TextMessage, []byte(inputStr1), nil + } + if rc == 1 { + return websocket.TextMessage, []byte(inputStr2), nil + } + <-waitClose + return websocket.CloseMessage, nil, errors.New("EOF") + }, + writeControl: func(messageType int, buf []byte) {}, + writeMessage: func(messageType int, buf []byte) { + outputStr += string(buf) + outputCount++ + if outputCount >= 2 { + waitClose <- true + } + }, + } + w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}} + p.ServeHTTP(w, r) + if outputStr != expectedStr { + t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr) + } +} + +func TestWebSocketCatOneLineText(t *testing.T) { + r := httptest.NewRequest("GET", "/cat", nil) + p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "text"}}} + readCount := 0 + waitClose := make(chan bool) + inputStr := "123456\n" + expectedStr := inputStr + outputStr := "" + conn := &dummyWsConn{ + close: func() {}, + readMessage: func() (messageType int, buf []byte, err error) { + rc := readCount + readCount++ + if rc == 0 { + return websocket.TextMessage, []byte(inputStr), nil + } + <-waitClose + return websocket.CloseMessage, nil, errors.New("EOF") + }, + writeControl: func(messageType int, buf []byte) {}, + writeMessage: func(messageType int, buf []byte) { + outputStr += string(buf) + if strings.Count(outputStr, "\n") >= 1 { + waitClose <- true + } + }, + } + w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}} + p.ServeHTTP(w, r) + if outputStr != expectedStr { + t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr) + } +} + +func TestWebSocketCatTwoLinesText(t *testing.T) { + r := httptest.NewRequest("GET", "/cat", nil) + p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "text"}}} + readCount := 0 + waitClose := make(chan bool) + inputStr1 := "Hello World!\n" + inputStr2 := "This is golang.\n" + expectedStr := inputStr1 + inputStr2 + outputStr := "" + conn := &dummyWsConn{ + close: func() {}, + readMessage: func() (messageType int, buf []byte, err error) { + rc := readCount + readCount++ + if rc == 0 { + return websocket.TextMessage, []byte(inputStr1), nil + } + if rc == 1 { + return websocket.TextMessage, []byte(inputStr2), nil + } + <-waitClose + return websocket.CloseMessage, nil, errors.New("EOF") + }, + writeControl: func(messageType int, buf []byte) {}, + writeMessage: func(messageType int, buf []byte) { + outputStr += string(buf) + if strings.Count(outputStr, "\n") >= 2 { + waitClose <- true + } + }, + } + w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}} + p.ServeHTTP(w, r) + if outputStr != expectedStr { + t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr) + } +} + +func TestWebSocketCatLongLinesText(t *testing.T) { + r := httptest.NewRequest("GET", "/cat", nil) + p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "text"}}} + readCount := 0 + waitClose := make(chan bool) + inputStr1 := "Hello World!\n" + inputStr2 := "" + for i := 0; i < 100000; i++ { + inputStr2 += fmt.Sprintf("No newline %v.", i) + } + inputStr2 += "\n" + inputStr3 := "End of message.\n" + expectedStr := inputStr1 + inputStr2 + inputStr3 + outputStr := "" + conn := &dummyWsConn{ + close: func() {}, + readMessage: func() (messageType int, buf []byte, err error) { + rc := readCount + readCount++ + if rc == 0 { + return websocket.TextMessage, []byte(inputStr1), nil + } + if rc == 1 { + return websocket.TextMessage, []byte(inputStr2), nil + } + if rc == 2 { + return websocket.TextMessage, []byte(inputStr3), nil + } + <-waitClose + return websocket.CloseMessage, nil, errors.New("EOF") + }, + writeControl: func(messageType int, buf []byte) {}, + writeMessage: func(messageType int, buf []byte) { + outputStr += string(buf) + if strings.Count(outputStr, "\n") >= 3 { + waitClose <- true + } + }, + } + w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}} + p.ServeHTTP(w, r) + if outputStr != expectedStr { + t.Errorf("Received Websocket response %v != %v", outputStr, expectedStr) + } +} + +func TestWebSocketCatBinary(t *testing.T) { + r := httptest.NewRequest("GET", "/cat", nil) + p := &WebSocket{Sockets: []Config{{Path: "/cat", Command: "cat", Type: "binary"}}} + readCount := 0 + waitClose := make(chan bool) + inputArr1 := []byte("Hello World!") + inputArr2 := []byte("End of message.") + expectedArr := make([]byte, 0) + expectedArr = append(expectedArr, inputArr1...) + expectedArr = append(expectedArr, inputArr2...) + outputArr := make([]byte, 0) + conn := &dummyWsConn{ + close: func() {}, + readMessage: func() (messageType int, buf []byte, err error) { + rc := readCount + readCount++ + if rc == 0 { + return websocket.BinaryMessage, inputArr1, nil + } + if rc == 1 { + return websocket.BinaryMessage, inputArr2, nil + } + <-waitClose + return websocket.CloseMessage, nil, errors.New("EOF") + }, + writeControl: func(messageType int, buf []byte) {}, + writeMessage: func(messageType int, buf []byte) { + outputArr = append(outputArr, buf...) + if len(outputArr) >= len(expectedArr) { + waitClose <- true + } + }, + } + w := &myResponseRecorder{o: httptest.NewRecorder(), u: &dummyWsUpgrader{c: conn}} + p.ServeHTTP(w, r) + if !bytes.Equal(outputArr, expectedArr) { + t.Errorf("Received Websocket response %v != %v", outputArr, expectedArr) + } +} + +type myResponseRecorder struct { + o *httptest.ResponseRecorder + u *dummyWsUpgrader +} + +func (t *myResponseRecorder) Header() http.Header { + return t.o.Header() +} +func (t *myResponseRecorder) Write(buf []byte) (int, error) { + return t.o.Write(buf) +} +func (t *myResponseRecorder) WriteHeader(code int) { + t.o.WriteHeader(code) +} +func (t *myResponseRecorder) Result() *http.Response { + return t.o.Result() +} +func (t *myResponseRecorder) GetUpgrader() wsUpgrader { + return t.u +} + +type dummyWsUpgrader struct { + c *dummyWsConn +} + +func (t *dummyWsUpgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (wsConn, error) { + return t.c, nil +} + +type dummyWsConn struct { + close func() + readMessage func() (messageType int, buf []byte, err error) + writeControl func(messageType int, buf []byte) + writeMessage func(messageType int, buf []byte) +} + +func (c *dummyWsConn) Close() error { + c.close() + return nil +} +func (c *dummyWsConn) ReadMessage() (messageType int, p []byte, err error) { + return c.readMessage() +} +func (c *dummyWsConn) SetPongHandler(h func(appData string) error) { +} +func (c *dummyWsConn) SetReadDeadline(t time.Time) error { + return nil +} +func (c *dummyWsConn) SetReadLimit(limit int64) { +} +func (c *dummyWsConn) SetWriteDeadline(t time.Time) error { + return nil +} +func (c *dummyWsConn) WriteControl(messageType int, data []byte, deadline time.Time) error { + c.writeControl(messageType, data) + return nil +} +func (c *dummyWsConn) WriteMessage(messageType int, data []byte) error { + c.writeMessage(messageType, data) + return nil +} |