From d81781420029a4618267dead36bb2adcd912b74d Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sat, 30 Sep 2017 19:34:25 -0700 Subject: [PATCH] Optimized pipelining Performance gains for pipelining commands over the network. Using tile38-benchmark and the -P flag it's possible to see 2x-10x boost in requests per second. --- controller/aof.go | 18 +- controller/controller.go | 2 +- controller/live.go | 24 +- controller/server/anyreader.go | 334 ------- controller/server/reader.go | 305 +++++++ controller/server/server.go | 119 +-- vendor/github.com/tidwall/redcon/LICENSE | 20 + vendor/github.com/tidwall/redcon/README.md | 182 ++++ vendor/github.com/tidwall/redcon/append.go | 312 +++++++ .../github.com/tidwall/redcon/append_test.go | 94 ++ .../tidwall/redcon/example/clone.go | 87 ++ .../tidwall/redcon/example/tls/clone.go | 118 +++ vendor/github.com/tidwall/redcon/logo.png | Bin 0 -> 24104 bytes vendor/github.com/tidwall/redcon/redcon.go | 861 ++++++++++++++++++ .../github.com/tidwall/redcon/redcon_test.go | 556 +++++++++++ 15 files changed, 2621 insertions(+), 411 deletions(-) delete mode 100644 controller/server/anyreader.go create mode 100644 controller/server/reader.go create mode 100644 vendor/github.com/tidwall/redcon/LICENSE create mode 100644 vendor/github.com/tidwall/redcon/README.md create mode 100644 vendor/github.com/tidwall/redcon/append.go create mode 100644 vendor/github.com/tidwall/redcon/append_test.go create mode 100644 vendor/github.com/tidwall/redcon/example/clone.go create mode 100644 vendor/github.com/tidwall/redcon/example/tls/clone.go create mode 100644 vendor/github.com/tidwall/redcon/logo.png create mode 100644 vendor/github.com/tidwall/redcon/redcon.go create mode 100644 vendor/github.com/tidwall/redcon/redcon_test.go diff --git a/controller/aof.go b/controller/aof.go index 46eb2308..084a35d6 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -359,7 +359,7 @@ func (c *Controller) cmdAOF(msg *server.Message) (res string, err error) { return "", s } -func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *server.AnyReaderWriter, msg *server.Message) error { +func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *server.PipelineReader, msg *server.Message) error { c.mu.Lock() c.aofconnM[conn] = true c.mu.Unlock() @@ -394,19 +394,21 @@ func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *server.AnyReaderWrite cond.L.Unlock() }() for { - v, err := rd.ReadMessage() + vs, err := rd.ReadMessages() if err != nil { if err != io.EOF { log.Error(err) } return } - switch v.Command { - default: - log.Error("received a live command that was not QUIT") - return - case "quit", "": - return + for _, v := range vs { + switch v.Command { + default: + log.Error("received a live command that was not QUIT") + return + case "quit", "": + return + } } } }() diff --git a/controller/controller.go b/controller/controller.go index b9c5fc94..2913feb9 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -194,7 +194,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http c.stopWatchingMemory.set(true) c.stopWatchingAutoGC.set(true) }() - handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error { + handler := func(conn *server.Conn, msg *server.Message, rd *server.PipelineReader, w io.Writer, websocket bool) error { c.connsmu.RLock() if cc, ok := c.conns[conn]; ok { cc.last.set(time.Now()) diff --git a/controller/live.go b/controller/live.go index 3c2c088d..38c157e9 100644 --- a/controller/live.go +++ b/controller/live.go @@ -64,7 +64,7 @@ func writeMessage(conn net.Conn, message []byte, wrapRESP bool, connType server. return err } -func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.AnyReaderWriter, msg *server.Message, websocket bool) error { +func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.PipelineReader, msg *server.Message, websocket bool) error { addr := conn.RemoteAddr().String() log.Info("live " + addr) defer func() { @@ -114,22 +114,24 @@ func (c *Controller) goLive(inerr error, conn net.Conn, rd *server.AnyReaderWrit conn.Close() }() for { - v, err := rd.ReadMessage() + vs, err := rd.ReadMessages() if err != nil { if err != io.EOF && !(websocket && err == io.ErrUnexpectedEOF) { log.Error(err) } return } - if v == nil { - continue - } - switch v.Command { - default: - log.Error("received a live command that was not QUIT") - return - case "quit", "": - return + for _, v := range vs { + if v == nil { + continue + } + switch v.Command { + default: + log.Error("received a live command that was not QUIT") + return + case "quit", "": + return + } } } }() diff --git a/controller/server/anyreader.go b/controller/server/anyreader.go deleted file mode 100644 index d73625e2..00000000 --- a/controller/server/anyreader.go +++ /dev/null @@ -1,334 +0,0 @@ -package server - -import ( - "bufio" - "crypto/sha1" - "encoding/base64" - "errors" - "io" - "net/url" - "strconv" - "strings" - - "github.com/tidwall/resp" -) - -const telnetIsJSON = false - -// Type is resp type -type Type int - -const ( - Null Type = iota - RESP - Telnet - Native - HTTP - WebSocket - JSON -) - -// String return a string for type. -func (t Type) String() string { - switch t { - default: - return "Unknown" - case Null: - return "Null" - case RESP: - return "RESP" - case Telnet: - return "Telnet" - case Native: - return "Native" - case HTTP: - return "HTTP" - case WebSocket: - return "WebSocket" - case JSON: - return "JSON" - } -} - -type errRESPProtocolError struct { - msg string -} - -func (err errRESPProtocolError) Error() string { - return "Protocol error: " + err.msg -} - -// Message is a resp message -type Message struct { - Command string - Values []resp.Value - ConnType Type - OutputType Type - Auth string -} - -// AnyReaderWriter is resp or native reader writer. -type AnyReaderWriter struct { - rd *bufio.Reader - wr io.Writer - ws bool -} - -// NewAnyReaderWriter returns an AnyReaderWriter object. -func NewAnyReaderWriter(rd io.Reader) *AnyReaderWriter { - ar := &AnyReaderWriter{} - if rd2, ok := rd.(*bufio.Reader); ok { - ar.rd = rd2 - } else { - ar.rd = bufio.NewReader(rd) - } - if wr, ok := rd.(io.Writer); ok { - ar.wr = wr - } - return ar -} - -func (ar *AnyReaderWriter) peekcrlfline() (string, error) { - // this is slow operation. - for i := 0; ; i++ { - bb, err := ar.rd.Peek(i) - if err != nil { - return "", err - } - if len(bb) > 2 && bb[len(bb)-2] == '\r' && bb[len(bb)-1] == '\n' { - return string(bb[:len(bb)-2]), nil - } - } -} - -func (ar *AnyReaderWriter) readcrlfline() (string, error) { - var line []byte - for { - bb, err := ar.rd.ReadBytes('\r') - if err != nil { - return "", err - } - if line == nil { - line = bb - } else { - line = append(line, bb...) - } - b, err := ar.rd.ReadByte() - if err != nil { - return "", err - } - if b == '\n' { - return string(line[:len(line)-1]), nil - } - line = append(line, b) - } -} - -// ReadMessage reads the next resp message. -func (ar *AnyReaderWriter) ReadMessage() (*Message, error) { - b, err := ar.rd.ReadByte() - if err != nil { - return nil, err - } - if err := ar.rd.UnreadByte(); err != nil { - return nil, err - } - switch b { - case 'G', 'P': - line, err := ar.peekcrlfline() - if err != nil { - return nil, err - } - if len(line) > 9 && line[len(line)-9:len(line)-3] == " HTTP/" { - return ar.readHTTPMessage() - } - case '$': - return ar.readNativeMessage() - } - // MultiBulk also reads telnet - return ar.readMultiBulkMessage() -} - -func readNativeMessageLine(line []byte) (*Message, error) { - values := make([]resp.Value, 0, 16) -reading: - for len(line) != 0 { - if line[0] == '{' { - // The native protocol cannot understand json boundaries so it assumes that - // a json element must be at the end of the line. - values = append(values, resp.StringValue(string(line))) - break - } - if line[0] == '"' && line[len(line)-1] == '"' { - if len(values) > 0 && - strings.ToLower(values[0].String()) == "set" && - strings.ToLower(values[len(values)-1].String()) == "string" { - // Setting a string value that is contained inside double quotes. - // This is only because of the boundary issues of the native protocol. - values = append(values, resp.StringValue(string(line[1:len(line)-1]))) - break - } - } - i := 0 - for ; i < len(line); i++ { - if line[i] == ' ' { - value := string(line[:i]) - if value != "" { - values = append(values, resp.StringValue(value)) - } - line = line[i+1:] - continue reading - } - } - values = append(values, resp.StringValue(string(line))) - break - } - return &Message{Command: commandValues(values), Values: values, ConnType: Native, OutputType: JSON}, nil -} - -func (ar *AnyReaderWriter) readNativeMessage() (*Message, error) { - b, err := ar.rd.ReadBytes(' ') - if err != nil { - return nil, err - } - if len(b) > 0 && b[0] != '$' { - return nil, errors.New("invalid message") - } - n, err := strconv.ParseUint(string(b[1:len(b)-1]), 10, 32) - if err != nil { - return nil, errors.New("invalid size") - } - if n > 0x1FFFFFFF { // 536,870,911 bytes - return nil, errors.New("message too big") - } - b = make([]byte, int(n)+2) - if _, err := io.ReadFull(ar.rd, b); err != nil { - return nil, err - } - if b[len(b)-2] != '\r' || b[len(b)-1] != '\n' { - return nil, errors.New("expecting crlf") - } - - return readNativeMessageLine(b[:len(b)-2]) -} - -func commandValues(values []resp.Value) string { - if len(values) == 0 { - return "" - } - return strings.ToLower(values[0].String()) -} - -func (ar *AnyReaderWriter) readMultiBulkMessage() (*Message, error) { - rd := resp.NewReader(ar.rd) - v, telnet, _, err := rd.ReadMultiBulk() - if err != nil { - return nil, err - } - values := v.Array() - if len(values) == 0 { - return nil, nil - } - if telnet && telnetIsJSON { - return &Message{Command: commandValues(values), Values: values, ConnType: Telnet, OutputType: JSON}, nil - } - return &Message{Command: commandValues(values), Values: values, ConnType: RESP, OutputType: RESP}, nil - -} - -func (ar *AnyReaderWriter) readHTTPMessage() (*Message, error) { - msg := &Message{ConnType: HTTP, OutputType: JSON} - line, err := ar.readcrlfline() - if err != nil { - return nil, err - } - parts := strings.Split(line, " ") - if len(parts) != 3 { - return nil, errors.New("invalid HTTP request") - } - method := parts[0] - path := parts[1] - if len(path) == 0 || path[0] != '/' { - return nil, errors.New("invalid HTTP request") - } - path, err = url.QueryUnescape(path[1:]) - if err != nil { - return nil, errors.New("invalid HTTP request") - } - if method != "GET" && method != "POST" { - return nil, errors.New("invalid HTTP method") - } - contentLength := 0 - websocket := false - websocketVersion := 0 - websocketKey := "" - for { - header, err := ar.readcrlfline() - if err != nil { - return nil, err - } - if header == "" { - break // end of headers - } - if header[0] == 'a' || header[0] == 'A' { - if strings.HasPrefix(strings.ToLower(header), "authorization:") { - msg.Auth = strings.TrimSpace(header[len("authorization:"):]) - } - } else if header[0] == 'u' || header[0] == 'U' { - if strings.HasPrefix(strings.ToLower(header), "upgrade:") && strings.ToLower(strings.TrimSpace(header[len("upgrade:"):])) == "websocket" { - websocket = true - } - } else if header[0] == 's' || header[0] == 'S' { - if strings.HasPrefix(strings.ToLower(header), "sec-websocket-version:") { - var n uint64 - n, err = strconv.ParseUint(strings.TrimSpace(header[len("sec-websocket-version:"):]), 10, 64) - if err != nil { - return nil, err - } - websocketVersion = int(n) - } else if strings.HasPrefix(strings.ToLower(header), "sec-websocket-key:") { - websocketKey = strings.TrimSpace(header[len("sec-websocket-key:"):]) - } - } else if header[0] == 'c' || header[0] == 'C' { - if strings.HasPrefix(strings.ToLower(header), "content-length:") { - var n uint64 - n, err = strconv.ParseUint(strings.TrimSpace(header[len("content-length:"):]), 10, 64) - if err != nil { - return nil, err - } - contentLength = int(n) - } - } - } - if websocket && websocketVersion >= 13 && websocketKey != "" { - msg.ConnType = WebSocket - if ar.wr == nil { - return nil, errors.New("connection is nil") - } - sum := sha1.Sum([]byte(websocketKey + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) - accept := base64.StdEncoding.EncodeToString(sum[:]) - wshead := "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: " + accept + "\r\n\r\n" - if _, err = ar.wr.Write([]byte(wshead)); err != nil { - return nil, err - } - ar.ws = true - } else if contentLength > 0 { - msg.ConnType = HTTP - buf := make([]byte, contentLength) - if _, err = io.ReadFull(ar.rd, buf); err != nil { - return nil, err - } - path += string(buf) - } - if path == "" { - return msg, nil - } - nmsg, err := readNativeMessageLine([]byte(path)) - if err != nil { - return nil, err - } - msg.OutputType = JSON - msg.Values = nmsg.Values - msg.Command = commandValues(nmsg.Values) - return msg, nil -} diff --git a/controller/server/reader.go b/controller/server/reader.go new file mode 100644 index 00000000..aac3f998 --- /dev/null +++ b/controller/server/reader.go @@ -0,0 +1,305 @@ +package server + +import ( + "crypto/sha1" + "encoding/base64" + "errors" + "io" + "net/url" + "strconv" + "strings" + + "github.com/tidwall/redcon" + "github.com/tidwall/resp" +) + +var errInvalidHTTP = errors.New("invalid HTTP request") + +// Type is resp type +type Type int + +const ( + Null Type = iota + RESP + Telnet + Native + HTTP + WebSocket + JSON +) + +// Message is a resp message +type Message struct { + Command string + Values []resp.Value + ConnType Type + OutputType Type + Auth string +} + +// PipelineReader ... +type PipelineReader struct { + rd io.Reader + wr io.Writer + pbuf [0xFFFF]byte + rbuf []byte +} + +const kindHTTP redcon.Kind = 9999 + +// NewPipelineReader ... +func NewPipelineReader(rd io.ReadWriter) *PipelineReader { + return &PipelineReader{rd: rd, wr: rd} +} + +func readcrlfline(packet []byte) (line string, leftover []byte, ok bool) { + for i := 1; i < len(packet); i++ { + if packet[i] == '\n' && packet[i-1] == '\r' { + return string(packet[:i-1]), packet[i+1:], true + } + } + return "", packet, false +} + +func readNextHTTPCommand(packet []byte, argsIn [][]byte, msg *Message, wr io.Writer) ( + complete bool, args [][]byte, kind redcon.Kind, leftover []byte, err error, +) { + args = argsIn[:0] + msg.ConnType = HTTP + msg.OutputType = JSON + opacket := packet + + ready, err := func() (bool, error) { + var line string + var ok bool + + // read header + var headers []string + for { + line, packet, ok = readcrlfline(packet) + if !ok { + return false, nil + } + if line == "" { + break + } + headers = append(headers, line) + } + parts := strings.Split(headers[0], " ") + if len(parts) != 3 { + return false, errInvalidHTTP + } + method := parts[0] + path := parts[1] + if len(path) == 0 || path[0] != '/' { + return false, errInvalidHTTP + } + path, err = url.QueryUnescape(path[1:]) + if err != nil { + return false, errInvalidHTTP + } + if method != "GET" && method != "POST" { + return false, errInvalidHTTP + } + contentLength := 0 + websocket := false + websocketVersion := 0 + websocketKey := "" + for _, header := range headers[1:] { + if header[0] == 'a' || header[0] == 'A' { + if strings.HasPrefix(strings.ToLower(header), "authorization:") { + msg.Auth = strings.TrimSpace(header[len("authorization:"):]) + } + } else if header[0] == 'u' || header[0] == 'U' { + if strings.HasPrefix(strings.ToLower(header), "upgrade:") && strings.ToLower(strings.TrimSpace(header[len("upgrade:"):])) == "websocket" { + websocket = true + } + } else if header[0] == 's' || header[0] == 'S' { + if strings.HasPrefix(strings.ToLower(header), "sec-websocket-version:") { + var n uint64 + n, err = strconv.ParseUint(strings.TrimSpace(header[len("sec-websocket-version:"):]), 10, 64) + if err != nil { + return false, err + } + websocketVersion = int(n) + } else if strings.HasPrefix(strings.ToLower(header), "sec-websocket-key:") { + websocketKey = strings.TrimSpace(header[len("sec-websocket-key:"):]) + } + } else if header[0] == 'c' || header[0] == 'C' { + if strings.HasPrefix(strings.ToLower(header), "content-length:") { + var n uint64 + n, err = strconv.ParseUint(strings.TrimSpace(header[len("content-length:"):]), 10, 64) + if err != nil { + return false, err + } + contentLength = int(n) + } + } + } + if websocket && websocketVersion >= 13 && websocketKey != "" { + msg.ConnType = WebSocket + if wr == nil { + return false, errors.New("connection is nil") + } + sum := sha1.Sum([]byte(websocketKey + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) + accept := base64.StdEncoding.EncodeToString(sum[:]) + wshead := "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: " + accept + "\r\n\r\n" + if _, err = wr.Write([]byte(wshead)); err != nil { + println(4) + return false, err + } + } else if contentLength > 0 { + msg.ConnType = HTTP + if len(packet) < contentLength { + return false, nil + } + path += string(packet[:contentLength]) + packet = packet[contentLength:] + } + if path == "" { + return true, nil + } + nmsg, err := readNativeMessageLine([]byte(path)) + if err != nil { + return false, err + } + + msg.OutputType = JSON + msg.Values = nmsg.Values + msg.Command = commandValues(nmsg.Values) + return true, nil + }() + if err != nil || !ready { + return false, args[:0], kindHTTP, opacket, err + } + return true, args[:0], kindHTTP, packet, nil +} +func readNextCommand(packet []byte, argsIn [][]byte, msg *Message, wr io.Writer) ( + complete bool, args [][]byte, kind redcon.Kind, leftover []byte, err error, +) { + if packet[0] == 'G' || packet[0] == 'P' { + // could be an HTTP request + var line []byte + for i := 1; i < len(packet); i++ { + if packet[i] == '\n' { + if packet[i-1] == '\r' { + line = packet[:i+1] + break + } + } + } + if len(line) == 0 { + return false, argsIn[:0], redcon.Redis, packet, nil + } + if len(line) > 11 && string(line[len(line)-11:len(line)-5]) == " HTTP/" { + return readNextHTTPCommand(packet, argsIn, msg, wr) + } + } + return redcon.ReadNextCommand(packet, args) +} + +// ReadMessages ... +func (rd *PipelineReader) ReadMessages() ([]*Message, error) { + var msgs []*Message +moreData: + n, err := rd.rd.Read(rd.pbuf[:]) + if err != nil { + return nil, err + } + if n == 0 { + // need more data + goto moreData + } + var packet []byte + if len(rd.rbuf) == 0 { + packet = rd.pbuf[:n] + } else { + rd.rbuf = append(rd.rbuf, rd.pbuf[:n]...) + packet = rd.rbuf + } + + for len(packet) > 0 { + msg := &Message{} + complete, args, kind, leftover, err := readNextCommand(packet, nil, msg, rd.wr) + if err != nil { + break + } + if !complete { + break + } + if kind != kindHTTP { + msg.Command = strings.ToLower(string(args[0])) + for i := 0; i < len(args); i++ { + msg.Values = append(msg.Values, resp.BytesValue(args[i])) + } + switch kind { + case redcon.Redis: + msg.ConnType = RESP + msg.OutputType = RESP + case redcon.Tile38: + msg.ConnType = Native + msg.OutputType = JSON + case redcon.Telnet: + msg.ConnType = RESP + msg.OutputType = RESP + } + } else if len(msg.Values) == 0 { + return nil, errInvalidHTTP + } + msgs = append(msgs, msg) + packet = leftover + } + if len(packet) > 0 { + rd.rbuf = append(rd.rbuf[:0], packet...) + } else if rd.rbuf != nil { + rd.rbuf = rd.rbuf[:0] + } + if err != nil && len(msgs) == 0 { + return nil, err + } + return msgs, nil +} + +func readNativeMessageLine(line []byte) (*Message, error) { + values := make([]resp.Value, 0, 16) +reading: + for len(line) != 0 { + if line[0] == '{' { + // The native protocol cannot understand json boundaries so it assumes that + // a json element must be at the end of the line. + values = append(values, resp.StringValue(string(line))) + break + } + if line[0] == '"' && line[len(line)-1] == '"' { + if len(values) > 0 && + strings.ToLower(values[0].String()) == "set" && + strings.ToLower(values[len(values)-1].String()) == "string" { + // Setting a string value that is contained inside double quotes. + // This is only because of the boundary issues of the native protocol. + values = append(values, resp.StringValue(string(line[1:len(line)-1]))) + break + } + } + i := 0 + for ; i < len(line); i++ { + if line[i] == ' ' { + value := string(line[:i]) + if value != "" { + values = append(values, resp.StringValue(value)) + } + line = line[i+1:] + continue reading + } + } + values = append(values, resp.StringValue(string(line))) + break + } + return &Message{Command: commandValues(values), Values: values, ConnType: Native, OutputType: JSON}, nil +} + +func commandValues(values []resp.Value) string { + if len(values) == 0 { + return "" + } + return strings.ToLower(values[0].String()) +} diff --git a/controller/server/server.go b/controller/server/server.go index fff1bf6b..4500b45e 100644 --- a/controller/server/server.go +++ b/controller/server/server.go @@ -1,6 +1,7 @@ package server import ( + "bytes" "encoding/binary" "errors" "fmt" @@ -45,6 +46,7 @@ type Conn struct { Authenticated bool } +// SetKeepAlive sets the connection keepalive func (conn Conn) SetKeepAlive(period time.Duration) error { if tcp, ok := conn.Conn.(*net.TCPConn); ok { if err := tcp.SetKeepAlive(true); err != nil { @@ -61,7 +63,7 @@ var errCloseHTTP = errors.New("close http") func ListenAndServe( host string, port int, protected func() bool, - handler func(conn *Conn, msg *Message, rd *AnyReaderWriter, w io.Writer, websocket bool) error, + handler func(conn *Conn, msg *Message, rd *PipelineReader, w io.Writer, websocket bool) error, opened func(conn *Conn), closed func(conn *Conn), lnp *net.Listener, @@ -85,87 +87,90 @@ func ListenAndServe( } } -// func writeCommandErr(proto client.Proto, conn *Conn, err error) error { -// if proto == client.HTTP || proto == client.WebSocket { -// conn.Write([]byte(`HTTP/1.1 500 ` + err.Error() + "\r\nConnection: close\r\n\r\n")) -// } -// return err -// } - func handleConn( conn *Conn, protected func() bool, - handler func(conn *Conn, msg *Message, rd *AnyReaderWriter, w io.Writer, websocket bool) error, + handler func(conn *Conn, msg *Message, rd *PipelineReader, w io.Writer, websocket bool) error, opened func(conn *Conn), closed func(conn *Conn), http bool, ) { - opened(conn) - defer closed(conn) addr := conn.RemoteAddr().String() + opened(conn) if core.ShowDebugMessages { log.Debugf("opened connection: %s", addr) - defer func() { - log.Debugf("closed connection: %s", addr) - }() } + defer func() { + conn.Close() + closed(conn) + if core.ShowDebugMessages { + log.Debugf("closed connection: %s", addr) + } + }() if !strings.HasPrefix(addr, "127.0.0.1:") && !strings.HasPrefix(addr, "[::1]:") { if protected() { // This is a protected server. Only loopback is allowed. conn.Write(deniedMessage) - conn.Close() return } } - defer conn.Close() + + wr := &bytes.Buffer{} outputType := Null - rd := NewAnyReaderWriter(conn) + rd := NewPipelineReader(conn) for { - msg, err := rd.ReadMessage() - - // Just closing connection if we have deprecated HTTP or WS connection, - // And --http-transport = false - if !http && (msg.ConnType == WebSocket || msg.ConnType == HTTP) { - conn.Close() - return - } - - if err != nil { - if err == io.EOF { - return - } - if err == errCloseHTTP || - strings.Contains(err.Error(), "use of closed network connection") { - return - } - log.Error(err) - return - } - if msg != nil && msg.Command != "" { - if outputType != Null { - msg.OutputType = outputType - } - if msg.Command == "quit" { - if msg.OutputType == RESP { - io.WriteString(conn, "+OK\r\n") - } - return - } - err := handler(conn, msg, rd, conn, msg.ConnType == WebSocket) + wr.Reset() + ok := func() bool { + msgs, err := rd.ReadMessages() if err != nil { + if err == io.EOF { + return false + } + if err == errCloseHTTP || + strings.Contains(err.Error(), "use of closed network connection") { + return false + } log.Error(err) - return + return false } - outputType = msg.OutputType - } else { - conn.Write([]byte("HTTP/1.1 500 Bad Request\r\nConnection: close\r\n\r\n")) - return + for _, msg := range msgs { + // Just closing connection if we have deprecated HTTP or WS connection, + // And --http-transport = false + if !http && (msg.ConnType == WebSocket || msg.ConnType == HTTP) { + return false + } + if msg != nil && msg.Command != "" { + if outputType != Null { + msg.OutputType = outputType + } + if msg.Command == "quit" { + if msg.OutputType == RESP { + io.WriteString(wr, "+OK\r\n") + } + return false + } + err := handler(conn, msg, rd, wr, msg.ConnType == WebSocket) + if err != nil { + log.Error(err) + return false + } + outputType = msg.OutputType + } else { + wr.Write([]byte("HTTP/1.1 500 Bad Request\r\nConnection: close\r\n\r\n")) + return false + } + if msg.ConnType == HTTP || msg.ConnType == WebSocket { + return false + } + } + return true + }() + conn.Write(wr.Bytes()) + if !ok { + break } - if msg.ConnType == HTTP || msg.ConnType == WebSocket { - return - } - } + // all done } // WriteWebSocketMessage write a websocket message to an io.Writer. diff --git a/vendor/github.com/tidwall/redcon/LICENSE b/vendor/github.com/tidwall/redcon/LICENSE new file mode 100644 index 00000000..58f5819a --- /dev/null +++ b/vendor/github.com/tidwall/redcon/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2016 Josh Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/tidwall/redcon/README.md b/vendor/github.com/tidwall/redcon/README.md new file mode 100644 index 00000000..fc5d8b3e --- /dev/null +++ b/vendor/github.com/tidwall/redcon/README.md @@ -0,0 +1,182 @@ +

+REDCON +
+Build Status +GoDoc +

+ +

Fast Redis compatible server framework for Go

+ +Redcon is a custom Redis server framework for Go that is fast and simple to use. The reason for this library it to give an efficient server front-end for the [BuntDB](https://github.com/tidwall/buntdb) and [Tile38](https://github.com/tidwall/tile38) projects. + +Features +-------- +- Create a [Fast](#benchmarks) custom Redis compatible server in Go +- Simple interface. One function `ListenAndServe` and two types `Conn` & `Command` +- Support for pipelining and telnet commands +- Works with Redis clients such as [redigo](https://github.com/garyburd/redigo), [redis-py](https://github.com/andymccurdy/redis-py), [node_redis](https://github.com/NodeRedis/node_redis), and [jedis](https://github.com/xetorthio/jedis) +- [TLS Support](#tls-example) + +Installing +---------- + +``` +go get -u github.com/tidwall/redcon +``` + +Example +------- + +Here's a full example of a Redis clone that accepts: + +- SET key value +- GET key +- DEL key +- PING +- QUIT + +You can run this example from a terminal: + +```sh +go run example/clone.go +``` + +```go +package main + +import ( + "log" + "strings" + "sync" + + "github.com/tidwall/redcon" +) + +var addr = ":6380" + +func main() { + var mu sync.RWMutex + var items = make(map[string][]byte) + go log.Printf("started server at %s", addr) + err := redcon.ListenAndServe(addr, + func(conn redcon.Conn, cmd redcon.Command) { + switch strings.ToLower(string(cmd.Args[0])) { + default: + conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") + case "ping": + conn.WriteString("PONG") + case "quit": + conn.WriteString("OK") + conn.Close() + case "set": + if len(cmd.Args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.Lock() + items[string(cmd.Args[1])] = cmd.Args[2] + mu.Unlock() + conn.WriteString("OK") + case "get": + if len(cmd.Args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.RLock() + val, ok := items[string(cmd.Args[1])] + mu.RUnlock() + if !ok { + conn.WriteNull() + } else { + conn.WriteBulk(val) + } + case "del": + if len(cmd.Args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.Lock() + _, ok := items[string(cmd.Args[1])] + delete(items, string(cmd.Args[1])) + mu.Unlock() + if !ok { + conn.WriteInt(0) + } else { + conn.WriteInt(1) + } + } + }, + func(conn redcon.Conn) bool { + // use this function to accept or deny the connection. + // log.Printf("accept: %s", conn.RemoteAddr()) + return true + }, + func(conn redcon.Conn, err error) { + // this is called when the connection has been closed + // log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) + }, + ) + if err != nil { + log.Fatal(err) + } +} +``` + +TLS Example +----------- + +Redcon has full TLS support through the `ListenAndServeTLS` function. + +The [same example](example/tls/clone.go) is also provided for serving Redcon over TLS. + +```sh +go run example/tls/clone.go +``` + +Benchmarks +---------- + +**Redis**: Single-threaded, no disk persistence. + +``` +$ redis-server --port 6379 --appendonly no +``` +``` +redis-benchmark -p 6379 -t set,get -n 10000000 -q -P 512 -c 512 +SET: 941265.12 requests per second +GET: 1189909.50 requests per second +``` + +**Redcon**: Single-threaded, no disk persistence. + +``` +$ GOMAXPROCS=1 go run example/clone.go +``` +``` +redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512 +SET: 2018570.88 requests per second +GET: 2403846.25 requests per second +``` + +**Redcon**: Multi-threaded, no disk persistence. + +``` +$ GOMAXPROCS=0 go run example/clone.go +``` +``` +$ redis-benchmark -p 6380 -t set,get -n 10000000 -q -P 512 -c 512 +SET: 1944390.38 requests per second +GET: 3993610.25 requests per second +``` + +*Running on a MacBook Pro 15" 2.8 GHz Intel Core i7 using Go 1.7* + +Contact +------- +Josh Baker [@tidwall](http://twitter.com/tidwall) + +License +------- +Redcon source code is available under the MIT [License](/LICENSE). diff --git a/vendor/github.com/tidwall/redcon/append.go b/vendor/github.com/tidwall/redcon/append.go new file mode 100644 index 00000000..2c4ea717 --- /dev/null +++ b/vendor/github.com/tidwall/redcon/append.go @@ -0,0 +1,312 @@ +package redcon + +import ( + "strconv" + "strings" +) + +// Kind is the kind of command +type Kind int + +const ( + // Redis is returned for Redis protocol commands + Redis Kind = iota + // Tile38 is returnd for Tile38 native protocol commands + Tile38 + // Telnet is returnd for plain telnet commands + Telnet +) + +var errInvalidMessage = &errProtocol{"invalid message"} + +// ReadNextCommand reads the next command from the provided packet. It's +// possible that the packet contains multiple commands, or zero commands +// when the packet is incomplete. +// 'argsbuf' is an optional reusable buffer and it can be nil. +// 'complete' indicates that a command was read. false means no more commands. +// 'args' are the output arguments for the command. +// 'kind' is the type of command that was read. +// 'leftover' is any remaining unused bytes which belong to the next command. +// 'err' is returned when a protocol error was encountered. +func ReadNextCommand(packet []byte, argsbuf [][]byte) ( + complete bool, args [][]byte, kind Kind, leftover []byte, err error, +) { + args = argsbuf[:0] + if len(packet) > 0 { + if packet[0] != '*' { + if packet[0] == '$' { + return readTile38Command(packet, args) + } + return readTelnetCommand(packet, args) + } + // standard redis command + for s, i := 1, 1; i < len(packet); i++ { + if packet[i] == '\n' { + if packet[i-1] != '\r' { + return false, args[:0], Redis, packet, errInvalidMultiBulkLength + } + count, ok := parseInt(packet[s : i-1]) + if !ok || count < 0 { + return false, args[:0], Redis, packet, errInvalidMultiBulkLength + } + i++ + if count == 0 { + return true, args[:0], Redis, packet[i:], nil + } + nextArg: + for j := 0; j < count; j++ { + if i == len(packet) { + break + } + if packet[i] != '$' { + return false, args[:0], Redis, packet, + &errProtocol{"expected '$', got '" + + string(packet[i]) + "'"} + } + for s := i + 1; i < len(packet); i++ { + if packet[i] == '\n' { + if packet[i-1] != '\r' { + return false, args[:0], Redis, packet, errInvalidBulkLength + } + n, ok := parseInt(packet[s : i-1]) + if !ok || count <= 0 { + return false, args[:0], Redis, packet, errInvalidBulkLength + } + i++ + if len(packet)-i >= n+2 { + if packet[i+n] != '\r' || packet[i+n+1] != '\n' { + return false, args[:0], Redis, packet, errInvalidBulkLength + } + args = append(args, packet[i:i+n]) + i += n + 2 + if j == count-1 { + // done reading + return true, args, Redis, packet[i:], nil + } + continue nextArg + } + break + } + } + break + } + break + } + } + } + return false, args[:0], Redis, packet, nil +} + +func readTile38Command(packet []byte, argsbuf [][]byte) ( + complete bool, args [][]byte, kind Kind, leftover []byte, err error, +) { + for i := 1; i < len(packet); i++ { + if packet[i] == ' ' { + n, ok := parseInt(packet[1:i]) + if !ok || n < 0 { + return false, args[:0], Tile38, packet, errInvalidMessage + } + i++ + if len(packet) >= i+n+2 { + if packet[i+n] != '\r' || packet[i+n+1] != '\n' { + return false, args[:0], Tile38, packet, errInvalidMessage + } + line := packet[i : i+n] + reading: + for len(line) != 0 { + if line[0] == '{' { + // The native protocol cannot understand json boundaries so it assumes that + // a json element must be at the end of the line. + args = append(args, line) + break + } + if line[0] == '"' && line[len(line)-1] == '"' { + if len(args) > 0 && + strings.ToLower(string(args[0])) == "set" && + strings.ToLower(string(args[len(args)-1])) == "string" { + // Setting a string value that is contained inside double quotes. + // This is only because of the boundary issues of the native protocol. + args = append(args, line[1:len(line)-1]) + break + } + } + i := 0 + for ; i < len(line); i++ { + if line[i] == ' ' { + value := line[:i] + if len(value) > 0 { + args = append(args, value) + } + line = line[i+1:] + continue reading + } + } + args = append(args, line) + break + } + return true, args, Tile38, packet[i+n+2:], nil + } + break + } + } + return false, args[:0], Tile38, packet, nil +} +func readTelnetCommand(packet []byte, argsbuf [][]byte) ( + complete bool, args [][]byte, kind Kind, leftover []byte, err error, +) { + // just a plain text command + for i := 0; i < len(packet); i++ { + if packet[i] == '\n' { + var line []byte + if i > 0 && packet[i-1] == '\r' { + line = packet[:i-1] + } else { + line = packet[:i] + } + var quote bool + var quotech byte + var escape bool + outer: + for { + nline := make([]byte, 0, len(line)) + for i := 0; i < len(line); i++ { + c := line[i] + if !quote { + if c == ' ' { + if len(nline) > 0 { + args = append(args, nline) + } + line = line[i+1:] + continue outer + } + if c == '"' || c == '\'' { + if i != 0 { + return false, args[:0], Telnet, packet, errUnbalancedQuotes + } + quotech = c + quote = true + line = line[i+1:] + continue outer + } + } else { + if escape { + escape = false + switch c { + case 'n': + c = '\n' + case 'r': + c = '\r' + case 't': + c = '\t' + } + } else if c == quotech { + quote = false + quotech = 0 + args = append(args, nline) + line = line[i+1:] + if len(line) > 0 && line[0] != ' ' { + return false, args[:0], Telnet, packet, errUnbalancedQuotes + } + continue outer + } else if c == '\\' { + escape = true + continue + } + } + nline = append(nline, c) + } + if quote { + return false, args[:0], Telnet, packet, errUnbalancedQuotes + } + if len(line) > 0 { + args = append(args, line) + } + break + } + return true, args, Telnet, packet[i+1:], nil + } + } + return false, args[:0], Telnet, packet, nil +} + +// AppendUint appends a Redis protocol uint64 to the input bytes. +func AppendUint(b []byte, n uint64) []byte { + b = append(b, ':') + b = strconv.AppendUint(b, n, 10) + return append(b, '\r', '\n') +} + +// AppendInt appends a Redis protocol int64 to the input bytes. +func AppendInt(b []byte, n int64) []byte { + b = append(b, ':') + b = strconv.AppendInt(b, n, 10) + return append(b, '\r', '\n') +} + +// AppendArray appends a Redis protocol array to the input bytes. +func AppendArray(b []byte, n int) []byte { + b = append(b, '*') + b = strconv.AppendInt(b, int64(n), 10) + return append(b, '\r', '\n') +} + +// AppendBulk appends a Redis protocol bulk byte slice to the input bytes. +func AppendBulk(b []byte, bulk []byte) []byte { + b = append(b, '$') + b = strconv.AppendInt(b, int64(len(bulk)), 10) + b = append(b, '\r', '\n') + b = append(b, bulk...) + return append(b, '\r', '\n') +} + +// AppendBulkString appends a Redis protocol bulk string to the input bytes. +func AppendBulkString(b []byte, bulk string) []byte { + b = append(b, '$') + b = strconv.AppendInt(b, int64(len(bulk)), 10) + b = append(b, '\r', '\n') + b = append(b, bulk...) + return append(b, '\r', '\n') +} + +// AppendString appends a Redis protocol string to the input bytes. +func AppendString(b []byte, s string) []byte { + b = append(b, '+') + b = append(b, stripNewlines(s)...) + return append(b, '\r', '\n') +} + +// AppendError appends a Redis protocol error to the input bytes. +func AppendError(b []byte, s string) []byte { + b = append(b, '-') + b = append(b, stripNewlines(s)...) + return append(b, '\r', '\n') +} + +// AppendOK appends a Redis protocol OK to the input bytes. +func AppendOK(b []byte) []byte { + return append(b, '+', 'O', 'K', '\r', '\n') +} +func stripNewlines(s string) string { + for i := 0; i < len(s); i++ { + if s[i] == '\r' || s[i] == '\n' { + s = strings.Replace(s, "\r", " ", -1) + s = strings.Replace(s, "\n", " ", -1) + break + } + } + return s +} + +// AppendTile38 appends a Tile38 message to the input bytes. +func AppendTile38(b []byte, data []byte) []byte { + b = append(b, '$') + b = strconv.AppendInt(b, int64(len(data)), 10) + b = append(b, ' ') + b = append(b, data...) + return append(b, '\r', '\n') +} + +// AppendNull appends a Redis protocol null to the input bytes. +func AppendNull(b []byte) []byte { + return append(b, '$', '-', '1', '\r', '\n') +} diff --git a/vendor/github.com/tidwall/redcon/append_test.go b/vendor/github.com/tidwall/redcon/append_test.go new file mode 100644 index 00000000..238b1e48 --- /dev/null +++ b/vendor/github.com/tidwall/redcon/append_test.go @@ -0,0 +1,94 @@ +package redcon + +import ( + "bytes" + "math/rand" + "testing" + "time" +) + +func TestNextCommand(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + start := time.Now() + for time.Since(start) < time.Second { + // keep copy of pipeline args for final compare + var plargs [][][]byte + + // create a pipeline of random number of commands with random data. + N := rand.Int() % 10000 + var data []byte + for i := 0; i < N; i++ { + nargs := rand.Int() % 10 + data = AppendArray(data, nargs) + var args [][]byte + for j := 0; j < nargs; j++ { + arg := make([]byte, rand.Int()%100) + if _, err := rand.Read(arg); err != nil { + t.Fatal(err) + } + data = AppendBulk(data, arg) + args = append(args, arg) + } + plargs = append(plargs, args) + } + + // break data into random number of chunks + chunkn := rand.Int() % 100 + if chunkn == 0 { + chunkn = 1 + } + if len(data) < chunkn { + continue + } + var chunks [][]byte + var chunksz int + for i := 0; i < len(data); i += chunksz { + chunksz = rand.Int() % (len(data) / chunkn) + var chunk []byte + if i+chunksz < len(data) { + chunk = data[i : i+chunksz] + } else { + chunk = data[i:] + } + chunks = append(chunks, chunk) + } + + // process chunks + var rbuf []byte + var fargs [][][]byte + for _, chunk := range chunks { + var data []byte + if len(rbuf) > 0 { + data = append(rbuf, chunk...) + } else { + data = chunk + } + for { + complete, args, _, leftover, err := ReadNextCommand(data, nil) + data = leftover + if err != nil { + t.Fatal(err) + } + if !complete { + break + } + fargs = append(fargs, args) + } + rbuf = append(rbuf[:0], data...) + } + // compare final args to original + if len(plargs) != len(fargs) { + t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs)) + } + for i := 0; i < len(plargs); i++ { + if len(plargs[i]) != len(fargs[i]) { + t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i])) + } + for j := 0; j < len(plargs[i]); j++ { + if !bytes.Equal(plargs[i][j], plargs[i][j]) { + t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j])) + } + } + } + } +} diff --git a/vendor/github.com/tidwall/redcon/example/clone.go b/vendor/github.com/tidwall/redcon/example/clone.go new file mode 100644 index 00000000..f32bc1c0 --- /dev/null +++ b/vendor/github.com/tidwall/redcon/example/clone.go @@ -0,0 +1,87 @@ +package main + +import ( + "log" + "strings" + "sync" + + "github.com/tidwall/redcon" +) + +var addr = ":6380" + +func main() { + var mu sync.RWMutex + var items = make(map[string][]byte) + go log.Printf("started server at %s", addr) + err := redcon.ListenAndServe(addr, + func(conn redcon.Conn, cmd redcon.Command) { + switch strings.ToLower(string(cmd.Args[0])) { + default: + conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") + case "detach": + hconn := conn.Detach() + log.Printf("connection has been detached") + go func() { + defer hconn.Close() + hconn.WriteString("OK") + hconn.Flush() + }() + return + case "ping": + conn.WriteString("PONG") + case "quit": + conn.WriteString("OK") + conn.Close() + case "set": + if len(cmd.Args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.Lock() + items[string(cmd.Args[1])] = cmd.Args[2] + mu.Unlock() + conn.WriteString("OK") + case "get": + if len(cmd.Args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.RLock() + val, ok := items[string(cmd.Args[1])] + mu.RUnlock() + if !ok { + conn.WriteNull() + } else { + conn.WriteBulk(val) + } + case "del": + if len(cmd.Args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.Lock() + _, ok := items[string(cmd.Args[1])] + delete(items, string(cmd.Args[1])) + mu.Unlock() + if !ok { + conn.WriteInt(0) + } else { + conn.WriteInt(1) + } + } + }, + func(conn redcon.Conn) bool { + // use this function to accept or deny the connection. + // log.Printf("accept: %s", conn.RemoteAddr()) + return true + }, + func(conn redcon.Conn, err error) { + // this is called when the connection has been closed + // log.Printf("closed: %s, err: %v", conn.RemoteAddr(), err) + }, + ) + if err != nil { + log.Fatal(err) + } +} diff --git a/vendor/github.com/tidwall/redcon/example/tls/clone.go b/vendor/github.com/tidwall/redcon/example/tls/clone.go new file mode 100644 index 00000000..8d1b67cf --- /dev/null +++ b/vendor/github.com/tidwall/redcon/example/tls/clone.go @@ -0,0 +1,118 @@ +package main + +import ( + "crypto/tls" + "log" + "strings" + "sync" + + "github.com/tidwall/redcon" +) + +const serverKey = `-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIHg+g2unjA5BkDtXSN9ShN7kbPlbCcqcYdDu+QeV8XWuoAoGCCqGSM49 +AwEHoUQDQgAEcZpodWh3SEs5Hh3rrEiu1LZOYSaNIWO34MgRxvqwz1FMpLxNlx0G +cSqrxhPubawptX5MSr02ft32kfOlYbaF5Q== +-----END EC PRIVATE KEY----- +` + +const serverCert = `-----BEGIN CERTIFICATE----- +MIIB+TCCAZ+gAwIBAgIJAL05LKXo6PrrMAoGCCqGSM49BAMCMFkxCzAJBgNVBAYT +AkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRn +aXRzIFB0eSBMdGQxEjAQBgNVBAMMCWxvY2FsaG9zdDAeFw0xNTEyMDgxNDAxMTNa +Fw0yNTEyMDUxNDAxMTNaMFkxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0 +YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQxEjAQBgNVBAMM +CWxvY2FsaG9zdDBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHGaaHVod0hLOR4d +66xIrtS2TmEmjSFjt+DIEcb6sM9RTKS8TZcdBnEqq8YT7m2sKbV+TEq9Nn7d9pHz +pWG2heWjUDBOMB0GA1UdDgQWBBR0fqrecDJ44D/fiYJiOeBzfoqEijAfBgNVHSME +GDAWgBR0fqrecDJ44D/fiYJiOeBzfoqEijAMBgNVHRMEBTADAQH/MAoGCCqGSM49 +BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 +1xOP035EcraRmWPe7tO0LpXgMxlh2VItpc2uc2w= +-----END CERTIFICATE----- +` + +var addr = ":6380" + +func main() { + cer, err := tls.X509KeyPair([]byte(serverCert), []byte(serverKey)) + if err != nil { + log.Fatal(err) + } + config := &tls.Config{Certificates: []tls.Certificate{cer}} + + var mu sync.RWMutex + var items = make(map[string][]byte) + + go log.Printf("started server at %s", addr) + err = redcon.ListenAndServeTLS(addr, + func(conn redcon.Conn, cmd redcon.Command) { + switch strings.ToLower(string(cmd.Args[0])) { + default: + conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") + case "detach": + hconn := conn.Detach() + log.Printf("connection has been detached") + go func() { + defer hconn.Close() + hconn.WriteString("OK") + hconn.Flush() + }() + return + case "ping": + conn.WriteString("PONG") + case "quit": + conn.WriteString("OK") + conn.Close() + case "set": + if len(cmd.Args) != 3 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.Lock() + items[string(cmd.Args[1])] = cmd.Args[2] + mu.Unlock() + conn.WriteString("OK") + case "get": + if len(cmd.Args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.RLock() + val, ok := items[string(cmd.Args[1])] + mu.RUnlock() + if !ok { + conn.WriteNull() + } else { + conn.WriteBulk(val) + } + case "del": + if len(cmd.Args) != 2 { + conn.WriteError("ERR wrong number of arguments for '" + string(cmd.Args[0]) + "' command") + return + } + mu.Lock() + _, ok := items[string(cmd.Args[1])] + delete(items, string(cmd.Args[1])) + mu.Unlock() + if !ok { + conn.WriteInt(0) + } else { + conn.WriteInt(1) + } + } + }, + func(conn redcon.Conn) bool { + return true + }, + func(conn redcon.Conn, err error) { + }, + config, + ) + + if err != nil { + log.Fatal(err) + } +} diff --git a/vendor/github.com/tidwall/redcon/logo.png b/vendor/github.com/tidwall/redcon/logo.png new file mode 100644 index 0000000000000000000000000000000000000000..ee336156d9c76d8182916b7e87fd7fcc51995899 GIT binary patch literal 24104 zcmbTd1#lfr(k&=vu$aM;EM{hAi#}fe42S2Lb|uC?zSX1Ofu?4_ptyfC2v(ZzjWmFB}&!4Hsp5 zGZ%M5CsPn16MG|5Vkuigb5kW#LlaNOVN+h9l!v8?hKq)*47ahpExqACGV~s{4nS!T z5MBWf2Sa0PQx{?*Q*%o@K9b9}E)rr(6Fw4kHd#hl2N6>XOGz&$Q)MqX6=N@JV=faC z0e)g$4{o3WTT>T9Vh>vzJ7;bWK9Yac%MD!rbDM#L_+KI})_f%Y(>Mmk0&ZYD-PYt4` z&c;rb4lb7VcEtbGXlP{b>cU3?^z?sT!PY@m_P-U|Isbd0fFWb>FmzyGqGx2Vwf*P4 z{w3|~qGbAicjJF4?X2SIV9KCm>TK`oWDLxQ8R>rt17r7pZs;FDpf}v|PL{x=7}|*1 z8@t+?+PO%H@{s^f=uIq5xQ*CMSk1VM4C#zn4UOqo*$qwUxJ)=$=~y`pIoUZ(jG5Ss znEzAH|EfF(3zxXKIEN6MIFm3FlNh_O5Tl4N2OB%^$HBtH&ibFSQg+TRhIYoL|7qJ2 zX#3w~x&Du3xka2z4PES=RP60-{-Xs77WOXo&KC9##3Ca9nss7ISwmw>yMJy{{WDSj z^WLJSPL^(_CgM)^w#5G$X>QB^MKAupjllnuH~IfNf($^X82(Y0|A+eg=N6!o|6Kle zF#tdQyPQnzfHmXY3TAN^NI(Dxt9^UvqMHT_rU1nqyGfc#e|m;XO~`F{)jr+twB3jMc}ApfoDe=igu zArkb@8}K5xt;^vcu!=n!V4Je$=P)4{V?&X`>EfVLD7KvSGT)K z5fYizD)Nf@=N8N?teDx^(=*a-kk&0{Pp5Mvdx(!CAS@=k%uQo*h+e+4s*>JQ#Y<0b z!k92SI@FL+OXK2F!^0<)tNhRyqgKiNGz{oBIyqx)tjA8zkUdh>k6Y=NQ!G_HQ=?fZ z;BRa5#YcrJlSeI+PdCF8%j=d>BF2pB?JX*`{M&2#_3O-*fezWa#^&Mv zZN>IlsCTfnG^|0ZCMvD6>=s7pwabvY>}_oFh|YAxLCgI%(WV*K{%sBOTw&)Imn%x*x5kp6aaf3I;L;bMMF zQ*A-7FRqx|M&!6X{O?7VmrLmepDaYb(ulF3qOaXHmCkdFW(@YJc zr$@778JU5hbpWHgYXcUB>%jqutTCWNY^*h{A!G^M$0+H1F-z4jeN5oBpS7#g@KWFM zq`yT)Dv63x*8NdgUOs|m6%f*XWB7StH^)s-B6n?oafKbP~hN@C)N1?^q4u`9uQ%Xuq#2A!qs3>v|}C_L^z`oc;QD*;(0 ziYnCT`O8|lDlg`yki|vMB}LCgh0Tk0)GKzA+FhL=Q4H~o-5suOh6HWPv@wym&u4Wb zg=GBv`7conk?gj@;K;Jy>3?awjnBbX*ef8r7wSRy`T3Q4Q;<_D^kHCynei{N4RN%# za=S`NjSeSed3q-6#kErY;mjyRhsoT%SSrj~k2PWurhSl=f+gUfXQUbRpCY#K_|aHg z4Bs#jfp!uEHeykqA!8#MpcO`zwZQsAR2T-L)6&uqjU2+ngi=|Fx6M+HPIKntcJk>m zqz4H!ozYxwr6A60RY&dlxTJrzG4C1jGR)b4CnZjPH4lkGf=7Z&nQLrNh;n`fyOMgt)26egD zuO*1)RaJ5|*WQ14%Bz1a-q5G(d{b2XQ^UAfSeU!r8_dky?pYRNsfYnfsGBB z!VmU%oo@ zWBisw!tdI68<)-KT%LJdpMbV>Z=$|+XL8S|sqJJZqq;X|Wbj2aV0w>na?D{$uDQ3Ts)E4;Fgb$04ugbd8IDWBIq5GI^3Vy^c}W-A%UKMerMc z@2`yX=!S-K^T`FDn~3g3ELo2pE-p|U9Kfa>iM{8Y&J-OKNMs-pFgMiaT40WUE2N?H z9^B#2fW(43hY~rx-aRjoPwlu@>#Mh<9D6u4uk6OqKHTXq1_~h)v9qzyu;W=}1;({~ z6j)MLHlELw#jdU0xNjn6u)z}APH#fERXS4TIeW?AtzI8uu1+P|U4h3yLez^LT+Z!7Kf*%@d&XS|1 z8Bk?m>2dtRvS`23h9~$Jb#_(p+?MNbz=WPN?BRTUckNsHwX@R;$OrH*hUTVwHq}E! zu`aJEzosy=u3x~2;~^2sw7yEh_Q`4j#O&uE2aOJe_x$ z#;S}9$$f`?H3Ul^9TG}hCKG03z!l{Fc+MbsJXlHPywexZLJEO9g)!GfAuh3QWpAE~ zr4X~y%F5F6^3RvkQXCeesxCbg49LcopyEG&c7l=9kqC@9+)12U)-~((B?vj49P5{L z-nYZX1>TcACi!Yv%;N1{6jK|ZY?~?WNE2(%T3VN>szc+RiTWTr8SLKS7${j<@xoXn4g~kcDM2| z6;kBE<&FPG-St*aIF&N$W$SnRFI)R&d1D8W9o`iJFGDlafp<9D7p=Df6y6eqbH!?@ zXG_ygXBs!b$WZ2{EsLZ}w(N7%sbXqOn`KV{l zNgD1_qFX(w-(+6u#i)sX%APXSve~)1xEUe7yjG{2J z{ExpmyEmosdPpcv4?o`~pc~t;U$#`DJ0}Z@dtKZGL8iPuw%Mf1>gxGfP3K28ZHhZB zC$S5^Sqe(VoAj#dXZ_-+zCKc%%yAGSQMNsAQLShl9qGmZ-q;R1G%Jj<++($I^s)>2 z0i<0dk@Jrcy6U_CXPBrIjX=Y@ZX}=qfRit6aR^M7X1Z*f4UCrtO#HC(B!=Cl-TbNU z`RP19uZP#iTUDKTN3APCLL;Fkly4mg3AVvg2Ljz<6JI=~VHyY6BOUo}Z zM})S!lM2ZIq@nc^5Jr%7!odV6u^?La>zycISvG0$-7wMN%3h?iNqR7mCwb>&Ty&+9 zhPTS|b2S(UxtkV6`Ou7*P%m}9zl9<;Uf4Cdwl~1QdPmU6rgq&g9QQ}vkB?`+*A$B8 zceqg5kK^i9lz{^_WRPu_6E)X$(+?gP;jnD*FQ(O9*=-cOc zDNJoM4FELUTam&j06_}w8+Is${pNT&1^|LB^8soT50-gaq8c5gF>iSrf^6KBbZ8m9xheA_H zZuo1IzAX}Rk{zFxlG@np8_SOSlkjm1#EK|PYd`XyV|6jW2%Y&6GH^}V{m=Lva# zH8~X(wJbZAi#Z(~L4XZd(XF&U-(>TF-#{w-@OK+V_4VX}4ea?05Efdopw(!qGTP)v z#>!>Zyxs1F)NWKTW2xsUm9Mr(&5N%LseGhcoELbwItxV>yXdATN~ zoD3~cCP;4m^m6(po5B0hj8Q7T#qc8=RZ89NqMu^}L>Xj*^E^Tn+vigl z#iiN+W1!9k;Ht4cu8}QVfH`WTXf@mHEH)nOMh{q&_2!I)4=Me`YF+(F8)_)M>2o_% zrTdGt30n49JqXd-y{W2df{RG~aoruVLS31g04lIY9|0L6sGQF`l8+BuTDnM;$VcJr zW;7a0=M~}v?J3Fd(zr?A>)n;3HE$vbgot$uOocdw-S1v?JC?xCg~EJ)qr>#`jpA!U z%`VoHH8|q3G;tEwc3rxBCZ*>#Bn*NV4=QXz=|L|tA1`?1ZlQw4^M-fL9?S`v0Y`up zgKqiLl?Y==AkXKym2%lttKcERgie zS=soodkW{B&_Y$!NQZZwdX))l5^Y2L6DDoO&cx!h5yqMe}$JC)>Y=3TEo zvl|~Dz7*x4H+Nprr>C$mZPn)vd?s0)+HZ&P^5}ChF|iB|7PZp=0+ti1F9EDY9|!xQQhp^TJ-&z zXYd=|=Xq>l2`jnTQz&9&IEf(P8c#Er+o4*AZBp*m60E`VJC&NJ-g|?w+}^ z9-7)OZMj9_puy}pI^JA4G%+$CA7TlZ$4Zuo3Owq2W~{7Cp-bW*96>AlcE}bODqFDQ zDZ%o`?yiE9SK{L+kO}rv&u_Ij>sEesK}0-kvK3L>3CPSdTOBgGc1%9S3JZmb!7r1M zNtTfK{`arw$H$(2>EjQTxT3bzwP`~ei=(R2BU;KSz4oETw&5Rz$QiVILU*~Bv~Wlh zI<>+m1mhLH@s0Fr^-}tZLfod-$Cq4~D^_AjkWLJxX$sn~!NL2rwTV*-wOs-y>=q_c zU|?P!m8vCmI59^=1(;0kuE!PSV{~*j3W3FV6&TL-LxQ&FM^qBJ^|l#goJ#_1^UA6y z+R;DrrQOd_p`k)!aKM>S<_<5p_0gxorn58}dKqH_mqICh|Q=CjC5@k|H9;x>R2cS<)gK?H5y0xU|=DnZ#!) z83l9j;hSLQJfmRMqn&2j-TS^)!|oU+Q1}LWU)*F-yH|GSi||Yug4PpjIl&RHOb2xiAMOWXBAUHheOan zd0w2Mu^_WLJ!$>^efhV9a#CY{(9Ef#NCC?Bn+>JYSz~N$J^_!oz-N27_2e$M0hyF# z1LWx3|52}8s2kM~N6I1&kwD`ibw z#HWQMiUcHuzP@Jp3P#4(%mn4_ZC;N_Ds}c$%r9S(e;%;-a;J+R-ru*5NRvrx`M(UW ztdElqWM&|ms`p_R@AYGuCM1;qrp2-h9JTL?QAtlKqR5CDV03hF80w2aQPBL}>=R3b@#`3itDs({Nd zy=*rwE#D!9jm+$9DJr@?nx6B%gv{z_ z0s$_+sr1p+_S!}r^}{XN;a;heQ=<9gMY}`ZaTt-7d?w?<{QC^$1nTk+m6fOSrjh#k zKAwv!gRU@>j)poG|MQjY)y9_T2@Zvv4Y9kqG9gV(N_-oBA8c5KiN}57!dc)rt5I1WAv8RHzkFFUy zAJXI2W4Xk9AAWB~iU=!Rw8BeG(yT&cfrJ`@Qc;0^|7l@uecJh441h|s1O&cQx%KnZ z2rv$E3$x|(*_*Y!3+u9yWD~WhUfgryiC0(Qr&UqESfz%Y5vkB4%0iLE7~X$tR#M34 zF9{w}#t3toV*lYWgb4b>v2jJj%jq!mMINdB8>+Ig@x>og(ndSgAdvk=@B+r6r-qO@Zqy_Zdw3WN`^)D3MQOf%l?kL2AGZ z7y1aEZmrlhChE3L)$ietkJC3KnVb>0NBi%_?6qCG72my8W5z|`Fxzh&rRv9aKARB* z)jSqMVcOPh8(qhXMP=4m@7%mzRzyW+lJOCk-~R{$vVVPj#^^F4i$nQPW6AoB$Guh8HVJU}#BPp!|;tsF5{JQ94gB0hr zgQyoBseiOuFmlh9NrORUTMbOQi@s2!N4T#snJv<@-tQ<`PJ(*+NodL|9<8^n-rPc; zlvO;OTl4s3itdCShKU7bGvmI`%!r>YCA4_djzK=)QKF%SR_dhHxmNg#L%rDf7b#Hg za$aQB+Gx(PEpl$-HDaL8U2I-v(&iFqg7g}hP2k_&j0PW0U?3|P5XVcRPP;F)uy}6p z7XI|3>KkaHMT?pb2{9$|+1lohHIz&n(@2e#s2y6*fltw-v5($ zzp)G^zpZ`EiZLJoR9yrYHSuAH)L7q}&f_lxMKZnlEjO}a1pHy#AcpQL8<`@BB-r=)-IzzWN*rxEx(8%ZG+6!L9lwT$D(<@IaNnqAd&$Wu^OGb^ozz6Gm_VCMJh zI~cc&!b>E6s$*^DH=3&Zq&my_}ihzLM>a;%v3I78w z1fHFP3iYc5$IkQNJ{470)7IgVGp+Bajp9;L@G>rHqF0E139cIp9Z4R%i+osH}etsJ-_sLkiw59S*=f!|x zQskgRLF6{)^g7xtS|xx$0)5W-`UM8nd;jM6Wg-T&z^inIE*^3L7z2p2;p0S<})6UE3vdOBy{hMh~iR51W?~*uDI^Fvo zXdfbpI=q;~p`px5?O7s!9nJu;vrhv(itr~K7V*w2;i+&Gb~3@|HKHK$w%9sz>#nyM z8Z)=D`rWy*AUbVM{Z)v2Scg!18=IX>uC?smZb{S@^%}aZ9S8(M-j|j`61(<UtBWT<&Ci!Gp2m%gu_I zs5|20vM><9%97P@O;(_^(W3J*6gC8~>xtjc%h@@bK2MCpn_TYc#WKae(pbXmmq`wv z?YyeK44G(tPeszKhrxoYhJac-l}}`4W8?D28|&U$I$wDj=ji8g=Pqdv7(DS{1dSWO zh$^seuQgd^&n)oXV8jRV#9FT0i)+wOAYDdN!N~cR)n_pr*lk&H%EZ5dM*u7x3e_-FX1uH zPf|XAE~9s)koo%DEZu2ZGZ~$WDH&@V-n~{#cBtHhd)xm($eT7!u&@ph!O$VO?sI&} z<442BGNAoB<9wlHx@q#z$cK!Kxe&OQX$~?ndTIjN;+ef4rc!7Ux(=woBa8V&WM!Wv zh^}9;5tJM3iL|$pIQE5bVtRH5_HEWE-40LdQlz9{n>MrGn!30&y%zjGeFuk=c-Ofi zj3}V)_aum9z4uVo_%2_=+h0x$PZwa{-nn=quk?Erh#8w2=6xR0Qk&Tj2B#}_*4@7Y zLFtMnk3WzW?GVJoJxNXct9jYSxW^-tm1UsO9TT3-&9=WCwmfeBMyrySR(-uCq*uUf zYioO6l5^u2XNT-ox6DCjyOhpmN5=;ZH)2x$e5X8MwreavziP=uYr4_?k9)DE36cL5 z-(rTS{dLr8zW}dbdirVIeWjJ#9wiq-2qq-uvQxXkUm#jYA*X6l$kzuk z9^EL&5cHA6^WiL6g7COZxwiw0R7%;|dHgWv_VN9+>;32Y=+LAgsR^c(iOIuCLt?k! z5w&k_K~2r=8duOP-u!0~AI^x4T>IxoviHR`h)75bVb=W^OS8(zAfW;)kZ*LI2e7^A zw~J=SdY8iq>T0Q%pcfa4H)SP_`(fR2_`@}G7?8C@r%m7Kri~F>lr1^++lWtJ#2*4-8dx}B~%-fM60OkgYvK6Aap!#cg zz31&)7~-x-Kt5OZ+M3Sm{i2W0ZD=rb%>8}4&66?qirM!}77Cxg5Jz8q3vT~Hy87`G zss2=uPz9F0$%ju+&UqoY*w|dzFfyNLK~2~xcP^zcOG7=Y-)D!rCwm}>@h|w*CXO6V z&|Q?d5_q9-Q6RWWZ~O60#S>%=J+Jz5sy|AM8iNSl-vY^k%V%kAef)e?>;7ec$lnD1 zmvg5oQZHqa6q}vS&Dru4$BWE+PcVB-=xrn%C3XBjp9#Gt$yW7`+3o}YlwV4w@=2C4 zM-C?9W+hhNLc&JqJxIhUHh6#eIa^{V2@E{}-fp4F$f+$F$QC^%8k#bZ=MTqbo7jd- z!Q6^6OKztht4-%aqqBOgeS?Npq4_^RK`rC>#BVeO=jx_8*c$M820ZOh*$}O+qrZJO zF*GE!GEx6}a*7A+^iK~w0|RD`Z`7*oAVJ|yW>bVoLsy%?lfW*% z1L%2(Y+t3V|MH-;x^O$z)D~MP|!7nZt=fj2( zYzVj8#{PUDobb`aNsa(xiWE7GTHwqqyLN4e zFCzZD*1-5?=0MeD$OJAgXS>WCJ;D&O(@r=dQ#9cq&XKHRF_bgt-Tmc{(1~v;`sThb zjnwg_R6e(QHoE^Wq2`tem@&2vS)#ajkSb`?00%w&!B6ex&G!gHi)P2>lSQYu=9Y$( zxLa}7Ub1y}qTel#VHc5bBCc#rX+SmuriJ%@I}G;HB0c%_@)3{s*Xll4G#Oo1)_F_I z#xkAaB-6rtmUe@*vLa|O`rM(Q(b-WAUW^WbW;aaG-qD5}2(}=qU6*b`a^HI+OQMZ{ z+t@(=pU!nFF&4@J5na0x3xu|Zm3YO1T)exm#v}?ltACoEJ3^T)F8lMRy)U#-G?Cez ze!x;z@WECr|7VW>!Li;qjxV`JwVmbV4zF&_xKKc=`4ttrC&}EsjGR)DxnyixA21k^ zOWJmM(^Fe6D4`s$)#sHJf&D5JQ#QQ@Wzu^2lzr;*M`Lhc_nI9Ec8t6jun`u-hdZ;O74lw z?JF}DVd8iQ$Us$4WMZckR{g5k9PHTD`V}|&*env*(Vyh|+>cVLHo)u&;owYxz65tR zs_ClyaBk+GU}P2RpU9*_guSLm5&S~sdO^?%;Ya#2fye7+5_E2Z#MS zcZD7@IObf*EVxUZrt>kXdDg0;`q*kF=~h`ddgM`teQ`^b{`YWE-EZICt2vjv?kE{b z)|qJC1`sUX{qj3qB4$pxY>3e@hK!8ztGhPfV~(+ih}_26)-FEoYFFECZr4S8xV3T^ zK)nuRwi|8Hn;IeSVc)0-AR*Hu;i|w~LW}$J@<;>(fK368sGf5S8Ey1xAv`_0)%WkN zs;cjvXd>DfwPg{vvN?c{5TN;NWHB&fNv(8!oJD}A0ZTnaheMAJ{b$kD@nssXX& zmY;VnHPl;7;cxG~Rh{YHUD1XF{r&Hr;I2{;EfrC_vlp>{ySC9NP#xwv{524vY&CJh zBIMDRqQJll$mJijl3u=%LB!!pIl9BsbZb3o)r+}u`*7=vbNM1qZZg&kiaagwy|JYah+k6>BhrihZU9dqE_?{sM;%?t-zI@#44K^P`>MX2R3PYJf}bSWD`3%hKk6dM z(=ocWQBn>uuwmH1d!PndS^~*?JDSOYvbnh_gOih!T^p0SV_aNLhTDh(*Dj%8lN$TB z%~E){-@3Z^RaCGkBU9JA9c$&~(FmF{2w{%I^yqG>sm^|75b_X+1nHeFRODZ3yak_! z0wVi?^m=mV2Y0l-LeATa$DZV+cDU;C%I+C?Q z?y+~ z?hrhCb?uo1o#G@gc$7{fdQg8Qm2mpa4+}Q#xiT}uAE*D_o2MyEEB+3hG6ggkO33*U zEu+W4%DR`!h5$i1hT!{^LqWa5-Fv1`+BlF#alt@$VDYLRB9BxmE`dfQJflBJV#bzkw0G~h45g)l zLzHcQ+$ijNN1Umn4-%w*l>a`f+uB$3LIz`OPGQ=R*r889b*n%gT1?sXVUq=!k%%NEA%HEhVggYE<2j3mDOpju}zE>{ZCSh3|U=;E$30X3)8T@<+^bK z0=CzO1QL|PQ;(t1Jj>F%qErB%P0;g!LecqJBIN1rK3zA-?)ykUmEf>ydah=@cXxbI zpLtHnvYn3-B(B`*T*69Aiz-LdCT-pm&%E2{O^Q`rUDasGS|~lgx;d}gKIAwiGFzCF zgE~^=7}!}9()(SSD(V|CY4F|)zv|;Ri@xxa4)1eJc=dj}aa&s*OU;r%G|_>-A<4sY z4qG5ZXgK1D_48gphDVqd36F1u#~b(Tbq~SS^Eu6QG`;gP+xb{ZoA&0%4+Z=4f;D%q zyf19>_Y^q+R65XNPJgC0bGGZ38d}PRi{~F6ux!oh8rm{exo>TH;EI$2(b=c$5oBdE zximtBHo6(8V1Ii-0w#kh#Yxp2wjY)~X?W%`Y7ojGz56=4BpZ%U0KhXm(?d%yE`nT7 zwO-WmkmgF%jhnP+1U{bbl;z8AZBuFD$gGvghvxTDr|3(=Y23qsrO9Of8d@%r zkn}xqF?-~C_{agAW#;`UxU&;wa3BS!m2w6ALiMG?bNU;wJw+O@tNy$mRF1;L5O~Bd zns(68_}D}%9Vd?n4~WWp{*r3|u_%O#PL@2%rIy2KchT{}>h4BzKWzIb2uhYdZ8%Fo zfo*NoJR&p8N_Tgk5*iw$`ib`WRiqg=!seW%V`F9gytr5pjMe(yYEvlq?9D_FwsvV@ zS+m(rw}~ftANu}T;C)q5e6%b-$&ht^o(&oa-+w0}Xug7HGV`k#iQQ@4TS<3J$GclC zkT^iOIx`T&$2{o07bBLGet1v^!cQWsi(zBqyp-3taT8&y(7H7W*(k-O-Ey8eXgDz$ zgw&I|w$+(e;wjhourTD!KVqmD{j*|M%^JU}tN5F2G~LBQ)LmJc;`+UXD~fjqLoyjw z-+xvxH#MD(jsPs3msW0LUp_dUB_AkzD#W(s==`c*r`6D5Fdn>reqm3r<+KzOG<6wS z*%jY;O~mC%8Z2kQQzk|9w!ux`eGMty@_)4w1TiN?>BY_%w_|Q;!jFp{)9Gm4N;9Xm zwQZK{Yp5|8r8=>;x>>9yx_!Gfs$Moad^kbFf&ual{Tb}F1tEucDma7udD#`jQ-^Tq zaDF*4#fmD)-0oFFi&*$1Ti4;<{+#cV{gHZ`Vfu!HoQrK1eSv{feeZ+i^8iUno#yh$ z*JOIyU|t` zl@^=Nt(n@pH)!^sd&Fb8bhFRB+YlxMxi6)uxmXh-pRf0{7VHum4&cc`Q0)*2N)e7( zudEAmcfg!jxNRHC*QAb7wtI^eBy@ylR`><&DiInHx=Z9LgoA-`>U=lh{Y#riDvsd! zVktvw_SDjn9WU8>bL*`B`=~b($Oi<1bCH&GWCj6gMly;L{wt7X;cjhadfvUKSB1Y3 zI}*N$y!cXg;>{1;?F@!31+-rx6x{c1!oWinI~aTp()(mG*gZacaF3k`FSgIFQ2}1d za|XMjU8j0a#$1VcpfHUv8Sc%TGO9);csn%zgYBcx?<*8TW`thGslV@oa~pPtRfL$K zhQbOtpA~_9Mr0#gJRUy`FElQ1AH%zz50WK_8kEuH4`m>jHD8ZsAy*r(U4~%iD6S-& z8Eu;=1YRrv$j<DXb1VBE~k+&51LN$tIXRuCoxKf9otF|Ku< z({&=Vx$VNJV(Jz}$q4)F7goP_*Pfqr*;$~F_s3Q&7$RxoOf2T)8-6_4;8*nb$A^b$ z#_9Q@)EHk=gwdRtIh-$?dP)s`ZX`Y|8P_&#+Oc4Jpyn2e&RpFu7TLFho}f9PQZB%z zaWeUS_)y9qdaU~lPO6m$E`EnFoVDQfBR~XchZZKrX7=|xYFNBU$rc4up{@G3>V>cF znsBF*QpgEFHf4l}Bg*M8i%E()1P`FuDqg&P$GEq&vikGb`GB?+y>R_o%&^-Mrf|lP zMy+%pX7`v-&J7$wNTvPN?l{9f=mp{_Ia}QL>IW`Qf#GoIJkZ3b!=WvLHm_G z;)TzxC-H6uBI-%><@wo^R%?&PgV7r`Hm5rUBiFu5*LMpbfd?RmjnvFIzddC(SZ{-n zZ;i7MG8h#-?k2>A)m=Nw*EfAtD_gqN#&1CBWi)HR{rzGsX2-7Gphds?ZF2rkiU5%5S+MsnDFWQi9~)LM}I?xDuRSS><7vho#|f-nV34>G4h0kZfnLj&puKS zTP2$(OeqYp8Cukf)fL1GWGng&tB3T-8A(aKcg2Ni%^)d}O>w+PsuB-#?74h#=P0H& zKTJ5%M1_QU7b@}IrbHaZU_hb)$J9l!{K_ryL$Ys>P*zzRkJa%L1sWT&xxjdY%Ds3#&M48c{pGP4AhqWuu^lE2@Ht#Km=E9D4SJc@Hh@q z)JZ60!GaN@KOU!;G3|cyS5$CK`bq;^GzuidZ~IOYZDEb=`034`HG$kpVmkL|=?khV z0jmBXEd@aGB*w9jU(qVwxhiio*G!$myDUkrnXyZ zp`y>N>Z3wJqM*W#AEUV5>9A)_%q@_^BZf3y z&h7L`tB}n(W2=)cDt+vo@|DWs%a}Q(+3*ZQkMQwW>J4wFRU6lwK0Jl}Nbn<`m{pzOjtp zsnY#{fgSIBjtcRDB(*cFVU`+d`=#?h=%JAFE`dvM?)*Tjer|MeG&MgL@bdO(Bc_e&M86eRFF{yq+2ESvF9!qRqmn_=XC&f!-!fkz(RCF>ON#)QIJ{dT z?H`!ECXSAdEau{}IrMk2!ROKI^l&iP*K5vIi^ON!LY^L+RG1=w5(^T~P)TXz1~ zaL*gC;Wi&LmI6RmutvN#^B+Xi$mn{&g-p5Ku;V1@6gf92@ zY7Mx%^p8@~WF!nU6)?frEFfQ>q<&AC>YcQF8N-^{%-4ww~K zYNaKhFnKzSA#!-A3Vl=CgN3l`pakJ;=$)!>EDjy*D7H)pT zymjjjrId@F2QodOh&Ht*3ON%4j?D-S zc3pa2e71Ft8Amn8_np_*HGknj{G}iFM-P5t64`P1!a>}0E+k)lm)zp@5uU9nDk$hE zg-LJgI$4^MT365rswYOg6ERJ>b zU)e}3$ zJF`q2YvP9*q7M&)MHI$W;ZrU4iN?C`a{d&Rp`)W4j1?+)MC+PUQ6k?xIYXiU@+&=k zMK1f!sp911lxH$)*I_ve03Jh;&UVE4Ya7GX?D z-amb-)NTuQi~1IZU}$aW=y8)pjEh8YbK_RV<1tk}?a&M-w0izAJbaNDxtlR0mBjp$ zZub(mFq?B^(Dygu$?YbDscD&bY`r#Z<~W-L_Nt9(oBQqJDoA(b^!$7SA&>YSo7A)S z3>sDh7&PL^q&I|#c~o$)WfdC6QB>J?>6P2tyyMqkURoJ+^gh$|ZwZphS}W-prw!x{ zrSo(WaequpI1?Ex7SNy|o~{-6|N2G%sgUQquz1SHKnz8I0|n>5Yd&s7A$$?rW+a?< z@b)~A;|F#N*8QpUeq55QH)-nz0U1AX=VB=;>c7#)z^;ahKQ>4fNZ4R?sfSB4^8xu# z&Scyib28ng>U)=vb$X6@A-wLsSqt6LT;VmK_xOeI0$MfaGh^=c;=9+C(1=M;{*PS# zPk2=`v#FzLor@2b!ettPx2f{kKu^#u0j9>LCPKbQXIV~+^+~j+JE4u)}d6%5!iIv`GJKyn?DO=g>ZU~ad?O%)^$K_Uf$?H zaao+nOPN_4k_T;KTs+^rMC7@Tv9&F#?i`_S za@Mj%t}lzu}zp9P82jaF8l1Ply(HO*Gxp{Hx%7W|h({T_y|hD$7yFyLVE99P%HBLIfsKS31Rk!~aFs_dO(4&*-<> z#NwJSz9~N`aGIkuJ7!w;hu}mW@5i_FEg32Y{Ngo7|Y{jo? zU(xRAq$}vWUu_u~wrn-#PJ+fW0AiHS@4IQW3SN5p^ss378*s%%T?z&2=T-(R*-r## z$Hl6JI`h1wyTBS)G;~;4pRr{CmTfVY-~V_gG-J94a)rP_wXeaBUK8tswXY%Z(gnrx zkAZFL-tquREBIj2{L6+T3U2A7KoFqJ083QBwH%S&3#uJ9JwH;czyL~eZ;oKpSRv6F; z&e%|kSz52;Ox0q}W_!!Wl_4@?a%}^f`#iO;7-Y|+r`V0(?GqN({6jl!w(wT^{hl7|5usy!zAxB0ty z4uG1Vsh zkgp`R?f{;_v}5>pAVhe@!UVE3!(0$iR@Pko_f&GSgPz?^s-ZMAku40uvJ_hI-i|v2`fn+*}RQLDj2rw!AyuWCOYZvOnataK0%IoGZTFv9+jM zcAv^^&am5AR7^Ahm@Pm8%M943!!S`INlkri_BSm z?uX(GliFHV0Sb1;!0ruF!F6`<6B$|V>_6F*C*7j?ecqSfJeTIrZHWcq`ThCf z-1)S>0;A;ormP%17-!C@yC8I3fABYCB^bJ=J)p!$v0}1PcNG9~y^*qVep7hQ;2i&L zyTqhM_Ysf-4zF2_VY^{*=1@?>l&R?J`o5Hvtl3$6+#lxTXg1f-aHInCE4VXydj2LC zOK-zGXMSB&baeMYf;WucEs3CXpGTxkcoCK6li8Z>8s66=`tTRA#EcG0H8r)yJ(rF_ z3iL#scDhqT3CTSmC$ru{%CbAU?glUIf zp?trCcaNZ@t0%P^$~#Jjxb#WAR#^Arzq(~x8FMk`)(^Wl9!}gcbXqtkwYA_7<%~@7 zi}*(Hne|Nuwe12~UlOTo7U;vg#Pmlv1%a--(Vx%7g$nJimI^s`0nAiXE;D3yvs=xQ zg)|J+|5q(%6&2+hc70kJK_sNR8|jn=3E@vlD-zN$^iT>2NSAbjNGKqUbobChBcL!2 zjWok|`+jHd`Fk{L%^b|kGxvICU;Em>9d*XB;W0}mW53T#J-e1e}2 z%F564=~FZDY7$u%0&Ah4DfR$I;I zwl&c6_fNHp3)WZZE-DHz#j z>HIcU)oU~VUZ&pRMbj4JsM`RVEXLW&$oHp^t&=D7G$gp1 zfTJ8_5p`uye&d4-^%V|sB!x3Zm7?6-b`!Km!9g1|k5U>uOIdlxPHPk~{$3d?rkLXb zfuS*NVr6yQ{lMC?3-j~M`Lcnotl>S}ULxFhFWM$`csoBcGw5PfCs?&sKl5bc9Wu-n zl~ByQs-Rhl2oE>VmGL;oTO3tLM&fqD%cvk9p^hh`H*2V4y3ebTMSwg*zgD-%A?yx*{uAUhAZB#|i0#8qp@%5@FB9V|;88t;`Pc!yn@7B@#%Ll?*{T)zD$dh567k{Hj z^3A@5#-UYKCN&75v^u~U;*^m1o22GW`yT61SvNEMr%T%$0vK!uth=RF*^>1;yD!>c zLz@ud_sY|rm*2p8zQnb=C-kMYNaoUH?pqn}&pW^K;VBc$q#^z0!LqO?aho&cz*G2* zCf6Y<;Wt#IqOMhIN1XfgFEdMV!dqF-lDpZ8M7&2uWx6CI!=w^SopudA8418p^@qJor^S;PN>ioAaPDK(1WJT?wHWS90Qz=XfR z%u&DP@n-ubq@3^&GlskHJNV^Jh8$Q|i3S$#{?ohl3pEjpWbSqjG1K#NPw$SDZZbu( zv@)o&;~vIpRLnYBb-9Z1=<7$0lw9A2wO)+K%61{)u1RFoBC@;^e9^LZkR}sYq6Imjwp!7LL40d#adEdcY<^v z$5jsll@`Vp1_l@Xlt3g;MLz-+0YND#5$gPBR;Q-COJBG_L&D6Y{2xs%mw+D$EL;b# znZjkzVuGgg{CY5j-Cfm|wh|234OUTi&VxSoH}9^IPk|!;LqfaZKGN+@J6;7Qc|fS! z=Dqkv*`Dl$mAN`Wue5oC~0V-dHHe<8@p`Y=nOMUk2>ZV_j?O_a{S!& z4%xQc>0gs1&$YZBA97LH^O|}FbYplKJj(X>Ji)y&rk`~chU7?uJ+lQK5wn~n*%O4g z-?#e$#pD7;4QuPk%}??Od9m!;>hZ<3HRx4c<9G6gr!k_=tE->Wgpywy*R%wG&m-K0 z+l_*xcCv(E;^2|>fjK$7_wOB2bIwjnyGM&-SiKGc7SA@(EA$Gw6zCt9jt%LRm0hc< z7De8~O1_v?P)F=#XPd%I^+K~$jSXh%i_sd0g&vk4IK(jToP6w$WeaMgJ8CH zo?2O3J3pPAh!0eR5=#UjQOfnkzgxWG)Dl#6vbIqpkBa%yK{dgu_6V%m1eu+Kq_BKE zE(^G{b|RX^u?K31M|h?NN|>cE!FZ=Eb>_Cj%f0mOtUDFZ7zx3KEKTccdkDRx7^!vdzSXtvI{57IQDL@Bw&Lws zCypF!hf_T1_NW&&Qx--@W%B=L`trPBGb_$L6pMlytaY&%xH zXd-3T<6Jp>EAp|UsK{ZI!b49F>>F#`w!^aYa-}vi$6tuNDQWA{C|+lA9UwGNQv=-% zKUt$&db+#muRPgu+zqFy-~ne2AffTA$|CkOU6PnuhP6??Ty_(!4sP zLK-(rF5>m8-~j^`(FVS{(z8gn#ZYD*sNrKFcLD<_j*iI%shm^DwS*XYFxb!BzAIUrNSZPnMw$YJGA`;hj zkEutx1@kt3{ECO&jDI3PKnLtGzZI`Tj^g6R&06%S9(Gy2+jh0*Q-GfL79V?@wkZ?< zMSG1D5j<4T)JvXCPYV5S?d+3-1il_jwxi3G9q~eD*W!`i1UGJ76{VfL_iu z*gpF`Bck(qWEO@_K|du)%6yBSp6Afx6aE>?u@S5n)jpE3W2Z*QSFh}`sFKR zd17vsJvowz==*$PxjLD7c`ddPUAej@fr4MEs=)CUO5PTu#6iqNVB_nGcg8A5A`meo z@>m3zmX+$rH(qBhO?LdI&syiAb@X{v*Cm|>2!}#ZDh{!1?u&w;HAqB8EWZ5y?lAgd zJ;Ce&)~*#H#y*AC)F+|dCg-qg&6?-4+rxWW8O#a2%?ZHngA4dA-D!MKCVD`E$j<8O z30)ZXC66G-OU6~A6kuOH;^kl>ucMx#7GWGbFW19)x1PxKjvhLjNkHUy`ipHMQ;7u~ z!F6GG$|385qck*UoUqhkK|zLb?(Sjh+RU_*q~@^gwbHjAdW{qSH?)5)%)HV0K^tL- zE}9$0o{l*;U7a&^GA`C$&11<#2l_@^m1!lC!a?-t(+ATc?tLF}h-kKLY$k5a2?wKY z?+%sH`AexI8{q{^!7u%n_8s(N+m8p3bCovTt9%Mf=7=8`Ma{XMDG`)ltrbxY>@#5A zZzTjVU}K|YWFAnlsa{9?!`yBZ4&+t`jBiV<56 zDA3R;??T@Ji-Af}&&&n5^NF`uG7PM0A64fTYm4(jFuoqv>I?3Usm%r-^jF=|z(fhxa~L7bc?F9)*Ihu1=T^+EE(~4*?i7 zT?*{U=HRb5e}~a;=TOeF0pEWVpz#v=2?8p*`?%x*Jyk8Fl9_v?H;>=;_A+Tw?oWP_ z&CDMg8qPptR*f>b^Tnz1e4hw?oNiN77GEST@LN_(2m)C_;8`?=g+&dyNE&iFzLB*; zp2HZ)Z~hsmT45?s@F558uYDd%wp<|>7SdHK==73sm}qid-Qr@7c6Ox9`KK-y0bbOr z+n-x%viCt_E={#fb8Uti7uBKGDm8X|-PUm`B*C>fz;e9gnT0PgY_frzinV9K9?6kB;yjneYEe zvh}#$O0aiZ>^eviq-qybiMwXyPb!f^T^LO!OL&Oit>{k(u8Jg$ zcua5aKEFQ^*=OB}BV8ThwrikQL%y7z0s?}9m#|s-`?xA~97d@p5cmI1mG_y0TYaci z&@IreI0C@xuSn#Q@&8)5wJS>VhLBF`)B%-WhN$Pve|my z@agZX4t=H5+ffYk^PB-2FK5#3pr0sz0>N=pDOTAEVZ$f&o{nm3^Tf7jqpV`DLiLg)7cSN-N zzp@O_NXy*gx1t?9Zl-aix|j@6UH8c*QCyTgoBogd0SMJkX`}!KbsGGooGo?l61{(I zUoKz~qs3a|Lqs-fut9bYBCdFk5I{PLjd1kY?)S$wScT*UEm#1EaQv5K<>UTWoZPYP zn~@;?eXOg4lg$XEykw=ZxK*Ib)xj#F=NZEwk$qmFG;qCDlbMVy(ylk&4Y9RuDXANN zu^G)!)6Vn+Rcjz@MBSF_LPvNSdT!$jo%-(P>G@OkLRlyCr_^948tTI*!S3_3nf~P_ z7W9~wF*2;ry8PXOyjQ53Wu3f|vw)8?FXBj*$2*{~nex~)&2J(b6x=1h$*Xy}x&4rf zpI*BC)qRdtc^mT$Rty+|uXn$)vvlR>%$pT(v5+epW!!@vkcZkcVZD+4>k#Aje{%T| zd|!~ruCcL!{E(V}dng!NbdsBI8Yen+8b-IairredP>st4AKQ&d1MmI$%NOV zaxM5-x5nT}QWEKcg>gfmI9H0tWn;5~wUcv|q&p*If>uy?W8{og8ZSvbJ@KK(_|r{e zD_Ok@cr&*8RNtz%!Qy+ozw5y-@5jZjPfo0aU0hRBiyY5QFcmLLOW9U}19vZv>5HA+ z!#}iMBS7RJE1tf3tx4C)bXbp^pUeid#Kk*{;^UNVZVorCJ$VZGLeB4Up$VAZva0#I z1uG7V-XEdY+j-5|Zmv?^LrbtQuYlmh!oBzvb3{#>dR{zRPwt6=yc9|JT&_t&h{fD%X64UyhZa*3hmfdJN?u?))nODNK+z)!qCE)``f{?2E8xk4{ z=vEwnD1YYpNyiUOlBnQ@eEw=7QF;BL3bsuOd%wAF zt^_06z>=HKD*^xM*_*?r?+tUoCwN>LftA<<>vFf`O$r8oo$(&k8QD^S&YP-$Evco#G9Bj!`l{GP+gjJpfH7oF*AI+1-7IWyv ze-f<_k?-7$Vpxi2X!7$#}^B1DS#2N^;L zq#4l539j$wIERO;;(dR|Y3F1vIT3CAZX^Mf2wZ7(J`}hsItf;!JI;#AF?Gw%KNBBz z?mCz%zjs0|7JMXhbQk1QC(fzUqY`^V7_dh}LqkmGBp+zxb<7OPTZq#{tl?96jZMo* z2YvmvcB+w1eIGvW&Bz{l&dS3Iz@{)rB7k~%=?MvgtE^Ae&52#M*|;h$=CV}0GU16u z^M8o^BvMhQ|N5RTTn_a6mxyPms5TaG?ASVDgB*f>vT;PSQeM#X3LNh76YIEK8UXfb z)?*5du- zo)9*&Bq1;3Y9|ybFueE>na=dIcZwpGllAp1fxsnj0FFs_0xvB6&P}&Z%;7^-FAE&! zpBBi|$!}nw-#qjBs?Hi0t5l2c36FV?MSkOtAj;&&(7ythMs7qzHE2;B9fe7#Sdceh zM_THre>l%!tAh(BD{Bi7laNr0u>19W2tD(bX#7dV@V&L9r1&##L;U$qm3boy0w>DG zuuFyw9n=SK|>Uwp&>AieDM@#9Bkv7HzV)u z4};CJ7keb)#Z&ecb6yKq6v0p$Rjg;8wasyW3X_y64x(_`D|FJZF9ZUy?G~ZQ}*ex521{Y;lgL zI(J9$Yhx%@~`H5Ziaf*sGD6_`SXFmkyVG9P^YX+^^%PA zpRuubt=1poegJ&`=x4+Q#`jI^y#ylgLN@w$%T&b1ZaZh&V7UoeMzLS=w z@Kf&&-xB4D%E|nw>vR6ru+quZPWk-9%e0^9>>5|%B9nP^CxdB{{kDKLag6Sf|3s$2 z{mjAYXn_0+#VkMIi|Zhha>Ps1SRJm=KwKV`I}_^a4sCRIp~J{k*hpiL_^9!F#k zG3;^R%c!)gca=NLJwMIpZii@gN@VDBZ&3bsLsWkd?eBIs_VV<;>S{6&QRAMVt1~c> zpHQj~^OoY>^Kyz2yN?#`F!EYh*biC@zHs&OZ>g=DK4!qG-TKO`W$@ruCn-TXSQomW zxSAVvz_Es|cK?JtHiZ`2_;1Sp71bS1Pxm9v8 zi>*P`+TD(jLkj$tX_{^DRAMLbC)22txx+?9(O$vZ?Dr+Osqaf}N(-{G33BE58fs$( zOW9waqDK|w4Bwt^DBffT{`b1?32gVQ;q9rG7SuwM0}7HbWh_av9fvc(9j6c4*jsf7K$q*sGhcQiB&6&>Z8XV&5W4?VEi761SM literal 0 HcmV?d00001 diff --git a/vendor/github.com/tidwall/redcon/redcon.go b/vendor/github.com/tidwall/redcon/redcon.go new file mode 100644 index 00000000..0a9b56da --- /dev/null +++ b/vendor/github.com/tidwall/redcon/redcon.go @@ -0,0 +1,861 @@ +// Package redcon implements a Redis compatible server framework +package redcon + +import ( + "bufio" + "crypto/tls" + "errors" + "io" + "net" + "sync" +) + +var ( + errUnbalancedQuotes = &errProtocol{"unbalanced quotes in request"} + errInvalidBulkLength = &errProtocol{"invalid bulk length"} + errInvalidMultiBulkLength = &errProtocol{"invalid multibulk length"} + errDetached = errors.New("detached") + errIncompleteCommand = errors.New("incomplete command") + errTooMuchData = errors.New("too much data") +) + +type errProtocol struct { + msg string +} + +func (err *errProtocol) Error() string { + return "Protocol error: " + err.msg +} + +// Conn represents a client connection +type Conn interface { + // RemoteAddr returns the remote address of the client connection. + RemoteAddr() string + // Close closes the connection. + Close() error + // WriteError writes an error to the client. + WriteError(msg string) + // WriteString writes a string to the client. + WriteString(str string) + // WriteBulk writes bulk bytes to the client. + WriteBulk(bulk []byte) + // WriteBulkString writes a bulk string to the client. + WriteBulkString(bulk string) + // WriteInt writes an integer to the client. + WriteInt(num int) + // WriteInt64 writes a 64-but signed integer to the client. + WriteInt64(num int64) + // WriteArray writes an array header. You must then write additional + // sub-responses to the client to complete the response. + // For example to write two strings: + // + // c.WriteArray(2) + // c.WriteBulk("item 1") + // c.WriteBulk("item 2") + WriteArray(count int) + // WriteNull writes a null to the client + WriteNull() + // WriteRaw writes raw data to the client. + WriteRaw(data []byte) + // Context returns a user-defined context + Context() interface{} + // SetContext sets a user-defined context + SetContext(v interface{}) + // SetReadBuffer updates the buffer read size for the connection + SetReadBuffer(bytes int) + // Detach return a connection that is detached from the server. + // Useful for operations like PubSub. + // + // dconn := conn.Detach() + // go func(){ + // defer dconn.Close() + // cmd, err := dconn.ReadCommand() + // if err != nil{ + // fmt.Printf("read failed: %v\n", err) + // return + // } + // fmt.Printf("received command: %v", cmd) + // hconn.WriteString("OK") + // if err := dconn.Flush(); err != nil{ + // fmt.Printf("write failed: %v\n", err) + // return + // } + // }() + Detach() DetachedConn + // ReadPipeline returns all commands in current pipeline, if any + // The commands are removed from the pipeline. + ReadPipeline() []Command + // PeekPipeline returns all commands in current pipeline, if any. + // The commands remain in the pipeline. + PeekPipeline() []Command + // NetConn returns the base net.Conn connection + NetConn() net.Conn +} + +// NewServer returns a new Redcon server configured on "tcp" network net. +func NewServer(addr string, + handler func(conn Conn, cmd Command), + accept func(conn Conn) bool, + closed func(conn Conn, err error), +) *Server { + return NewServerNetwork("tcp", addr, handler, accept, closed) +} + +// NewServerNetwork returns a new Redcon server. The network net must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket" +func NewServerNetwork( + net, laddr string, + handler func(conn Conn, cmd Command), + accept func(conn Conn) bool, + closed func(conn Conn, err error), +) *Server { + if handler == nil { + panic("handler is nil") + } + s := &Server{ + net: net, + laddr: laddr, + handler: handler, + accept: accept, + closed: closed, + conns: make(map[*conn]bool), + } + return s +} + +// NewServerNetworkTLS returns a new TLS Redcon server. The network net must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket" +func NewServerNetworkTLS( + net, laddr string, + handler func(conn Conn, cmd Command), + accept func(conn Conn) bool, + closed func(conn Conn, err error), + config *tls.Config, +) *TLSServer { + if handler == nil { + panic("handler is nil") + } + s := Server{ + net: net, + laddr: laddr, + handler: handler, + accept: accept, + closed: closed, + conns: make(map[*conn]bool), + } + + tls := &TLSServer{ + config: config, + Server: &s, + } + return tls +} + +// Close stops listening on the TCP address. +// Already Accepted connections will be closed. +func (s *Server) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.ln == nil { + return errors.New("not serving") + } + s.done = true + return s.ln.Close() +} + +// ListenAndServe serves incoming connections. +func (s *Server) ListenAndServe() error { + return s.ListenServeAndSignal(nil) +} + +// Close stops listening on the TCP address. +// Already Accepted connections will be closed. +func (s *TLSServer) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.ln == nil { + return errors.New("not serving") + } + s.done = true + return s.ln.Close() +} + +// ListenAndServe serves incoming connections. +func (s *TLSServer) ListenAndServe() error { + return s.ListenServeAndSignal(nil) +} + +// ListenAndServe creates a new server and binds to addr configured on "tcp" network net. +func ListenAndServe(addr string, + handler func(conn Conn, cmd Command), + accept func(conn Conn) bool, + closed func(conn Conn, err error), +) error { + return ListenAndServeNetwork("tcp", addr, handler, accept, closed) +} + +// ListenAndServeTLS creates a new TLS server and binds to addr configured on "tcp" network net. +func ListenAndServeTLS(addr string, + handler func(conn Conn, cmd Command), + accept func(conn Conn) bool, + closed func(conn Conn, err error), + config *tls.Config, +) error { + return ListenAndServeNetworkTLS("tcp", addr, handler, accept, closed, config) +} + +// ListenAndServeNetwork creates a new server and binds to addr. The network net must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket" +func ListenAndServeNetwork( + net, laddr string, + handler func(conn Conn, cmd Command), + accept func(conn Conn) bool, + closed func(conn Conn, err error), +) error { + return NewServerNetwork(net, laddr, handler, accept, closed).ListenAndServe() +} + +// ListenAndServeNetworkTLS creates a new TLS server and binds to addr. The network net must be +// a stream-oriented network: "tcp", "tcp4", "tcp6", "unix" or "unixpacket" +func ListenAndServeNetworkTLS( + net, laddr string, + handler func(conn Conn, cmd Command), + accept func(conn Conn) bool, + closed func(conn Conn, err error), + config *tls.Config, +) error { + return NewServerNetworkTLS(net, laddr, handler, accept, closed, config).ListenAndServe() +} + +// ListenServeAndSignal serves incoming connections and passes nil or error +// when listening. signal can be nil. +func (s *Server) ListenServeAndSignal(signal chan error) error { + ln, err := net.Listen(s.net, s.laddr) + if err != nil { + if signal != nil { + signal <- err + } + return err + } + if signal != nil { + signal <- nil + } + return serve(s, ln) +} + +// ListenServeAndSignal serves incoming connections and passes nil or error +// when listening. signal can be nil. +func (s *TLSServer) ListenServeAndSignal(signal chan error) error { + ln, err := tls.Listen(s.net, s.laddr, s.config) + if err != nil { + if signal != nil { + signal <- err + } + return err + } + if signal != nil { + signal <- nil + } + return serve(s.Server, ln) +} + +func serve(s *Server, ln net.Listener) error { + s.mu.Lock() + s.ln = ln + s.mu.Unlock() + defer func() { + ln.Close() + func() { + s.mu.Lock() + defer s.mu.Unlock() + for c := range s.conns { + c.Close() + } + s.conns = nil + }() + }() + for { + lnconn, err := ln.Accept() + if err != nil { + s.mu.Lock() + done := s.done + s.mu.Unlock() + if done { + return nil + } + return err + } + c := &conn{ + conn: lnconn, + addr: lnconn.RemoteAddr().String(), + wr: NewWriter(lnconn), + rd: NewReader(lnconn), + } + s.mu.Lock() + s.conns[c] = true + s.mu.Unlock() + if s.accept != nil && !s.accept(c) { + s.mu.Lock() + delete(s.conns, c) + s.mu.Unlock() + c.Close() + continue + } + go handle(s, c) + } +} + +// handle manages the server connection. +func handle(s *Server, c *conn) { + var err error + defer func() { + if err != errDetached { + // do not close the connection when a detach is detected. + c.conn.Close() + } + func() { + // remove the conn from the server + s.mu.Lock() + defer s.mu.Unlock() + delete(s.conns, c) + if s.closed != nil { + if err == io.EOF { + err = nil + } + s.closed(c, err) + } + }() + }() + + err = func() error { + // read commands and feed back to the client + for { + // read pipeline commands + cmds, err := c.rd.readCommands(nil) + if err != nil { + if err, ok := err.(*errProtocol); ok { + // All protocol errors should attempt a response to + // the client. Ignore write errors. + c.wr.WriteError("ERR " + err.Error()) + c.wr.Flush() + } + return err + } + c.cmds = cmds + for len(c.cmds) > 0 { + cmd := c.cmds[0] + if len(c.cmds) == 1 { + c.cmds = nil + } else { + c.cmds = c.cmds[1:] + } + s.handler(c, cmd) + } + if c.detached { + // client has been detached + return errDetached + } + if c.closed { + return nil + } + if err := c.wr.Flush(); err != nil { + return err + } + } + }() +} + +// conn represents a client connection +type conn struct { + conn net.Conn + wr *Writer + rd *Reader + addr string + ctx interface{} + detached bool + closed bool + cmds []Command +} + +func (c *conn) Close() error { + c.wr.Flush() + c.closed = true + return c.conn.Close() +} +func (c *conn) Context() interface{} { return c.ctx } +func (c *conn) SetContext(v interface{}) { c.ctx = v } +func (c *conn) SetReadBuffer(n int) {} +func (c *conn) WriteString(str string) { c.wr.WriteString(str) } +func (c *conn) WriteBulk(bulk []byte) { c.wr.WriteBulk(bulk) } +func (c *conn) WriteBulkString(bulk string) { c.wr.WriteBulkString(bulk) } +func (c *conn) WriteInt(num int) { c.wr.WriteInt(num) } +func (c *conn) WriteInt64(num int64) { c.wr.WriteInt64(num) } +func (c *conn) WriteError(msg string) { c.wr.WriteError(msg) } +func (c *conn) WriteArray(count int) { c.wr.WriteArray(count) } +func (c *conn) WriteNull() { c.wr.WriteNull() } +func (c *conn) WriteRaw(data []byte) { c.wr.WriteRaw(data) } +func (c *conn) RemoteAddr() string { return c.addr } +func (c *conn) ReadPipeline() []Command { + cmds := c.cmds + c.cmds = nil + return cmds +} +func (c *conn) PeekPipeline() []Command { + return c.cmds +} +func (c *conn) NetConn() net.Conn { + return c.conn +} + +// BaseWriter returns the underlying connection writer, if any +func BaseWriter(c Conn) *Writer { + if c, ok := c.(*conn); ok { + return c.wr + } + return nil +} + +// DetachedConn represents a connection that is detached from the server +type DetachedConn interface { + // Conn is the original connection + Conn + // ReadCommand reads the next client command. + ReadCommand() (Command, error) + // Flush flushes any writes to the network. + Flush() error +} + +// Detach removes the current connection from the server loop and returns +// a detached connection. This is useful for operations such as PubSub. +// The detached connection must be closed by calling Close() when done. +// All writes such as WriteString() will not be written to the client +// until Flush() is called. +func (c *conn) Detach() DetachedConn { + c.detached = true + cmds := c.cmds + c.cmds = nil + return &detachedConn{conn: c, cmds: cmds} +} + +type detachedConn struct { + *conn + cmds []Command +} + +// Flush writes and Write* calls to the client. +func (dc *detachedConn) Flush() error { + return dc.conn.wr.Flush() +} + +// ReadCommand read the next command from the client. +func (dc *detachedConn) ReadCommand() (Command, error) { + if dc.closed { + return Command{}, errors.New("closed") + } + if len(dc.cmds) > 0 { + cmd := dc.cmds[0] + if len(dc.cmds) == 1 { + dc.cmds = nil + } else { + dc.cmds = dc.cmds[1:] + } + return cmd, nil + } + cmd, err := dc.rd.ReadCommand() + if err != nil { + return Command{}, err + } + return cmd, nil +} + +// Command represent a command +type Command struct { + // Raw is a encoded RESP message. + Raw []byte + // Args is a series of arguments that make up the command. + Args [][]byte +} + +// Server defines a server for clients for managing client connections. +type Server struct { + mu sync.Mutex + net string + laddr string + handler func(conn Conn, cmd Command) + accept func(conn Conn) bool + closed func(conn Conn, err error) + conns map[*conn]bool + ln net.Listener + done bool +} + +// TLSServer defines a server for clients for managing client connections. +type TLSServer struct { + *Server + config *tls.Config +} + +// Writer allows for writing RESP messages. +type Writer struct { + w io.Writer + b []byte +} + +// NewWriter creates a new RESP writer. +func NewWriter(wr io.Writer) *Writer { + return &Writer{ + w: wr, + } +} + +// WriteNull writes a null to the client +func (w *Writer) WriteNull() { + w.b = AppendNull(w.b) +} + +// WriteArray writes an array header. You must then write additional +// sub-responses to the client to complete the response. +// For example to write two strings: +// +// c.WriteArray(2) +// c.WriteBulk("item 1") +// c.WriteBulk("item 2") +func (w *Writer) WriteArray(count int) { + w.b = AppendArray(w.b, count) +} + +// WriteBulk writes bulk bytes to the client. +func (w *Writer) WriteBulk(bulk []byte) { + w.b = AppendBulk(w.b, bulk) +} + +// WriteBulkString writes a bulk string to the client. +func (w *Writer) WriteBulkString(bulk string) { + w.b = AppendBulkString(w.b, bulk) +} + +// Buffer returns the unflushed buffer. This is a copy so changes +// to the resulting []byte will not affect the writer. +func (w *Writer) Buffer() []byte { + return append([]byte(nil), w.b...) +} + +// SetBuffer replaces the unflushed buffer with new bytes. +func (w *Writer) SetBuffer(raw []byte) { + w.b = w.b[:0] + w.b = append(w.b, raw...) +} + +// Flush writes all unflushed Write* calls to the underlying writer. +func (w *Writer) Flush() error { + if _, err := w.w.Write(w.b); err != nil { + return err + } + w.b = w.b[:0] + return nil +} + +// WriteError writes an error to the client. +func (w *Writer) WriteError(msg string) { + w.b = AppendError(w.b, msg) +} + +// WriteString writes a string to the client. +func (w *Writer) WriteString(msg string) { + w.b = AppendString(w.b, msg) +} + +// WriteInt writes an integer to the client. +func (w *Writer) WriteInt(num int) { + w.WriteInt64(int64(num)) +} + +// WriteInt64 writes a 64-bit signed integer to the client. +func (w *Writer) WriteInt64(num int64) { + w.b = AppendInt(w.b, num) +} + +// WriteRaw writes raw data to the client. +func (w *Writer) WriteRaw(data []byte) { + w.b = append(w.b, data...) +} + +// Reader represent a reader for RESP or telnet commands. +type Reader struct { + rd *bufio.Reader + buf []byte + start int + end int + cmds []Command +} + +// NewReader returns a command reader which will read RESP or telnet commands. +func NewReader(rd io.Reader) *Reader { + return &Reader{ + rd: bufio.NewReader(rd), + buf: make([]byte, 4096), + } +} + +func parseInt(b []byte) (int, bool) { + if len(b) == 1 && b[0] >= '0' && b[0] <= '9' { + return int(b[0] - '0'), true + } + var n int + var sign bool + var i int + if len(b) > 0 && b[0] == '-' { + sign = true + i++ + } + for ; i < len(b); i++ { + if b[i] < '0' || b[i] > '9' { + return 0, false + } + n = n*10 + int(b[i]-'0') + } + if sign { + n *= -1 + } + return n, true +} + +func (rd *Reader) readCommands(leftover *int) ([]Command, error) { + var cmds []Command + b := rd.buf[rd.start:rd.end] + if rd.end-rd.start == 0 && len(rd.buf) > 4096 { + rd.buf = rd.buf[:4096] + rd.start = 0 + rd.end = 0 + } + if len(b) > 0 { + // we have data, yay! + // but is this enough data for a complete command? or multiple? + next: + switch b[0] { + default: + // just a plain text command + for i := 0; i < len(b); i++ { + if b[i] == '\n' { + var line []byte + if i > 0 && b[i-1] == '\r' { + line = b[:i-1] + } else { + line = b[:i] + } + var cmd Command + var quote bool + var quotech byte + var escape bool + outer: + for { + nline := make([]byte, 0, len(line)) + for i := 0; i < len(line); i++ { + c := line[i] + if !quote { + if c == ' ' { + if len(nline) > 0 { + cmd.Args = append(cmd.Args, nline) + } + line = line[i+1:] + continue outer + } + if c == '"' || c == '\'' { + if i != 0 { + return nil, errUnbalancedQuotes + } + quotech = c + quote = true + line = line[i+1:] + continue outer + } + } else { + if escape { + escape = false + switch c { + case 'n': + c = '\n' + case 'r': + c = '\r' + case 't': + c = '\t' + } + } else if c == quotech { + quote = false + quotech = 0 + cmd.Args = append(cmd.Args, nline) + line = line[i+1:] + if len(line) > 0 && line[0] != ' ' { + return nil, errUnbalancedQuotes + } + continue outer + } else if c == '\\' { + escape = true + continue + } + } + nline = append(nline, c) + } + if quote { + return nil, errUnbalancedQuotes + } + if len(line) > 0 { + cmd.Args = append(cmd.Args, line) + } + break + } + if len(cmd.Args) > 0 { + // convert this to resp command syntax + var wr Writer + wr.WriteArray(len(cmd.Args)) + for i := range cmd.Args { + wr.WriteBulk(cmd.Args[i]) + cmd.Args[i] = append([]byte(nil), cmd.Args[i]...) + } + cmd.Raw = wr.b + cmds = append(cmds, cmd) + } + b = b[i+1:] + if len(b) > 0 { + goto next + } else { + goto done + } + } + } + case '*': + // resp formatted command + marks := make([]int, 0, 16) + outer2: + for i := 1; i < len(b); i++ { + if b[i] == '\n' { + if b[i-1] != '\r' { + return nil, errInvalidMultiBulkLength + } + count, ok := parseInt(b[1 : i-1]) + if !ok || count <= 0 { + return nil, errInvalidMultiBulkLength + } + marks = marks[:0] + for j := 0; j < count; j++ { + // read bulk length + i++ + if i < len(b) { + if b[i] != '$' { + return nil, &errProtocol{"expected '$', got '" + + string(b[i]) + "'"} + } + si := i + for ; i < len(b); i++ { + if b[i] == '\n' { + if b[i-1] != '\r' { + return nil, errInvalidBulkLength + } + size, ok := parseInt(b[si+1 : i-1]) + if !ok || size < 0 { + return nil, errInvalidBulkLength + } + if i+size+2 >= len(b) { + // not ready + break outer2 + } + if b[i+size+2] != '\n' || + b[i+size+1] != '\r' { + return nil, errInvalidBulkLength + } + i++ + marks = append(marks, i, i+size) + i += size + 1 + break + } + } + } + } + if len(marks) == count*2 { + var cmd Command + if rd.rd != nil { + // make a raw copy of the entire command when + // there's a underlying reader. + cmd.Raw = append([]byte(nil), b[:i+1]...) + } else { + // just assign the slice + cmd.Raw = b[:i+1] + } + cmd.Args = make([][]byte, len(marks)/2) + // slice up the raw command into the args based on + // the recorded marks. + for h := 0; h < len(marks); h += 2 { + cmd.Args[h/2] = cmd.Raw[marks[h]:marks[h+1]] + } + cmds = append(cmds, cmd) + b = b[i+1:] + if len(b) > 0 { + goto next + } else { + goto done + } + } + } + } + } + done: + rd.start = rd.end - len(b) + } + if leftover != nil { + *leftover = rd.end - rd.start + } + if len(cmds) > 0 { + return cmds, nil + } + if rd.rd == nil { + return nil, errIncompleteCommand + } + if rd.end == len(rd.buf) { + // at the end of the buffer. + if rd.start == rd.end { + // rewind the to the beginning + rd.start, rd.end = 0, 0 + } else { + // must grow the buffer + newbuf := make([]byte, len(rd.buf)*2) + copy(newbuf, rd.buf) + rd.buf = newbuf + } + } + n, err := rd.rd.Read(rd.buf[rd.end:]) + if err != nil { + return nil, err + } + rd.end += n + return rd.readCommands(leftover) +} + +// ReadCommand reads the next command. +func (rd *Reader) ReadCommand() (Command, error) { + if len(rd.cmds) > 0 { + cmd := rd.cmds[0] + rd.cmds = rd.cmds[1:] + return cmd, nil + } + cmds, err := rd.readCommands(nil) + if err != nil { + return Command{}, err + } + rd.cmds = cmds + return rd.ReadCommand() +} + +// Parse parses a raw RESP message and returns a command. +func Parse(raw []byte) (Command, error) { + rd := Reader{buf: raw, end: len(raw)} + var leftover int + cmds, err := rd.readCommands(&leftover) + if err != nil { + return Command{}, err + } + if leftover > 0 { + return Command{}, errTooMuchData + } + return cmds[0], nil + +} diff --git a/vendor/github.com/tidwall/redcon/redcon_test.go b/vendor/github.com/tidwall/redcon/redcon_test.go new file mode 100644 index 00000000..c93d5117 --- /dev/null +++ b/vendor/github.com/tidwall/redcon/redcon_test.go @@ -0,0 +1,556 @@ +package redcon + +import ( + "bytes" + "fmt" + "io" + "log" + "math/rand" + "net" + "os" + "strconv" + "strings" + "testing" + "time" +) + +// TestRandomCommands fills a bunch of random commands and test various +// ways that the reader may receive data. +func TestRandomCommands(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + // build random commands. + gcmds := make([][]string, 10000) + for i := 0; i < len(gcmds); i++ { + args := make([]string, (rand.Int()%50)+1) // 1-50 args + for j := 0; j < len(args); j++ { + n := rand.Int() % 10 + if j == 0 { + n++ + } + arg := make([]byte, n) + for k := 0; k < len(arg); k++ { + arg[k] = byte(rand.Int() % 0xFF) + } + args[j] = string(arg) + } + gcmds[i] = args + } + // create a list of a buffers + var bufs []string + + // pipe valid RESP commands + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + msg := fmt.Sprintf("*%d\r\n", len(args)) + for j := 0; j < len(args); j++ { + msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j]) + } + bufs = append(bufs, msg) + } + bufs = append(bufs, "RESET THE INDEX\r\n") + + // pipe valid plain commands + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + var msg string + for j := 0; j < len(args); j++ { + quotes := false + var narg []byte + arg := args[j] + if len(arg) == 0 { + quotes = true + } + for k := 0; k < len(arg); k++ { + switch arg[k] { + default: + narg = append(narg, arg[k]) + case ' ': + quotes = true + narg = append(narg, arg[k]) + case '\\', '"', '*': + quotes = true + narg = append(narg, '\\', arg[k]) + case '\r': + quotes = true + narg = append(narg, '\\', 'r') + case '\n': + quotes = true + narg = append(narg, '\\', 'n') + } + } + msg += " " + if quotes { + msg += "\"" + } + msg += string(narg) + if quotes { + msg += "\"" + } + } + if msg != "" { + msg = msg[1:] + } + msg += "\r\n" + bufs = append(bufs, msg) + } + bufs = append(bufs, "RESET THE INDEX\r\n") + + // pipe valid RESP commands in broken chunks + lmsg := "" + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + msg := fmt.Sprintf("*%d\r\n", len(args)) + for j := 0; j < len(args); j++ { + msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j]) + } + msg = lmsg + msg + if len(msg) > 0 { + lmsg = msg[len(msg)/2:] + msg = msg[:len(msg)/2] + } + bufs = append(bufs, msg) + } + bufs = append(bufs, lmsg) + bufs = append(bufs, "RESET THE INDEX\r\n") + + // pipe valid RESP commands in large broken chunks + lmsg = "" + for i := 0; i < len(gcmds); i++ { + args := gcmds[i] + msg := fmt.Sprintf("*%d\r\n", len(args)) + for j := 0; j < len(args); j++ { + msg += fmt.Sprintf("$%d\r\n%s\r\n", len(args[j]), args[j]) + } + if len(lmsg) < 1500 { + lmsg += msg + continue + } + msg = lmsg + msg + if len(msg) > 0 { + lmsg = msg[len(msg)/2:] + msg = msg[:len(msg)/2] + } + bufs = append(bufs, msg) + } + bufs = append(bufs, lmsg) + bufs = append(bufs, "RESET THE INDEX\r\n") + + // Pipe the buffers in a background routine + rd, wr := io.Pipe() + go func() { + defer wr.Close() + for _, msg := range bufs { + io.WriteString(wr, msg) + } + }() + defer rd.Close() + cnt := 0 + idx := 0 + start := time.Now() + r := NewReader(rd) + for { + cmd, err := r.ReadCommand() + if err != nil { + if err == io.EOF { + break + } + log.Fatal(err) + } + if len(cmd.Args) == 3 && string(cmd.Args[0]) == "RESET" && + string(cmd.Args[1]) == "THE" && string(cmd.Args[2]) == "INDEX" { + if idx != len(gcmds) { + t.Fatalf("did not process all commands") + } + idx = 0 + break + } + if len(cmd.Args) != len(gcmds[idx]) { + t.Fatalf("len not equal for index %d -- %d != %d", idx, len(cmd.Args), len(gcmds[idx])) + } + for i := 0; i < len(cmd.Args); i++ { + if i == 0 { + if len(cmd.Args[i]) == len(gcmds[idx][i]) { + ok := true + for j := 0; j < len(cmd.Args[i]); j++ { + c1, c2 := cmd.Args[i][j], gcmds[idx][i][j] + if c1 >= 'A' && c1 <= 'Z' { + c1 += 32 + } + if c2 >= 'A' && c2 <= 'Z' { + c2 += 32 + } + if c1 != c2 { + ok = false + break + } + } + if ok { + continue + } + } + } else if string(cmd.Args[i]) == string(gcmds[idx][i]) { + continue + } + t.Fatalf("not equal for index %d/%d", idx, i) + } + idx++ + cnt++ + } + if false { + dur := time.Now().Sub(start) + fmt.Printf("%d commands in %s - %.0f ops/sec\n", cnt, dur, float64(cnt)/(float64(dur)/float64(time.Second))) + } +} +func testDetached(t *testing.T, conn DetachedConn) { + conn.WriteString("DETACHED") + if err := conn.Flush(); err != nil { + t.Fatal(err) + } +} +func TestServerTCP(t *testing.T) { + testServerNetwork(t, "tcp", ":12345") +} +func TestServerUnix(t *testing.T) { + os.RemoveAll("/tmp/redcon-unix.sock") + defer os.RemoveAll("/tmp/redcon-unix.sock") + testServerNetwork(t, "unix", "/tmp/redcon-unix.sock") +} + +func testServerNetwork(t *testing.T, network, laddr string) { + s := NewServerNetwork(network, laddr, + func(conn Conn, cmd Command) { + switch strings.ToLower(string(cmd.Args[0])) { + default: + conn.WriteError("ERR unknown command '" + string(cmd.Args[0]) + "'") + case "ping": + conn.WriteString("PONG") + case "quit": + conn.WriteString("OK") + conn.Close() + case "detach": + go testDetached(t, conn.Detach()) + case "int": + conn.WriteInt(100) + case "bulk": + conn.WriteBulkString("bulk") + case "bulkbytes": + conn.WriteBulk([]byte("bulkbytes")) + case "null": + conn.WriteNull() + case "err": + conn.WriteError("ERR error") + case "array": + conn.WriteArray(2) + conn.WriteInt(99) + conn.WriteString("Hi!") + } + }, + func(conn Conn) bool { + //log.Printf("accept: %s", conn.RemoteAddr()) + return true + }, + func(conn Conn, err error) { + //log.Printf("closed: %s [%v]", conn.RemoteAddr(), err) + }, + ) + if err := s.Close(); err == nil { + t.Fatalf("expected an error, should not be able to close before serving") + } + go func() { + time.Sleep(time.Second / 4) + if err := ListenAndServeNetwork(network, laddr, func(conn Conn, cmd Command) {}, nil, nil); err == nil { + t.Fatalf("expected an error, should not be able to listen on the same port") + } + time.Sleep(time.Second / 4) + + err := s.Close() + if err != nil { + t.Fatal(err) + } + err = s.Close() + if err == nil { + t.Fatalf("expected an error") + } + }() + done := make(chan bool) + signal := make(chan error) + go func() { + defer func() { + done <- true + }() + err := <-signal + if err != nil { + t.Fatal(err) + } + c, err := net.Dial(network, laddr) + if err != nil { + t.Fatal(err) + } + defer c.Close() + do := func(cmd string) (string, error) { + io.WriteString(c, cmd) + buf := make([]byte, 1024) + n, err := c.Read(buf) + if err != nil { + return "", err + } + return string(buf[:n]), nil + } + res, err := do("PING\r\n") + if err != nil { + t.Fatal(err) + } + if res != "+PONG\r\n" { + t.Fatalf("expecting '+PONG\r\n', got '%v'", res) + } + res, err = do("BULK\r\n") + if err != nil { + t.Fatal(err) + } + if res != "$4\r\nbulk\r\n" { + t.Fatalf("expecting bulk, got '%v'", res) + } + res, err = do("BULKBYTES\r\n") + if err != nil { + t.Fatal(err) + } + if res != "$9\r\nbulkbytes\r\n" { + t.Fatalf("expecting bulkbytes, got '%v'", res) + } + res, err = do("INT\r\n") + if err != nil { + t.Fatal(err) + } + if res != ":100\r\n" { + t.Fatalf("expecting int, got '%v'", res) + } + res, err = do("NULL\r\n") + if err != nil { + t.Fatal(err) + } + if res != "$-1\r\n" { + t.Fatalf("expecting nul, got '%v'", res) + } + res, err = do("ARRAY\r\n") + if err != nil { + t.Fatal(err) + } + if res != "*2\r\n:99\r\n+Hi!\r\n" { + t.Fatalf("expecting array, got '%v'", res) + } + res, err = do("ERR\r\n") + if err != nil { + t.Fatal(err) + } + if res != "-ERR error\r\n" { + t.Fatalf("expecting array, got '%v'", res) + } + res, err = do("DETACH\r\n") + if err != nil { + t.Fatal(err) + } + if res != "+DETACHED\r\n" { + t.Fatalf("expecting string, got '%v'", res) + } + }() + go func() { + err := s.ListenServeAndSignal(signal) + if err != nil { + t.Fatal(err) + } + }() + <-done +} + +func TestWriter(t *testing.T) { + buf := &bytes.Buffer{} + wr := NewWriter(buf) + wr.WriteError("ERR bad stuff") + wr.Flush() + if buf.String() != "-ERR bad stuff\r\n" { + t.Fatal("failed") + } + buf.Reset() + wr.WriteString("HELLO") + wr.Flush() + if buf.String() != "+HELLO\r\n" { + t.Fatal("failed") + } + buf.Reset() + wr.WriteInt(-1234) + wr.Flush() + if buf.String() != ":-1234\r\n" { + t.Fatal("failed") + } + buf.Reset() + wr.WriteNull() + wr.Flush() + if buf.String() != "$-1\r\n" { + t.Fatal("failed") + } + buf.Reset() + wr.WriteBulk([]byte("HELLO\r\nPLANET")) + wr.Flush() + if buf.String() != "$13\r\nHELLO\r\nPLANET\r\n" { + t.Fatal("failed") + } + buf.Reset() + wr.WriteBulkString("HELLO\r\nPLANET") + wr.Flush() + if buf.String() != "$13\r\nHELLO\r\nPLANET\r\n" { + t.Fatal("failed") + } + buf.Reset() + wr.WriteArray(3) + wr.WriteBulkString("THIS") + wr.WriteBulkString("THAT") + wr.WriteString("THE OTHER THING") + wr.Flush() + if buf.String() != "*3\r\n$4\r\nTHIS\r\n$4\r\nTHAT\r\n+THE OTHER THING\r\n" { + t.Fatal("failed") + } + buf.Reset() +} +func testMakeRawCommands(rawargs [][]string) []string { + var rawcmds []string + for i := 0; i < len(rawargs); i++ { + rawcmd := "*" + strconv.FormatUint(uint64(len(rawargs[i])), 10) + "\r\n" + for j := 0; j < len(rawargs[i]); j++ { + rawcmd += "$" + strconv.FormatUint(uint64(len(rawargs[i][j])), 10) + "\r\n" + rawcmd += rawargs[i][j] + "\r\n" + } + rawcmds = append(rawcmds, rawcmd) + } + return rawcmds +} + +func TestReaderRespRandom(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + for h := 0; h < 10000; h++ { + var rawargs [][]string + for i := 0; i < 100; i++ { + var args []string + n := int(rand.Int() % 16) + for j := 0; j < n; j++ { + arg := make([]byte, rand.Int()%512) + rand.Read(arg) + args = append(args, string(arg)) + } + } + rawcmds := testMakeRawCommands(rawargs) + data := strings.Join(rawcmds, "") + rd := NewReader(bytes.NewBufferString(data)) + for i := 0; i < len(rawcmds); i++ { + if len(rawargs[i]) == 0 { + continue + } + cmd, err := rd.ReadCommand() + if err != nil { + t.Fatal(err) + } + if string(cmd.Raw) != rawcmds[i] { + t.Fatalf("expected '%v', got '%v'", rawcmds[i], string(cmd.Raw)) + } + if len(cmd.Args) != len(rawargs[i]) { + t.Fatalf("expected '%v', got '%v'", len(rawargs[i]), len(cmd.Args)) + } + for j := 0; j < len(rawargs[i]); j++ { + if string(cmd.Args[j]) != rawargs[i][j] { + t.Fatalf("expected '%v', got '%v'", rawargs[i][j], string(cmd.Args[j])) + } + } + } + } +} + +func TestPlainReader(t *testing.T) { + rawargs := [][]string{ + {"HELLO", "WORLD"}, + {"HELLO", "WORLD"}, + {"HELLO", "PLANET"}, + {"HELLO", "JELLO"}, + {"HELLO ", "JELLO"}, + } + rawcmds := []string{ + "HELLO WORLD\n", + "HELLO WORLD\r\n", + " HELLO PLANET \r\n", + " \"HELLO\" \"JELLO\" \r\n", + " \"HELLO \" JELLO \n", + } + rawres := []string{ + "*2\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", + "*2\r\n$5\r\nHELLO\r\n$5\r\nWORLD\r\n", + "*2\r\n$5\r\nHELLO\r\n$6\r\nPLANET\r\n", + "*2\r\n$5\r\nHELLO\r\n$5\r\nJELLO\r\n", + "*2\r\n$6\r\nHELLO \r\n$5\r\nJELLO\r\n", + } + data := strings.Join(rawcmds, "") + rd := NewReader(bytes.NewBufferString(data)) + for i := 0; i < len(rawcmds); i++ { + if len(rawargs[i]) == 0 { + continue + } + cmd, err := rd.ReadCommand() + if err != nil { + t.Fatal(err) + } + if string(cmd.Raw) != rawres[i] { + t.Fatalf("expected '%v', got '%v'", rawres[i], string(cmd.Raw)) + } + if len(cmd.Args) != len(rawargs[i]) { + t.Fatalf("expected '%v', got '%v'", len(rawargs[i]), len(cmd.Args)) + } + for j := 0; j < len(rawargs[i]); j++ { + if string(cmd.Args[j]) != rawargs[i][j] { + t.Fatalf("expected '%v', got '%v'", rawargs[i][j], string(cmd.Args[j])) + } + } + } +} + +func TestParse(t *testing.T) { + _, err := Parse(nil) + if err != errIncompleteCommand { + t.Fatalf("expected '%v', got '%v'", errIncompleteCommand, err) + } + _, err = Parse([]byte("*1\r\n")) + if err != errIncompleteCommand { + t.Fatalf("expected '%v', got '%v'", errIncompleteCommand, err) + } + _, err = Parse([]byte("*-1\r\n")) + if err != errInvalidMultiBulkLength { + t.Fatalf("expected '%v', got '%v'", errInvalidMultiBulkLength, err) + } + _, err = Parse([]byte("*0\r\n")) + if err != errInvalidMultiBulkLength { + t.Fatalf("expected '%v', got '%v'", errInvalidMultiBulkLength, err) + } + cmd, err := Parse([]byte("*1\r\n$1\r\nA\r\n")) + if err != nil { + t.Fatal(err) + } + if string(cmd.Raw) != "*1\r\n$1\r\nA\r\n" { + t.Fatalf("expected '%v', got '%v'", "*1\r\n$1\r\nA\r\n", string(cmd.Raw)) + } + if len(cmd.Args) != 1 { + t.Fatalf("expected '%v', got '%v'", 1, len(cmd.Args)) + } + if string(cmd.Args[0]) != "A" { + t.Fatalf("expected '%v', got '%v'", "A", string(cmd.Args[0])) + } + cmd, err = Parse([]byte("A\r\n")) + if err != nil { + t.Fatal(err) + } + if string(cmd.Raw) != "*1\r\n$1\r\nA\r\n" { + t.Fatalf("expected '%v', got '%v'", "*1\r\n$1\r\nA\r\n", string(cmd.Raw)) + } + if len(cmd.Args) != 1 { + t.Fatalf("expected '%v', got '%v'", 1, len(cmd.Args)) + } + if string(cmd.Args[0]) != "A" { + t.Fatalf("expected '%v', got '%v'", "A", string(cmd.Args[0])) + } +}