From ab92df333cf286805427f6865519af6e768310b8 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Sat, 2 Apr 2016 14:46:39 -0700 Subject: [PATCH] added resp package --- vendor/github.com/tidwall/resp/.gitignore | 3 + vendor/github.com/tidwall/resp/.travis.yml | 4 + vendor/github.com/tidwall/resp/LICENSE | 19 + vendor/github.com/tidwall/resp/README.md | 191 +++++ vendor/github.com/tidwall/resp/aof.go | 163 ++++ vendor/github.com/tidwall/resp/aof_test.go | 59 ++ vendor/github.com/tidwall/resp/doc.go | 13 + .../github.com/tidwall/resp/example_test.go | 121 +++ vendor/github.com/tidwall/resp/resp.go | 725 ++++++++++++++++++ vendor/github.com/tidwall/resp/resp_test.go | 234 ++++++ vendor/github.com/tidwall/resp/server.go | 135 ++++ vendor/github.com/tidwall/resp/server_test.go | 92 +++ 12 files changed, 1759 insertions(+) create mode 100644 vendor/github.com/tidwall/resp/.gitignore create mode 100644 vendor/github.com/tidwall/resp/.travis.yml create mode 100644 vendor/github.com/tidwall/resp/LICENSE create mode 100644 vendor/github.com/tidwall/resp/README.md create mode 100644 vendor/github.com/tidwall/resp/aof.go create mode 100644 vendor/github.com/tidwall/resp/aof_test.go create mode 100644 vendor/github.com/tidwall/resp/doc.go create mode 100644 vendor/github.com/tidwall/resp/example_test.go create mode 100644 vendor/github.com/tidwall/resp/resp.go create mode 100644 vendor/github.com/tidwall/resp/resp_test.go create mode 100644 vendor/github.com/tidwall/resp/server.go create mode 100644 vendor/github.com/tidwall/resp/server_test.go diff --git a/vendor/github.com/tidwall/resp/.gitignore b/vendor/github.com/tidwall/resp/.gitignore new file mode 100644 index 00000000..0508cd18 --- /dev/null +++ b/vendor/github.com/tidwall/resp/.gitignore @@ -0,0 +1,3 @@ +.DS_Store +aof.tmp +appendonly.aof \ No newline at end of file diff --git a/vendor/github.com/tidwall/resp/.travis.yml b/vendor/github.com/tidwall/resp/.travis.yml new file mode 100644 index 00000000..508069eb --- /dev/null +++ b/vendor/github.com/tidwall/resp/.travis.yml @@ -0,0 +1,4 @@ +language: go + +go: + - 1.6 diff --git a/vendor/github.com/tidwall/resp/LICENSE b/vendor/github.com/tidwall/resp/LICENSE new file mode 100644 index 00000000..1a6cb670 --- /dev/null +++ b/vendor/github.com/tidwall/resp/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2016 Josh Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/tidwall/resp/README.md b/vendor/github.com/tidwall/resp/README.md new file mode 100644 index 00000000..8f6228cb --- /dev/null +++ b/vendor/github.com/tidwall/resp/README.md @@ -0,0 +1,191 @@ +RESP +==== + +[![Build Status](https://travis-ci.org/tidwall/resp.svg?branch=master)](https://travis-ci.org/tidwall/resp) +[![GoDoc](https://godoc.org/github.com/tidwall/resp?status.svg)](https://godoc.org/github.com/tidwall/resp) + +RESP is a [Go](http://golang.org/) library that provides a reader, writer, and server implementation for the [Redis RESP Protocol](http://redis.io/topics/protocol). + +RESP is short for **REdis Serialization Protocol**. +While the protocol was designed specifically for Redis, it can be used for other client-server software projects. + +The RESP protocol has the advantages of being human readable and with performance of a binary protocol. + +Features +-------- + +- [Reader](#reader) and [Writer](#writer) types for streaming RESP values from files, networks, or byte streams. +- [Server Implementation](#server) for creating your own RESP server. [Clients](#clients) use the same tools and libraries as Redis. +- [Append-only File](#append-only-file) type for persisting RESP values to disk. + +Installation +------------ + +Install resp using the "go get" command: + + go get github.com/tidwall/resp + +The Go distribution is Resp's only dependency. + +Documentation +------------- + +- [API Reference](http://godoc.org/github.com/tidwall/resp) + +Server +------ + +A Redis clone that implements the SET and GET commands. + +- You can interact using the Redis CLI (redis-cli). http://redis.io/download +- Or, use the telnet by typing in "telnet localhost 6380" and type in "set key value" and "get key". +- Or, use a client library such as http://github.com/garyburd/redigo +- The "QUIT" command will close the connection. + +```go +package main + +import ( + "errors" + "log" + "sync" + "github.com/tidwall/resp" +) + +func main() { + var mu sync.RWMutex + kvs := make(map[string]string) + s := resp.NewServer() + s.HandleFunc("set", func(conn *resp.Conn, args []resp.Value) bool { + if len(args) != 3 { + conn.WriteError(errors.New("ERR wrong number of arguments for 'set' command")) + } else { + mu.Lock() + kvs[args[1].String()] = args[2].String() + mu.Unlock() + conn.WriteSimpleString("OK") + } + return true + }) + s.HandleFunc("get", func(conn *resp.Conn, args []resp.Value) bool { + if len(args) != 2 { + conn.WriteError(errors.New("ERR wrong number of arguments for 'get' command")) + } else { + mu.RLock() + s, ok := kvs[args[1].String()] + mu.RUnlock() + if !ok { + conn.WriteNull() + } else { + conn.WriteString(s) + } + } + return true + }) + if err := s.ListenAndServe(":6379"); err != nil { + log.Fatal(err) + } +} +``` + +Reader +------ + +The resp Reader type allows for an application to read raw RESP values from a file, network, or byte stream. + +```go +raw := "*3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n" +raw += "*3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n" +rd := resp.NewReader(bytes.NewBufferString(raw)) +for { + v, _, err := rd.ReadValue() + if err == io.EOF { + break + } + if err != nil { + log.Fatal(err) + } + fmt.Printf("Read %s\n", v.Type()) + if v.Type() == Array { + for i, v := range v.Array() { + fmt.Printf(" #%d %s, value: '%s'\n", i, v.Type(), v) + } + } +} +// Output: +// Read Array +// #0 BulkString, value: 'set' +// #1 BulkString, value: 'leader' +// #2 BulkString, value: 'Charlie' +// Read Array +// #0 BulkString, value: 'set' +// #1 BulkString, value: 'follower' +// #2 BulkString, value: 'Skyler' +``` + +Writer +------ + +The resp Writer type allows for an application to write raw RESP values to a file, network, or byte stream. + +```go +var buf bytes.Buffer +wr := resp.NewWriter(&buf) +wr.WriteArray([]resp.Value{resp.StringValue("set"), resp.StringValue("leader"), resp.StringValue("Charlie")}) +wr.WriteArray([]resp.Value{resp.StringValue("set"), resp.StringValue("follower"), resp.StringValue("Skyler")}) +fmt.Printf("%s", buf.String()) +// Output: +// *3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n +// *3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n +``` + +Append-Only File +---------------- + +An append only file (AOF) allows your application to persist values to disk. It's very easy to use, and includes the same level of durablilty and binary format as [Redis AOF Persistence](http://redis.io/topics/persistence). + +Check out the [AOF documentation](https://godoc.org/github.com/tidwall/resp#AOF) for more information + +```go +// create and fill an appendonly file +aof, err := resp.OpenAOF("appendonly.aof") +if err != nil { + log.Fatal(err) +} +// append a couple values and close the file +aof.Append(resp.MultiBulkValue("set", "leader", "Charlie")) +aof.Append(resp.MultiBulkValue("set", "follower", "Skyler")) +aof.Close() + +// reopen and scan all values +aof, err = resp.OpenAOF("appendonly.aof") +if err != nil { + log.Fatal(err) +} +defer aof.Close() +aof.Scan(func(v Value) { + fmt.Printf("%s\n", v.String()) +}) + +// Output: +// [set leader Charlie] +// [set follower Skyler] +} + +``` + +Clients +------- + +There are bunches of [RESP Clients](http://redis.io/clients). Most any client that supports Redis will support this implementation. + +Contact +------- + +Josh Baker [@tidwall](http://twitter.com/tidwall) + +License +------- + +Tile38 source code is available under the MIT [License](/LICENSE). + diff --git a/vendor/github.com/tidwall/resp/aof.go b/vendor/github.com/tidwall/resp/aof.go new file mode 100644 index 00000000..4930b88b --- /dev/null +++ b/vendor/github.com/tidwall/resp/aof.go @@ -0,0 +1,163 @@ +package resp + +import ( + "errors" + "io" + "os" + "sync" + "time" +) + +// SyncPolicy represents a file's fsync policy. +type SyncPolicy int + +const ( + Never SyncPolicy = iota // The policy 'Never' means fsync is never called, the Operating System will be in charge of your data. This is the fastest and less safe method. + EverySecond SyncPolicy = iota // The policy 'EverySecond' means that fsync will be called every second or so. This is fast enough, and at most you can lose one second of data if there is a disaster. + Always SyncPolicy = iota // The policy 'Always' means fsync is called after every write. This is super duper safe and very incredibly slow. +) + +// String returns a string respesentation. +func (policy SyncPolicy) String() string { + switch policy { + default: + return "unknown" + case Never: + return "never" + case EverySecond: + return "every second" + case Always: + return "always" + } +} + +var errClosed = errors.New("closed") + +// AOF represents an open file descriptor. +type AOF struct { + mu sync.Mutex + f *os.File + closed bool + rd *Reader + policy SyncPolicy + atEnd bool +} + +// OpenAOF will open and return an AOF file. If the file does not exist a new one will be created. +func OpenAOF(path string) (*AOF, error) { + var err error + aof := &AOF{} + aof.f, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + return nil, err + } + aof.policy = EverySecond + go func() { + for { + time.Sleep(time.Second) + aof.mu.Lock() + if aof.closed { + aof.mu.Unlock() + return + } + if aof.policy == EverySecond { + aof.f.Sync() + } + aof.mu.Unlock() + } + }() + return aof, nil +} + +// SetSyncPolicy set the sync policy of the file. +// The policy 'EverySecond' means that fsync will be called every second or so. This is fast enough, and at most you can lose one second of data if there is a disaster. +// The policy 'Never' means fsync is never called, the Operating System will be in charge of your data. This is the fastest and less safe method. +// The policy 'Always' means fsync is called after every write. This is super duper safe and very incredibly slow. +// EverySecond is the default. +func (aof *AOF) SetSyncPolicy(policy SyncPolicy) { + aof.mu.Lock() + defer aof.mu.Unlock() + if aof.policy == policy { + return + } + switch policy { + default: + return + case Never, EverySecond, Always: + } + aof.policy = policy +} + +// Close will close the file. +func (aof *AOF) Close() error { + aof.mu.Lock() + defer aof.mu.Unlock() + if aof.closed { + return errClosed + } + aof.f.Close() + aof.closed = true + return nil +} + +func (aof *AOF) readValues(iterator func(v Value)) error { + aof.atEnd = false + if _, err := aof.f.Seek(0, 0); err != nil { + return err + } + rd := NewReader(aof.f) + for { + v, _, err := rd.ReadValue() + if err != nil { + if err == io.EOF { + break + } + return err + } + if iterator != nil { + iterator(v) + } + } + if _, err := aof.f.Seek(0, 2); err != nil { + return err + } + aof.atEnd = true + return nil +} + +// Append writes a value to the end of the file. +func (aof *AOF) Append(v Value) error { + b, err := v.MarshalRESP() + if err != nil { + return err + } + aof.mu.Lock() + defer aof.mu.Unlock() + if aof.closed { + return errClosed + } + if !aof.atEnd { + if err := aof.readValues(nil); err != nil { + return err + } + } + _, err = aof.f.Write(b) + if err != nil { + return err + } + if aof.policy == Always { + aof.f.Sync() + } + return nil +} + +// Scan iterates though all values in the file. +// This operation could take a long time if there lots of values, and the operation cannot be canceled part way through. +func (aof *AOF) Scan(iterator func(v Value)) error { + aof.mu.Lock() + defer aof.mu.Unlock() + if aof.closed { + return errClosed + } + return aof.readValues(iterator) +} diff --git a/vendor/github.com/tidwall/resp/aof_test.go b/vendor/github.com/tidwall/resp/aof_test.go new file mode 100644 index 00000000..25913f64 --- /dev/null +++ b/vendor/github.com/tidwall/resp/aof_test.go @@ -0,0 +1,59 @@ +package resp + +import ( + "fmt" + "os" + "testing" +) + +func TestAOF(t *testing.T) { + defer func() { + os.RemoveAll("aof.tmp") + }() + os.RemoveAll("aof.tmp") + f, err := OpenAOF("aof.tmp") + if err != nil { + t.Fatal(err) + } + defer func() { + f.Close() + }() + for i := 0; i < 12345; i++ { + if err := f.Append(StringValue(fmt.Sprintf("hello world #%d\n", i))); err != nil { + t.Fatal(err) + } + } + i := 0 + if err := f.Scan(func(v Value) { + s := v.String() + e := fmt.Sprintf("hello world #%d\n", i) + if s != e { + t.Fatalf("#%d is '%s', expect '%s'", i, s, e) + } + i++ + }); err != nil { + t.Fatal(err) + } + f.Close() + f, err = OpenAOF("aof.tmp") + if err != nil { + t.Fatal(err) + } + c := i + for i := c; i < c+12345; i++ { + if err := f.Append(StringValue(fmt.Sprintf("hello world #%d\n", i))); err != nil { + t.Fatal(err) + } + } + i = 0 + if err := f.Scan(func(v Value) { + s := v.String() + e := fmt.Sprintf("hello world #%d\n", i) + if s != e { + t.Fatalf("#%d is '%s', expect '%s'", i, s, e) + } + i++ + }); err != nil { + t.Fatal(err) + } +} diff --git a/vendor/github.com/tidwall/resp/doc.go b/vendor/github.com/tidwall/resp/doc.go new file mode 100644 index 00000000..ba777260 --- /dev/null +++ b/vendor/github.com/tidwall/resp/doc.go @@ -0,0 +1,13 @@ +// Copyright 2016 Josh Baker. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +/* +Package resp provides a reader, writer, and server implementation for the RESP protocol. http://redis.io/topics/protocol + +RESP is short for "REdis Serialization Protocol". +While the protocol was designed specifically for Redis, it can be used for other client-server software projects. + +RESP has the advantages of being human readable and with performance of a binary protocol. +*/ +package resp diff --git a/vendor/github.com/tidwall/resp/example_test.go b/vendor/github.com/tidwall/resp/example_test.go new file mode 100644 index 00000000..3dcb7710 --- /dev/null +++ b/vendor/github.com/tidwall/resp/example_test.go @@ -0,0 +1,121 @@ +package resp + +import ( + "bytes" + "errors" + "fmt" + "io" + "log" + "os" + "strings" + "sync" +) + +func ExampleServer() { + // ExampleServer is a Redis clone that implements the SET and GET commands. + // The server runs on port 6380. + // You can interact using the Redis CLI (redis-cli) The http://redis.io/download. + // Or, use the telnet by typing in "telnet localhost 6380" and type in "set key value" and "get key". + // Or, use a client library such as "http://github.com/garyburd/redigo" + // The "QUIT" command will close the connection. + var mu sync.RWMutex + kvs := make(map[string]string) + s := NewServer() + s.HandleFunc("set", func(conn *Conn, args []Value) bool { + if len(args) != 3 { + conn.WriteError(errors.New("ERR wrong number of arguments for 'set' command")) + } else { + mu.Lock() + kvs[args[1].String()] = args[2].String() + mu.Unlock() + conn.WriteSimpleString("OK") + } + return true + }) + s.HandleFunc("get", func(conn *Conn, args []Value) bool { + if len(args) != 2 { + conn.WriteError(errors.New("ERR wrong number of arguments for 'get' command")) + } else { + mu.RLock() + s, ok := kvs[args[1].String()] + mu.RUnlock() + if !ok { + conn.WriteNull() + } else { + conn.WriteString(s) + } + } + return true + }) + if err := s.ListenAndServe(":6380"); err != nil { + log.Fatal(err) + } +} + +func ExampleReader() { + raw := "*3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n" + raw += "*3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n" + rd := NewReader(bytes.NewBufferString(raw)) + for { + v, _, err := rd.ReadValue() + if err == io.EOF { + break + } + if err != nil { + log.Fatal(err) + } + fmt.Printf("Read %s\n", v.Type()) + if v.Type() == Array { + for i, v := range v.Array() { + fmt.Printf(" #%d %s, value: '%s'\n", i, v.Type(), v) + } + } + } + // Output: + // Read Array + // #0 BulkString, value: 'set' + // #1 BulkString, value: 'leader' + // #2 BulkString, value: 'Charlie' + // Read Array + // #0 BulkString, value: 'set' + // #1 BulkString, value: 'follower' + // #2 BulkString, value: 'Skyler' +} + +func ExampleWriter() { + var buf bytes.Buffer + wr := NewWriter(&buf) + wr.WriteArray([]Value{StringValue("set"), StringValue("leader"), StringValue("Charlie")}) + wr.WriteArray([]Value{StringValue("set"), StringValue("follower"), StringValue("Skyler")}) + fmt.Printf("%s", strings.Replace(buf.String(), "\r\n", "\\r\\n", -1)) + // Output: + // *3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n*3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n +} + +func ExampleAOF() { + os.RemoveAll("appendonly.aof") + + // create and fill an appendonly file + aof, err := OpenAOF("appendonly.aof") + if err != nil { + log.Fatal(err) + } + // append a couple values and close the file + aof.Append(MultiBulkValue("set", "leader", "Charlie")) + aof.Append(MultiBulkValue("set", "follower", "Skyler")) + aof.Close() + + // reopen and scan all values + aof, err = OpenAOF("appendonly.aof") + if err != nil { + log.Fatal(err) + } + defer aof.Close() + aof.Scan(func(v Value) { + fmt.Printf("%s\n", v.String()) + }) + + // Output: + // [set leader Charlie] + // [set follower Skyler] +} diff --git a/vendor/github.com/tidwall/resp/resp.go b/vendor/github.com/tidwall/resp/resp.go new file mode 100644 index 00000000..3ba435a2 --- /dev/null +++ b/vendor/github.com/tidwall/resp/resp.go @@ -0,0 +1,725 @@ +package resp + +import ( + "bytes" + "errors" + "fmt" + "io" + "strconv" +) + +const bufsz = 4096 + +// Type represents a Value type +type Type byte + +const ( + SimpleString Type = '+' + Error Type = '-' + Integer Type = ':' + BulkString Type = '$' + Array Type = '*' +) + +// TypeName returns name of the underlying RESP type. +func (t Type) String() string { + switch t { + default: + return "Unknown" + case '+': + return "SimpleString" + case '-': + return "Error" + case ':': + return "Integer" + case '$': + return "BulkString" + case '*': + return "Array" + } +} + +// Value represents the data of a valid RESP type. +type Value struct { + typ Type + integer int + str []byte + array []Value + null bool +} + +// Integer converts Value to an int. If Value cannot be converted, Zero is returned. +func (v Value) Integer() int { + switch v.typ { + default: + n, _ := strconv.ParseInt(v.String(), 10, 64) + return int(n) + case ':': + return v.integer + } +} + +// String converts Value to a string. +func (v Value) String() string { + if v.typ == '$' { + return string(v.str) + } + switch v.typ { + case '+', '-': + return string(v.str) + case ':': + return strconv.FormatInt(int64(v.integer), 10) + case '*': + return fmt.Sprintf("%v", v.array) + } + return "" +} + +// Bytes converts the Value to a byte array. An empty string is converted to a non-nil empty byte array. If it's a RESP Null value, nil is returned. +func (v Value) Bytes() []byte { + switch v.typ { + default: + return []byte(v.String()) + case '$', '+', '-': + return v.str + } +} + +// Float converts Value to a float64. If Value cannot be converted, Zero is returned. +func (v Value) Float() float64 { + switch v.typ { + default: + f, _ := strconv.ParseFloat(v.String(), 64) + return f + case ':': + return float64(v.integer) + } +} + +// IsNull indicates whether or not the base value is null. +func (v Value) IsNull() bool { + return v.null +} + +// Bool converts Value to an bool. If Value cannot be converted, false is returned. +func (v Value) Bool() bool { + return v.Integer() != 0 +} + +// Error converts the Value to an error. If Value is not an error, nil is returned. +func (v Value) Error() error { + switch v.typ { + case '-': + return errors.New(string(v.str)) + } + return nil +} + +// Array converts the Value to a an array. If Value is not an array or when it's is a RESP Null value, nil is returned. +func (v Value) Array() []Value { + if v.typ == '*' && !v.null { + return v.array + } + return nil +} + +// Type returns the underlying RESP type. The following types are represent valid RESP values. +// '+' SimpleString +// '-' Error +// ':' Integer +// '$' BulkString +// '*' Array +func (v Value) Type() Type { + return v.typ +} + +func marshalSimpleRESP(typ Type, b []byte) ([]byte, error) { + bb := make([]byte, 3+len(b)) + bb[0] = byte(typ) + copy(bb[1:], b) + bb[1+len(b)+0] = '\r' + bb[1+len(b)+1] = '\n' + return bb, nil +} + +func marshalBulkRESP(v Value) ([]byte, error) { + if v.null { + return []byte("$-1\r\n"), nil + } + szb := []byte(strconv.FormatInt(int64(len(v.str)), 10)) + bb := make([]byte, 5+len(szb)+len(v.str)) + bb[0] = '$' + copy(bb[1:], szb) + bb[1+len(szb)+0] = '\r' + bb[1+len(szb)+1] = '\n' + copy(bb[1+len(szb)+2:], v.str) + bb[1+len(szb)+2+len(v.str)+0] = '\r' + bb[1+len(szb)+2+len(v.str)+1] = '\n' + return bb, nil +} + +func marshalArrayRESP(v Value) ([]byte, error) { + if v.null { + return []byte("*-1\r\n"), nil + } + szb := []byte(strconv.FormatInt(int64(len(v.array)), 10)) + + var buf bytes.Buffer + buf.Grow(3 + len(szb) + 16*len(v.array)) // prime the buffer + buf.WriteByte('*') + buf.Write(szb) + buf.WriteByte('\r') + buf.WriteByte('\n') + for i := 0; i < len(v.array); i++ { + data, err := v.array[i].MarshalRESP() + if err != nil { + return nil, err + } + buf.Write(data) + } + return buf.Bytes(), nil +} + +func marshalAnyRESP(v Value) ([]byte, error) { + switch v.typ { + default: + if v.typ == 0 && v.null { + return []byte("$-1\r\n"), nil + } + return nil, errors.New("unknown resp type encountered") + case '-', '+': + return marshalSimpleRESP(v.typ, v.str) + case ':': + return marshalSimpleRESP(v.typ, []byte(strconv.FormatInt(int64(v.integer), 10))) + case '$': + return marshalBulkRESP(v) + case '*': + return marshalArrayRESP(v) + } +} + +// Equals compares one value to another value. +func (v Value) Equals(value Value) bool { + data1, err := v.MarshalRESP() + if err != nil { + return false + } + data2, err := value.MarshalRESP() + if err != nil { + return false + } + return string(data1) == string(data2) +} + +// MarshalRESP returns the original serialized byte representation of Value. +// For more information on this format please see http://redis.io/topics/protocol. +func (v Value) MarshalRESP() ([]byte, error) { + return marshalAnyRESP(v) +} + +var nullValue = Value{null: true} + +type errProtocol struct{ msg string } + +func (err errProtocol) Error() string { + return "Protocol error: " + err.msg +} + +// Reader is a specialized RESP Value type reader. +type Reader struct { + rd io.Reader + buf []byte + p, l, s int + rerr error +} + +// NewReader returns a Reader for reading Value types. +func NewReader(rd io.Reader) *Reader { + r := &Reader{rd: rd} + return r +} + +// ReadValue reads the next Value from Reader. +func (rd *Reader) ReadValue() (value Value, n int, err error) { + value, _, n, err = rd.readValue(false, false) + return +} + +// ReadMultiBulk reads the next multi bulk Value from Reader. +// A multi bulk value is a RESP array that contains one or more bulk strings. +// For more information on RESP arrays and strings please see http://redis.io/topics/protocol. +func (rd *Reader) ReadMultiBulk() (value Value, telnet bool, n int, err error) { + return rd.readValue(true, false) +} + +func (rd *Reader) readValue(multibulk, child bool) (val Value, telnet bool, n int, err error) { + var rn int + var c byte + c, rn, err = rd.readByte() + n += rn + if err != nil { + return nullValue, false, n, err + } + if c == '*' { + val, n, err = rd.readArrayValue(multibulk) + } else if multibulk && !child { + telnet = true + } else { + switch c { + default: + if multibulk && child { + return nullValue, telnet, n, &errProtocol{"expected '$', got '" + string(c) + "'"} + } + if child { + return nullValue, telnet, n, &errProtocol{"unknown first byte"} + } + telnet = true + case '-', '+': + val, n, err = rd.readSimpleValue(c) + case ':': + val, n, err = rd.readIntegerValue() + case '$': + val, n, err = rd.readBulkValue() + } + } + if telnet { + rd.unreadByte(c) + val, n, err = rd.readTelnetMultiBulk() + if err == nil { + telnet = true + } + } + n += rn + if err == io.EOF { + return nullValue, telnet, n, io.ErrUnexpectedEOF + } + return val, telnet, n, err +} + +func (rd *Reader) readTelnetMultiBulk() (v Value, n int, err error) { + var rn int + values := make([]Value, 0, 8) + var c byte + var bline []byte + var quote, mustspace bool + for { + c, rn, err = rd.readByte() + n += rn + if err != nil { + return nullValue, n, err + } + if c == '\n' { + if len(bline) > 0 && bline[len(bline)-1] == '\r' { + bline = bline[:len(bline)-1] + } + break + } + if mustspace && c != ' ' { + return nullValue, n, &errProtocol{"unbalanced quotes in request"} + } + if c == ' ' { + if quote { + bline = append(bline, c) + } else { + values = append(values, Value{typ: '$', str: bline}) + bline = nil + } + } else if c == '"' { + if quote { + mustspace = true + } else { + if len(bline) > 0 { + return nullValue, n, &errProtocol{"unbalanced quotes in request"} + } + quote = true + } + } else { + bline = append(bline, c) + } + } + if quote { + return nullValue, n, &errProtocol{"unbalanced quotes in request"} + } + if len(bline) > 0 { + values = append(values, Value{typ: '$', str: bline}) + } + return Value{typ: '*', array: values}, n, nil +} + +func (rd *Reader) readSimpleValue(typ byte) (val Value, n int, err error) { + var line []byte + line, n, err = rd.readLine() + if err != nil { + return nullValue, n, err + } + return Value{typ: Type(typ), str: line}, n, nil +} + +func (rd *Reader) readBulkValue() (val Value, n int, err error) { + var rn int + var l int + l, rn, err = rd.readInt() + n += rn + if err != nil { + if _, ok := err.(*errProtocol); ok { + return nullValue, n, &errProtocol{"invalid bulk length"} + } + return nullValue, n, err + } + if l < 0 { + return Value{typ: '$', null: true}, n, nil + } + if l > 512*1024*1024 { + return nullValue, n, &errProtocol{"invalid bulk length"} + } + var b []byte + b, rn, err = rd.readBytes(l + 2) + n += rn + if err != nil { + return nullValue, n, err + } + if b[l] != '\r' || b[l+1] != '\n' { + return nullValue, n, &errProtocol{"invalid bulk line ending"} + } + return Value{typ: '$', str: b[:l]}, n, nil +} + +func (rd *Reader) readArrayValue(multibulk bool) (val Value, n int, err error) { + var rn int + var l int + l, rn, err = rd.readInt() + n += rn + if err != nil || l > 1024*1024 { + if _, ok := err.(*errProtocol); ok { + if multibulk { + return nullValue, n, &errProtocol{"invalid multibulk length"} + } + return nullValue, n, &errProtocol{"invalid array length"} + } + return nullValue, n, err + } + if l < 0 { + return Value{typ: '*', null: true}, n, nil + } + var aval Value + vals := make([]Value, l) + for i := 0; i < l; i++ { + aval, _, rn, err = rd.readValue(multibulk, true) + n += rn + if err != nil { + return nullValue, n, err + } + vals[i] = aval + } + return Value{typ: '*', array: vals}, n, nil +} + +func (rd *Reader) readIntegerValue() (val Value, n int, err error) { + var l int + l, n, err = rd.readInt() + if err != nil { + if _, ok := err.(*errProtocol); ok { + return nullValue, n, &errProtocol{"invalid integer"} + } + return nullValue, n, err + } + return Value{typ: ':', integer: l}, n, nil +} + +func (rd *Reader) readInt() (x int, n int, err error) { + var rn int + var c byte + neg := 1 + c, rn, err = rd.readByte() + n += rn + if err != nil { + return 0, n, err + } + if c == '-' { + neg = -1 + c, rn, err = rd.readByte() + n += rn + if err != nil { + return 0, n, err + } + } + var length int + for { + switch c { + default: + return 0, n, &errProtocol{"invalid length"} + case '\r': + c, rn, err = rd.readByte() + n += rn + if err != nil { + return 0, n, err + } + if c != '\n' { + return 0, n, &errProtocol{"invalid length"} + } + return length * neg, n, nil + case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9': + length = (length * 10) + int(c-'0') + } + c, rn, err = rd.readByte() + n += rn + if err != nil { + return 0, n, err + } + } +} + +func (rd *Reader) readLine() (b []byte, n int, err error) { + var lc byte + p := rd.p + l := rd.l + for { + // read byte + for l == 0 { + if err := rd.fillBuffer(true); err != nil { + return nil, 0, err + } + l = rd.l - (p - rd.p) + } + c := rd.buf[p] + p++ + l-- + n++ + if c == '\n' && lc == '\r' { + b = rd.buf[rd.p : rd.p+n-2] + rd.p = p + rd.l -= n + return b, n, nil + } + lc = c + } +} + +func (rd *Reader) readBytes(count int) (b []byte, n int, err error) { + if count < 0 { + return nil, 0, errors.New("invalid argument") + } + for rd.l < count { + if err := rd.fillBuffer(false); err != nil { + return nil, 0, err + } + } + b = rd.buf[rd.p : rd.p+count] + rd.p += count + rd.l -= count + return b, count, nil +} + +func (rd *Reader) readByte() (c byte, n int, err error) { + for rd.l < 1 { + if err := rd.fillBuffer(false); err != nil { + return 0, 0, err + } + } + c = rd.buf[rd.p] + rd.p++ + rd.l-- + return c, 1, nil +} + +func (rd *Reader) unreadByte(c byte) { + if rd.p > 0 { + rd.p-- + rd.l++ + rd.buf[rd.p] = c + return + } + buf := make([]byte, rd.l+1) + buf[0] = c + copy(buf[1:], rd.buf[:rd.l]) + rd.l++ + rd.s = rd.l +} + +func (rd *Reader) fillBuffer(ignoreRebuffering bool) error { + if rd.rerr != nil { + return rd.rerr + } + buf := make([]byte, bufsz) + n, err := rd.rd.Read(buf) + rd.rerr = err + if n > 0 { + if !ignoreRebuffering && rd.l == 0 { + rd.l = n + rd.s = n + rd.p = 0 + rd.buf = buf + } else { + rd.buf = append(rd.buf, buf[:n]...) + rd.s += n + rd.l += n + } + return nil + } + return rd.rerr +} + +// AnyValue returns a RESP value from an interface. This function infers the types. Arrays are not allowed. +func AnyValue(v interface{}) Value { + switch v := v.(type) { + default: + return StringValue(fmt.Sprintf("%v", v)) + case nil: + return NullValue() + case int: + return IntegerValue(int(v)) + case uint: + return IntegerValue(int(v)) + case int8: + return IntegerValue(int(v)) + case uint8: + return IntegerValue(int(v)) + case int16: + return IntegerValue(int(v)) + case uint16: + return IntegerValue(int(v)) + case int32: + return IntegerValue(int(v)) + case uint32: + return IntegerValue(int(v)) + case int64: + return IntegerValue(int(v)) + case uint64: + return IntegerValue(int(v)) + case bool: + return BoolValue(v) + case float32: + return FloatValue(float64(v)) + case float64: + return FloatValue(float64(v)) + case []byte: + return BytesValue(v) + case string: + return StringValue(v) + } +} + +// SimpleStringValue returns a RESP simple string. A simple string has no new lines. The carriage return and new line characters are replaced with spaces. +func SimpleStringValue(s string) Value { return Value{typ: '+', str: []byte(formSingleLine(s))} } + +// BytesValue returns a RESP bulk string. A bulk string can represent any data. +func BytesValue(b []byte) Value { return Value{typ: '$', str: b} } + +// StringValue returns a RESP bulk string. A bulk string can represent any data. +func StringValue(s string) Value { return Value{typ: '$', str: []byte(s)} } + +// NullValue returns a RESP null bulk string. +func NullValue() Value { return Value{typ: '$', null: true} } + +// ErrorValue returns a RESP error. +func ErrorValue(err error) Value { + if err == nil { + return Value{typ: '-'} + } + return Value{typ: '-', str: []byte(err.Error())} +} + +// IntegerValue returns a RESP integer. +func IntegerValue(i int) Value { return Value{typ: ':', integer: i} } + +// BoolValue returns a RESP integer representation of a bool. +func BoolValue(t bool) Value { + if t { + return Value{typ: ':', integer: 1} + } + return Value{typ: ':', integer: 0} +} + +// FloatValue returns a RESP bulk string representation of a float. +func FloatValue(f float64) Value { return StringValue(strconv.FormatFloat(f, 'f', -1, 64)) } + +// ArrayValue returns a RESP array. +func ArrayValue(vals []Value) Value { return Value{typ: '*', array: vals} } + +func formSingleLine(s string) string { + bs1 := []byte(s) + for i := 0; i < len(bs1); i++ { + switch bs1[i] { + case '\r', '\n': + bs2 := make([]byte, len(bs1)) + copy(bs2, bs1) + bs2[i] = ' ' + i++ + for ; i < len(bs2); i++ { + switch bs1[i] { + case '\r', '\n': + bs2[i] = ' ' + } + } + return string(bs2) + } + } + return s +} + +// MultiBulkValue returns a RESP array which contains one or more bulk strings. +// For more information on RESP arrays and strings please see http://redis.io/topics/protocol. +func MultiBulkValue(commandName string, args ...interface{}) Value { + vals := make([]Value, len(args)+1) + vals[0] = StringValue(commandName) + for i, arg := range args { + switch arg := arg.(type) { + default: + vals[i+1] = StringValue(fmt.Sprintf("%v", arg)) + case []byte: + vals[i+1] = StringValue(string(arg)) + case string: + vals[i+1] = StringValue(arg) + case nil: + vals[i+1] = NullValue() + } + } + return ArrayValue(vals) +} + +// Writer is a specialized RESP Value type writer. +type Writer struct { + wr io.Writer +} + +// NewWriter returns a new Writer. +func NewWriter(wr io.Writer) *Writer { + return &Writer{wr} +} + +// WriteValue writes a RESP Value. +func (wr *Writer) WriteValue(v Value) error { + b, err := v.MarshalRESP() + if err != nil { + return err + } + _, err = wr.wr.Write(b) + return nil +} + +// WriteSimpleString writes a RESP simple string. A simple string has no new lines. The carriage return and new line characters are replaced with spaces. +func (wr *Writer) WriteSimpleString(s string) error { return wr.WriteValue(SimpleStringValue(s)) } + +// WriteBytes writes a RESP bulk string. A bulk string can represent any data. +func (wr *Writer) WriteBytes(b []byte) error { return wr.WriteValue(BytesValue(b)) } + +// WriteString writes a RESP bulk string. A bulk string can represent any data. +func (wr *Writer) WriteString(s string) error { return wr.WriteValue(StringValue(s)) } + +// WriteNull writes a RESP null bulk string. +func (wr *Writer) WriteNull() error { return wr.WriteValue(NullValue()) } + +// WriteError writes a RESP error. +func (wr *Writer) WriteError(err error) error { return wr.WriteValue(ErrorValue(err)) } + +// WriteInteger writes a RESP integer. +func (wr *Writer) WriteInteger(i int) error { return wr.WriteValue(IntegerValue(i)) } + +// WriteArray writes a RESP array. +func (wr *Writer) WriteArray(vals []Value) error { return wr.WriteValue(ArrayValue(vals)) } + +// WriteMultiBulk writes a RESP array which contains one or more bulk strings. +// For more information on RESP arrays and strings please see http://redis.io/topics/protocol. +func (wr *Writer) WriteMultiBulk(commandName string, args ...interface{}) error { + return wr.WriteValue(MultiBulkValue(commandName, args...)) +} diff --git a/vendor/github.com/tidwall/resp/resp_test.go b/vendor/github.com/tidwall/resp/resp_test.go new file mode 100644 index 00000000..4e37708b --- /dev/null +++ b/vendor/github.com/tidwall/resp/resp_test.go @@ -0,0 +1,234 @@ +package resp + +import ( + "bytes" + "crypto/rand" + "encoding/binary" + "fmt" + "io" + "strconv" + "strings" + "testing" +) + +func TestIntegers(t *testing.T) { + var n, rn int + var v Value + var err error + data := []byte(":1234567\r\n:-90898\r\n:0\r\n") + r := NewReader(bytes.NewBuffer(data)) + v, rn, err = r.ReadValue() + n += rn + if err != nil { + t.Fatal(err) + } + if v.Integer() != 1234567 { + t.Fatalf("invalid integer: expected %d, got %d", 1234567, v.Integer()) + } + v, rn, err = r.ReadValue() + n += rn + if err != nil { + t.Fatal(err) + } + if v.Integer() != -90898 { + t.Fatalf("invalid integer: expected %d, got %d", -90898, v.Integer()) + } + v, rn, err = r.ReadValue() + n += rn + if err != nil { + t.Fatal(err) + } + if v.Integer() != 0 { + t.Fatalf("invalid integer: expected %d, got %d", 0, v.Integer()) + } + v, rn, err = r.ReadValue() + n += rn + if err != io.EOF { + t.Fatalf("invalid error: expected %v, got %v", io.EOF, err) + } + if n != len(data) { + t.Fatalf("invalid read count: expected %d, got %d", len(data), n) + } +} + +func TestFloats(t *testing.T) { + var n, rn int + var v Value + var err error + data := []byte(":1234567\r\n+-90898\r\n$6\r\n12.345\r\n-90284.987\r\n") + r := NewReader(bytes.NewBuffer(data)) + v, rn, err = r.ReadValue() + n += rn + if err != nil { + t.Fatal(err) + } + if v.Float() != 1234567 { + t.Fatalf("invalid integer: expected %v, got %v", 1234567, v.Float()) + } + v, rn, err = r.ReadValue() + n += rn + if err != nil { + t.Fatal(err) + } + if v.Float() != -90898 { + t.Fatalf("invalid integer: expected %v, got %v", -90898, v.Float()) + } + v, rn, err = r.ReadValue() + n += rn + if err != nil { + t.Fatal(err) + } + if v.Float() != 12.345 { + t.Fatalf("invalid integer: expected %v, got %v", 12.345, v.Float()) + } + v, rn, err = r.ReadValue() + n += rn + if err != nil { + t.Fatal(err) + } + if v.Float() != 90284.987 { + t.Fatalf("invalid integer: expected %v, got %v", 90284.987, v.Float()) + } + v, rn, err = r.ReadValue() + n += rn + if err != io.EOF { + t.Fatalf("invalid error: expected %v, got %v", io.EOF, err) + } + if n != len(data) { + t.Fatalf("invalid read count: expected %d, got %d", len(data), n) + } +} + +// TestLotsaRandomness does generates N resp messages and reads the values though a Reader. +// It then marshals the values back to strings and compares to the original. +// All data and resp types are random. + +func TestLotsaRandomness(t *testing.T) { + n := 10000 + var anys []string + var buf bytes.Buffer + for i := 0; i < n; i++ { + any := randRESPAny() + anys = append(anys, any) + buf.WriteString(any) + } + r := NewReader(bytes.NewBuffer(buf.Bytes())) + for i := 0; i < n; i++ { + v, _, err := r.ReadValue() + if err != nil { + t.Fatal(err) + } + resp, err := v.MarshalRESP() + if err != nil { + t.Fatal(err) + } + if string(resp) != anys[i] { + t.Fatalf("resp failed to remarshal #%d\n-- original --\n%s\n-- remarshalled --\n%s\n-- done --", i, anys[i], string(resp)) + } + } +} + +func randRESPInteger() string { + return fmt.Sprintf(":%d\r\n", (randInt()%1000000)-500000) +} +func randRESPSimpleString() string { + return "+" + strings.Replace(randString(), "\r\n", "", -1) + "\r\n" +} +func randRESPError() string { + return "-" + strings.Replace(randString(), "\r\n", "", -1) + "\r\n" +} +func randRESPBulkString() string { + s := randString() + if len(s)%1024 == 0 { + return "$-1\r\n" + } + return "$" + strconv.FormatInt(int64(len(s)), 10) + "\r\n" + s + "\r\n" +} +func randRESPArray() string { + n := randInt() % 10 + if n%10 == 0 { + return "$-1\r\n" + } + s := "*" + strconv.FormatInt(int64(n), 10) + "\r\n" + for i := 0; i < n; i++ { + rn := randInt() % 100 + if rn == 0 { + s += randRESPArray() + } else { + switch (rn - 1) % 4 { + case 0: + s += randRESPInteger() + case 1: + s += randRESPSimpleString() + case 2: + s += randRESPError() + case 3: + s += randRESPBulkString() + } + } + } + return s +} + +func randInt() int { + n := int(binary.LittleEndian.Uint64(randBytes(8))) + if n < 0 { + n *= -1 + } + return n +} + +func randBytes(n int) []byte { + b := make([]byte, n) + if _, err := io.ReadFull(rand.Reader, b); err != nil { + panic("random error: " + err.Error()) + } + return b +} + +func randString() string { + return string(randBytes(randInt() % 1024)) +} + +func randRESPAny() string { + switch randInt() % 5 { + case 0: + return randRESPInteger() + case 1: + return randRESPSimpleString() + case 2: + return randRESPError() + case 3: + return randRESPBulkString() + case 4: + return randRESPArray() + } + panic("?") +} + +func BenchmarkRead(b *testing.B) { + n := 1000 + var buf bytes.Buffer + for k := 0; k < n; k++ { + buf.WriteString(randRESPAny()) + } + bb := buf.Bytes() + b.ResetTimer() + var j int + var r *Reader + //start := time.Now() + var k int + for i := 0; i < b.N; i++ { + if j == 0 { + r = NewReader(bytes.NewBuffer(bb)) + j = n + } + _, _, err := r.ReadValue() + if err != nil { + b.Fatal(err) + } + j-- + k++ + } + //fmt.Printf("\n%f\n", float64(k)/(float64(time.Now().Sub(start))/float64(time.Second))) +} diff --git a/vendor/github.com/tidwall/resp/server.go b/vendor/github.com/tidwall/resp/server.go new file mode 100644 index 00000000..c1cad489 --- /dev/null +++ b/vendor/github.com/tidwall/resp/server.go @@ -0,0 +1,135 @@ +package resp + +import ( + "errors" + "io" + "net" + "strings" + "sync" +) + +// Server represents a RESP server which handles reading RESP Values. +type Server struct { + mu sync.RWMutex + handlers map[string]func(conn *Conn, args []Value) bool + accept func(conn *Conn) bool +} + +// Conn represents a RESP network connection. +type Conn struct { + *Reader + *Writer + base net.Conn + RemoteAddr string +} + +// NewConn returns a Conn. +func NewConn(conn net.Conn) *Conn { + return &Conn{ + Reader: NewReader(conn), + Writer: NewWriter(conn), + base: conn, + RemoteAddr: conn.RemoteAddr().String(), + } +} + +// NewServer returns a new Server. +func NewServer() *Server { + return &Server{ + handlers: make(map[string]func(conn *Conn, args []Value) bool), + } +} + +// HandleFunc registers the handler function for the given command. +// The conn parameter is a Conn type and it can be used to read and write further RESP messages from and to the connection. +// Returning false will close the connection. +func (s *Server) HandleFunc(command string, handler func(conn *Conn, args []Value) bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.handlers[strings.ToUpper(command)] = handler +} + +// AcceptFunc registers a function for accepting connections. +// Calling this function is optional and it allows for total control over reading and writing RESP Values from and to the connections. +// Returning false will close the connection. +func (s *Server) AcceptFunc(accept func(conn *Conn) bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.accept = accept +} + +// ListenAndServe listens on the TCP network address addr for incoming connections. +func (s *Server) ListenAndServe(addr string) error { + l, err := net.Listen("tcp", addr) + if err != nil { + return err + } + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + return err + } + go func() { + err = s.handleConn(conn) + if err != nil { + if _, ok := err.(*errProtocol); ok { + io.WriteString(conn, "-ERR "+formSingleLine(err.Error())+"\r\n") + } else { + io.WriteString(conn, "-ERR unknown error\r\n") + } + } + conn.Close() + }() + } +} + +func (s *Server) handleConn(nconn net.Conn) error { + conn := NewConn(nconn) + s.mu.RLock() + accept := s.accept + s.mu.RUnlock() + if accept != nil { + if !accept(conn) { + return nil + } + } + for { + v, _, _, err := conn.ReadMultiBulk() + if err != nil { + return err + } + values := v.Array() + if len(values) == 0 { + continue + } + lccommandName := values[0].String() + commandName := strings.ToUpper(lccommandName) + s.mu.RLock() + h := s.handlers[commandName] + s.mu.RUnlock() + switch commandName { + case "QUIT": + if h == nil { + conn.WriteSimpleString("OK") + return nil + } + case "PING": + if h == nil { + if err := conn.WriteSimpleString("PONG"); err != nil { + return err + } + continue + } + } + if h == nil { + if err := conn.WriteError(errors.New("ERR unknown command '" + lccommandName + "'")); err != nil { + return err + } + } else { + if !h(conn, values) { + return nil + } + } + } +} diff --git a/vendor/github.com/tidwall/resp/server_test.go b/vendor/github.com/tidwall/resp/server_test.go new file mode 100644 index 00000000..9c49d83e --- /dev/null +++ b/vendor/github.com/tidwall/resp/server_test.go @@ -0,0 +1,92 @@ +package resp + +import ( + "fmt" + "net" + "os" + "sync" + "testing" + "time" +) + +func TestServer(t *testing.T) { + // Use the example server in example_test + go func() { + ExampleServer() + }() + if os.Getenv("WAIT_ON_TEST_SERVER") == "1" { + select {} + } + time.Sleep(time.Millisecond * 50) + + n := 75 + + // Open N connections and do a bunch of stuff. + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func(i int) { + defer func() { + wg.Done() + }() + nconn, err := net.Dial("tcp", ":6380") + if err != nil { + t.Fatal(err) + } + defer nconn.Close() + conn := NewConn(nconn) + + // PING + if err := conn.WriteMultiBulk("PING"); err != nil { + t.Fatal(err) + } + val, _, err := conn.ReadValue() + if err != nil { + t.Fatal(err) + } + if val.String() != "PONG" { + t.Fatalf("expecting 'PONG', got '%s'", val) + } + + key := fmt.Sprintf("key:%d", i) + + // SET + if err := conn.WriteMultiBulk("SET", key, 123.4); err != nil { + t.Fatal(err) + } + val, _, err = conn.ReadValue() + if err != nil { + t.Fatal(err) + } + if val.String() != "OK" { + t.Fatalf("expecting 'OK', got '%s'", val) + } + + // GET + if err := conn.WriteMultiBulk("GET", key); err != nil { + t.Fatal(err) + } + val, _, err = conn.ReadValue() + if err != nil { + t.Fatal(err) + } + if val.Float() != 123.4 { + t.Fatalf("expecting '123.4', got '%s'", val) + } + + // QUIT + if err := conn.WriteMultiBulk("QUIT"); err != nil { + t.Fatal(err) + } + val, _, err = conn.ReadValue() + if err != nil { + t.Fatal(err) + } + if val.String() != "OK" { + t.Fatalf("expecting 'OK', got '%s'", val) + } + + }(i) + } + wg.Wait() +}