mirror of https://github.com/tidwall/tile38.git
added resp package
This commit is contained in:
parent
03a3c1642c
commit
ab92df333c
|
@ -0,0 +1,3 @@
|
|||
.DS_Store
|
||||
aof.tmp
|
||||
appendonly.aof
|
|
@ -0,0 +1,4 @@
|
|||
language: go
|
||||
|
||||
go:
|
||||
- 1.6
|
|
@ -0,0 +1,19 @@
|
|||
Copyright (c) 2016 Josh Baker
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
|
@ -0,0 +1,191 @@
|
|||
RESP
|
||||
====
|
||||
|
||||
[![Build Status](https://travis-ci.org/tidwall/resp.svg?branch=master)](https://travis-ci.org/tidwall/resp)
|
||||
[![GoDoc](https://godoc.org/github.com/tidwall/resp?status.svg)](https://godoc.org/github.com/tidwall/resp)
|
||||
|
||||
RESP is a [Go](http://golang.org/) library that provides a reader, writer, and server implementation for the [Redis RESP Protocol](http://redis.io/topics/protocol).
|
||||
|
||||
RESP is short for **REdis Serialization Protocol**.
|
||||
While the protocol was designed specifically for Redis, it can be used for other client-server software projects.
|
||||
|
||||
The RESP protocol has the advantages of being human readable and with performance of a binary protocol.
|
||||
|
||||
Features
|
||||
--------
|
||||
|
||||
- [Reader](#reader) and [Writer](#writer) types for streaming RESP values from files, networks, or byte streams.
|
||||
- [Server Implementation](#server) for creating your own RESP server. [Clients](#clients) use the same tools and libraries as Redis.
|
||||
- [Append-only File](#append-only-file) type for persisting RESP values to disk.
|
||||
|
||||
Installation
|
||||
------------
|
||||
|
||||
Install resp using the "go get" command:
|
||||
|
||||
go get github.com/tidwall/resp
|
||||
|
||||
The Go distribution is Resp's only dependency.
|
||||
|
||||
Documentation
|
||||
-------------
|
||||
|
||||
- [API Reference](http://godoc.org/github.com/tidwall/resp)
|
||||
|
||||
Server
|
||||
------
|
||||
|
||||
A Redis clone that implements the SET and GET commands.
|
||||
|
||||
- You can interact using the Redis CLI (redis-cli). http://redis.io/download
|
||||
- Or, use the telnet by typing in "telnet localhost 6380" and type in "set key value" and "get key".
|
||||
- Or, use a client library such as http://github.com/garyburd/redigo
|
||||
- The "QUIT" command will close the connection.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"sync"
|
||||
"github.com/tidwall/resp"
|
||||
)
|
||||
|
||||
func main() {
|
||||
var mu sync.RWMutex
|
||||
kvs := make(map[string]string)
|
||||
s := resp.NewServer()
|
||||
s.HandleFunc("set", func(conn *resp.Conn, args []resp.Value) bool {
|
||||
if len(args) != 3 {
|
||||
conn.WriteError(errors.New("ERR wrong number of arguments for 'set' command"))
|
||||
} else {
|
||||
mu.Lock()
|
||||
kvs[args[1].String()] = args[2].String()
|
||||
mu.Unlock()
|
||||
conn.WriteSimpleString("OK")
|
||||
}
|
||||
return true
|
||||
})
|
||||
s.HandleFunc("get", func(conn *resp.Conn, args []resp.Value) bool {
|
||||
if len(args) != 2 {
|
||||
conn.WriteError(errors.New("ERR wrong number of arguments for 'get' command"))
|
||||
} else {
|
||||
mu.RLock()
|
||||
s, ok := kvs[args[1].String()]
|
||||
mu.RUnlock()
|
||||
if !ok {
|
||||
conn.WriteNull()
|
||||
} else {
|
||||
conn.WriteString(s)
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
if err := s.ListenAndServe(":6379"); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Reader
|
||||
------
|
||||
|
||||
The resp Reader type allows for an application to read raw RESP values from a file, network, or byte stream.
|
||||
|
||||
```go
|
||||
raw := "*3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n"
|
||||
raw += "*3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n"
|
||||
rd := resp.NewReader(bytes.NewBufferString(raw))
|
||||
for {
|
||||
v, _, err := rd.ReadValue()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Printf("Read %s\n", v.Type())
|
||||
if v.Type() == Array {
|
||||
for i, v := range v.Array() {
|
||||
fmt.Printf(" #%d %s, value: '%s'\n", i, v.Type(), v)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Output:
|
||||
// Read Array
|
||||
// #0 BulkString, value: 'set'
|
||||
// #1 BulkString, value: 'leader'
|
||||
// #2 BulkString, value: 'Charlie'
|
||||
// Read Array
|
||||
// #0 BulkString, value: 'set'
|
||||
// #1 BulkString, value: 'follower'
|
||||
// #2 BulkString, value: 'Skyler'
|
||||
```
|
||||
|
||||
Writer
|
||||
------
|
||||
|
||||
The resp Writer type allows for an application to write raw RESP values to a file, network, or byte stream.
|
||||
|
||||
```go
|
||||
var buf bytes.Buffer
|
||||
wr := resp.NewWriter(&buf)
|
||||
wr.WriteArray([]resp.Value{resp.StringValue("set"), resp.StringValue("leader"), resp.StringValue("Charlie")})
|
||||
wr.WriteArray([]resp.Value{resp.StringValue("set"), resp.StringValue("follower"), resp.StringValue("Skyler")})
|
||||
fmt.Printf("%s", buf.String())
|
||||
// Output:
|
||||
// *3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n
|
||||
// *3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n
|
||||
```
|
||||
|
||||
Append-Only File
|
||||
----------------
|
||||
|
||||
An append only file (AOF) allows your application to persist values to disk. It's very easy to use, and includes the same level of durablilty and binary format as [Redis AOF Persistence](http://redis.io/topics/persistence).
|
||||
|
||||
Check out the [AOF documentation](https://godoc.org/github.com/tidwall/resp#AOF) for more information
|
||||
|
||||
```go
|
||||
// create and fill an appendonly file
|
||||
aof, err := resp.OpenAOF("appendonly.aof")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// append a couple values and close the file
|
||||
aof.Append(resp.MultiBulkValue("set", "leader", "Charlie"))
|
||||
aof.Append(resp.MultiBulkValue("set", "follower", "Skyler"))
|
||||
aof.Close()
|
||||
|
||||
// reopen and scan all values
|
||||
aof, err = resp.OpenAOF("appendonly.aof")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer aof.Close()
|
||||
aof.Scan(func(v Value) {
|
||||
fmt.Printf("%s\n", v.String())
|
||||
})
|
||||
|
||||
// Output:
|
||||
// [set leader Charlie]
|
||||
// [set follower Skyler]
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
Clients
|
||||
-------
|
||||
|
||||
There are bunches of [RESP Clients](http://redis.io/clients). Most any client that supports Redis will support this implementation.
|
||||
|
||||
Contact
|
||||
-------
|
||||
|
||||
Josh Baker [@tidwall](http://twitter.com/tidwall)
|
||||
|
||||
License
|
||||
-------
|
||||
|
||||
Tile38 source code is available under the MIT [License](/LICENSE).
|
||||
|
|
@ -0,0 +1,163 @@
|
|||
package resp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SyncPolicy represents a file's fsync policy.
|
||||
type SyncPolicy int
|
||||
|
||||
const (
|
||||
Never SyncPolicy = iota // The policy 'Never' means fsync is never called, the Operating System will be in charge of your data. This is the fastest and less safe method.
|
||||
EverySecond SyncPolicy = iota // The policy 'EverySecond' means that fsync will be called every second or so. This is fast enough, and at most you can lose one second of data if there is a disaster.
|
||||
Always SyncPolicy = iota // The policy 'Always' means fsync is called after every write. This is super duper safe and very incredibly slow.
|
||||
)
|
||||
|
||||
// String returns a string respesentation.
|
||||
func (policy SyncPolicy) String() string {
|
||||
switch policy {
|
||||
default:
|
||||
return "unknown"
|
||||
case Never:
|
||||
return "never"
|
||||
case EverySecond:
|
||||
return "every second"
|
||||
case Always:
|
||||
return "always"
|
||||
}
|
||||
}
|
||||
|
||||
var errClosed = errors.New("closed")
|
||||
|
||||
// AOF represents an open file descriptor.
|
||||
type AOF struct {
|
||||
mu sync.Mutex
|
||||
f *os.File
|
||||
closed bool
|
||||
rd *Reader
|
||||
policy SyncPolicy
|
||||
atEnd bool
|
||||
}
|
||||
|
||||
// OpenAOF will open and return an AOF file. If the file does not exist a new one will be created.
|
||||
func OpenAOF(path string) (*AOF, error) {
|
||||
var err error
|
||||
aof := &AOF{}
|
||||
aof.f, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
aof.policy = EverySecond
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
aof.mu.Lock()
|
||||
if aof.closed {
|
||||
aof.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if aof.policy == EverySecond {
|
||||
aof.f.Sync()
|
||||
}
|
||||
aof.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
return aof, nil
|
||||
}
|
||||
|
||||
// SetSyncPolicy set the sync policy of the file.
|
||||
// The policy 'EverySecond' means that fsync will be called every second or so. This is fast enough, and at most you can lose one second of data if there is a disaster.
|
||||
// The policy 'Never' means fsync is never called, the Operating System will be in charge of your data. This is the fastest and less safe method.
|
||||
// The policy 'Always' means fsync is called after every write. This is super duper safe and very incredibly slow.
|
||||
// EverySecond is the default.
|
||||
func (aof *AOF) SetSyncPolicy(policy SyncPolicy) {
|
||||
aof.mu.Lock()
|
||||
defer aof.mu.Unlock()
|
||||
if aof.policy == policy {
|
||||
return
|
||||
}
|
||||
switch policy {
|
||||
default:
|
||||
return
|
||||
case Never, EverySecond, Always:
|
||||
}
|
||||
aof.policy = policy
|
||||
}
|
||||
|
||||
// Close will close the file.
|
||||
func (aof *AOF) Close() error {
|
||||
aof.mu.Lock()
|
||||
defer aof.mu.Unlock()
|
||||
if aof.closed {
|
||||
return errClosed
|
||||
}
|
||||
aof.f.Close()
|
||||
aof.closed = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (aof *AOF) readValues(iterator func(v Value)) error {
|
||||
aof.atEnd = false
|
||||
if _, err := aof.f.Seek(0, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
rd := NewReader(aof.f)
|
||||
for {
|
||||
v, _, err := rd.ReadValue()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
if iterator != nil {
|
||||
iterator(v)
|
||||
}
|
||||
}
|
||||
if _, err := aof.f.Seek(0, 2); err != nil {
|
||||
return err
|
||||
}
|
||||
aof.atEnd = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Append writes a value to the end of the file.
|
||||
func (aof *AOF) Append(v Value) error {
|
||||
b, err := v.MarshalRESP()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
aof.mu.Lock()
|
||||
defer aof.mu.Unlock()
|
||||
if aof.closed {
|
||||
return errClosed
|
||||
}
|
||||
if !aof.atEnd {
|
||||
if err := aof.readValues(nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
_, err = aof.f.Write(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if aof.policy == Always {
|
||||
aof.f.Sync()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scan iterates though all values in the file.
|
||||
// This operation could take a long time if there lots of values, and the operation cannot be canceled part way through.
|
||||
func (aof *AOF) Scan(iterator func(v Value)) error {
|
||||
aof.mu.Lock()
|
||||
defer aof.mu.Unlock()
|
||||
if aof.closed {
|
||||
return errClosed
|
||||
}
|
||||
return aof.readValues(iterator)
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
package resp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAOF(t *testing.T) {
|
||||
defer func() {
|
||||
os.RemoveAll("aof.tmp")
|
||||
}()
|
||||
os.RemoveAll("aof.tmp")
|
||||
f, err := OpenAOF("aof.tmp")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
f.Close()
|
||||
}()
|
||||
for i := 0; i < 12345; i++ {
|
||||
if err := f.Append(StringValue(fmt.Sprintf("hello world #%d\n", i))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
i := 0
|
||||
if err := f.Scan(func(v Value) {
|
||||
s := v.String()
|
||||
e := fmt.Sprintf("hello world #%d\n", i)
|
||||
if s != e {
|
||||
t.Fatalf("#%d is '%s', expect '%s'", i, s, e)
|
||||
}
|
||||
i++
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
f.Close()
|
||||
f, err = OpenAOF("aof.tmp")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c := i
|
||||
for i := c; i < c+12345; i++ {
|
||||
if err := f.Append(StringValue(fmt.Sprintf("hello world #%d\n", i))); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
i = 0
|
||||
if err := f.Scan(func(v Value) {
|
||||
s := v.String()
|
||||
e := fmt.Sprintf("hello world #%d\n", i)
|
||||
if s != e {
|
||||
t.Fatalf("#%d is '%s', expect '%s'", i, s, e)
|
||||
}
|
||||
i++
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
// Copyright 2016 Josh Baker. All rights reserved.
|
||||
// Use of this source code is governed by a MIT-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
/*
|
||||
Package resp provides a reader, writer, and server implementation for the RESP protocol. http://redis.io/topics/protocol
|
||||
|
||||
RESP is short for "REdis Serialization Protocol".
|
||||
While the protocol was designed specifically for Redis, it can be used for other client-server software projects.
|
||||
|
||||
RESP has the advantages of being human readable and with performance of a binary protocol.
|
||||
*/
|
||||
package resp
|
|
@ -0,0 +1,121 @@
|
|||
package resp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func ExampleServer() {
|
||||
// ExampleServer is a Redis clone that implements the SET and GET commands.
|
||||
// The server runs on port 6380.
|
||||
// You can interact using the Redis CLI (redis-cli) The http://redis.io/download.
|
||||
// Or, use the telnet by typing in "telnet localhost 6380" and type in "set key value" and "get key".
|
||||
// Or, use a client library such as "http://github.com/garyburd/redigo"
|
||||
// The "QUIT" command will close the connection.
|
||||
var mu sync.RWMutex
|
||||
kvs := make(map[string]string)
|
||||
s := NewServer()
|
||||
s.HandleFunc("set", func(conn *Conn, args []Value) bool {
|
||||
if len(args) != 3 {
|
||||
conn.WriteError(errors.New("ERR wrong number of arguments for 'set' command"))
|
||||
} else {
|
||||
mu.Lock()
|
||||
kvs[args[1].String()] = args[2].String()
|
||||
mu.Unlock()
|
||||
conn.WriteSimpleString("OK")
|
||||
}
|
||||
return true
|
||||
})
|
||||
s.HandleFunc("get", func(conn *Conn, args []Value) bool {
|
||||
if len(args) != 2 {
|
||||
conn.WriteError(errors.New("ERR wrong number of arguments for 'get' command"))
|
||||
} else {
|
||||
mu.RLock()
|
||||
s, ok := kvs[args[1].String()]
|
||||
mu.RUnlock()
|
||||
if !ok {
|
||||
conn.WriteNull()
|
||||
} else {
|
||||
conn.WriteString(s)
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
if err := s.ListenAndServe(":6380"); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleReader() {
|
||||
raw := "*3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n"
|
||||
raw += "*3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n"
|
||||
rd := NewReader(bytes.NewBufferString(raw))
|
||||
for {
|
||||
v, _, err := rd.ReadValue()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Printf("Read %s\n", v.Type())
|
||||
if v.Type() == Array {
|
||||
for i, v := range v.Array() {
|
||||
fmt.Printf(" #%d %s, value: '%s'\n", i, v.Type(), v)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Output:
|
||||
// Read Array
|
||||
// #0 BulkString, value: 'set'
|
||||
// #1 BulkString, value: 'leader'
|
||||
// #2 BulkString, value: 'Charlie'
|
||||
// Read Array
|
||||
// #0 BulkString, value: 'set'
|
||||
// #1 BulkString, value: 'follower'
|
||||
// #2 BulkString, value: 'Skyler'
|
||||
}
|
||||
|
||||
func ExampleWriter() {
|
||||
var buf bytes.Buffer
|
||||
wr := NewWriter(&buf)
|
||||
wr.WriteArray([]Value{StringValue("set"), StringValue("leader"), StringValue("Charlie")})
|
||||
wr.WriteArray([]Value{StringValue("set"), StringValue("follower"), StringValue("Skyler")})
|
||||
fmt.Printf("%s", strings.Replace(buf.String(), "\r\n", "\\r\\n", -1))
|
||||
// Output:
|
||||
// *3\r\n$3\r\nset\r\n$6\r\nleader\r\n$7\r\nCharlie\r\n*3\r\n$3\r\nset\r\n$8\r\nfollower\r\n$6\r\nSkyler\r\n
|
||||
}
|
||||
|
||||
func ExampleAOF() {
|
||||
os.RemoveAll("appendonly.aof")
|
||||
|
||||
// create and fill an appendonly file
|
||||
aof, err := OpenAOF("appendonly.aof")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// append a couple values and close the file
|
||||
aof.Append(MultiBulkValue("set", "leader", "Charlie"))
|
||||
aof.Append(MultiBulkValue("set", "follower", "Skyler"))
|
||||
aof.Close()
|
||||
|
||||
// reopen and scan all values
|
||||
aof, err = OpenAOF("appendonly.aof")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer aof.Close()
|
||||
aof.Scan(func(v Value) {
|
||||
fmt.Printf("%s\n", v.String())
|
||||
})
|
||||
|
||||
// Output:
|
||||
// [set leader Charlie]
|
||||
// [set follower Skyler]
|
||||
}
|
|
@ -0,0 +1,725 @@
|
|||
package resp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const bufsz = 4096
|
||||
|
||||
// Type represents a Value type
|
||||
type Type byte
|
||||
|
||||
const (
|
||||
SimpleString Type = '+'
|
||||
Error Type = '-'
|
||||
Integer Type = ':'
|
||||
BulkString Type = '$'
|
||||
Array Type = '*'
|
||||
)
|
||||
|
||||
// TypeName returns name of the underlying RESP type.
|
||||
func (t Type) String() string {
|
||||
switch t {
|
||||
default:
|
||||
return "Unknown"
|
||||
case '+':
|
||||
return "SimpleString"
|
||||
case '-':
|
||||
return "Error"
|
||||
case ':':
|
||||
return "Integer"
|
||||
case '$':
|
||||
return "BulkString"
|
||||
case '*':
|
||||
return "Array"
|
||||
}
|
||||
}
|
||||
|
||||
// Value represents the data of a valid RESP type.
|
||||
type Value struct {
|
||||
typ Type
|
||||
integer int
|
||||
str []byte
|
||||
array []Value
|
||||
null bool
|
||||
}
|
||||
|
||||
// Integer converts Value to an int. If Value cannot be converted, Zero is returned.
|
||||
func (v Value) Integer() int {
|
||||
switch v.typ {
|
||||
default:
|
||||
n, _ := strconv.ParseInt(v.String(), 10, 64)
|
||||
return int(n)
|
||||
case ':':
|
||||
return v.integer
|
||||
}
|
||||
}
|
||||
|
||||
// String converts Value to a string.
|
||||
func (v Value) String() string {
|
||||
if v.typ == '$' {
|
||||
return string(v.str)
|
||||
}
|
||||
switch v.typ {
|
||||
case '+', '-':
|
||||
return string(v.str)
|
||||
case ':':
|
||||
return strconv.FormatInt(int64(v.integer), 10)
|
||||
case '*':
|
||||
return fmt.Sprintf("%v", v.array)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// Bytes converts the Value to a byte array. An empty string is converted to a non-nil empty byte array. If it's a RESP Null value, nil is returned.
|
||||
func (v Value) Bytes() []byte {
|
||||
switch v.typ {
|
||||
default:
|
||||
return []byte(v.String())
|
||||
case '$', '+', '-':
|
||||
return v.str
|
||||
}
|
||||
}
|
||||
|
||||
// Float converts Value to a float64. If Value cannot be converted, Zero is returned.
|
||||
func (v Value) Float() float64 {
|
||||
switch v.typ {
|
||||
default:
|
||||
f, _ := strconv.ParseFloat(v.String(), 64)
|
||||
return f
|
||||
case ':':
|
||||
return float64(v.integer)
|
||||
}
|
||||
}
|
||||
|
||||
// IsNull indicates whether or not the base value is null.
|
||||
func (v Value) IsNull() bool {
|
||||
return v.null
|
||||
}
|
||||
|
||||
// Bool converts Value to an bool. If Value cannot be converted, false is returned.
|
||||
func (v Value) Bool() bool {
|
||||
return v.Integer() != 0
|
||||
}
|
||||
|
||||
// Error converts the Value to an error. If Value is not an error, nil is returned.
|
||||
func (v Value) Error() error {
|
||||
switch v.typ {
|
||||
case '-':
|
||||
return errors.New(string(v.str))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Array converts the Value to a an array. If Value is not an array or when it's is a RESP Null value, nil is returned.
|
||||
func (v Value) Array() []Value {
|
||||
if v.typ == '*' && !v.null {
|
||||
return v.array
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Type returns the underlying RESP type. The following types are represent valid RESP values.
|
||||
// '+' SimpleString
|
||||
// '-' Error
|
||||
// ':' Integer
|
||||
// '$' BulkString
|
||||
// '*' Array
|
||||
func (v Value) Type() Type {
|
||||
return v.typ
|
||||
}
|
||||
|
||||
func marshalSimpleRESP(typ Type, b []byte) ([]byte, error) {
|
||||
bb := make([]byte, 3+len(b))
|
||||
bb[0] = byte(typ)
|
||||
copy(bb[1:], b)
|
||||
bb[1+len(b)+0] = '\r'
|
||||
bb[1+len(b)+1] = '\n'
|
||||
return bb, nil
|
||||
}
|
||||
|
||||
func marshalBulkRESP(v Value) ([]byte, error) {
|
||||
if v.null {
|
||||
return []byte("$-1\r\n"), nil
|
||||
}
|
||||
szb := []byte(strconv.FormatInt(int64(len(v.str)), 10))
|
||||
bb := make([]byte, 5+len(szb)+len(v.str))
|
||||
bb[0] = '$'
|
||||
copy(bb[1:], szb)
|
||||
bb[1+len(szb)+0] = '\r'
|
||||
bb[1+len(szb)+1] = '\n'
|
||||
copy(bb[1+len(szb)+2:], v.str)
|
||||
bb[1+len(szb)+2+len(v.str)+0] = '\r'
|
||||
bb[1+len(szb)+2+len(v.str)+1] = '\n'
|
||||
return bb, nil
|
||||
}
|
||||
|
||||
func marshalArrayRESP(v Value) ([]byte, error) {
|
||||
if v.null {
|
||||
return []byte("*-1\r\n"), nil
|
||||
}
|
||||
szb := []byte(strconv.FormatInt(int64(len(v.array)), 10))
|
||||
|
||||
var buf bytes.Buffer
|
||||
buf.Grow(3 + len(szb) + 16*len(v.array)) // prime the buffer
|
||||
buf.WriteByte('*')
|
||||
buf.Write(szb)
|
||||
buf.WriteByte('\r')
|
||||
buf.WriteByte('\n')
|
||||
for i := 0; i < len(v.array); i++ {
|
||||
data, err := v.array[i].MarshalRESP()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buf.Write(data)
|
||||
}
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
func marshalAnyRESP(v Value) ([]byte, error) {
|
||||
switch v.typ {
|
||||
default:
|
||||
if v.typ == 0 && v.null {
|
||||
return []byte("$-1\r\n"), nil
|
||||
}
|
||||
return nil, errors.New("unknown resp type encountered")
|
||||
case '-', '+':
|
||||
return marshalSimpleRESP(v.typ, v.str)
|
||||
case ':':
|
||||
return marshalSimpleRESP(v.typ, []byte(strconv.FormatInt(int64(v.integer), 10)))
|
||||
case '$':
|
||||
return marshalBulkRESP(v)
|
||||
case '*':
|
||||
return marshalArrayRESP(v)
|
||||
}
|
||||
}
|
||||
|
||||
// Equals compares one value to another value.
|
||||
func (v Value) Equals(value Value) bool {
|
||||
data1, err := v.MarshalRESP()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
data2, err := value.MarshalRESP()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return string(data1) == string(data2)
|
||||
}
|
||||
|
||||
// MarshalRESP returns the original serialized byte representation of Value.
|
||||
// For more information on this format please see http://redis.io/topics/protocol.
|
||||
func (v Value) MarshalRESP() ([]byte, error) {
|
||||
return marshalAnyRESP(v)
|
||||
}
|
||||
|
||||
var nullValue = Value{null: true}
|
||||
|
||||
type errProtocol struct{ msg string }
|
||||
|
||||
func (err errProtocol) Error() string {
|
||||
return "Protocol error: " + err.msg
|
||||
}
|
||||
|
||||
// Reader is a specialized RESP Value type reader.
|
||||
type Reader struct {
|
||||
rd io.Reader
|
||||
buf []byte
|
||||
p, l, s int
|
||||
rerr error
|
||||
}
|
||||
|
||||
// NewReader returns a Reader for reading Value types.
|
||||
func NewReader(rd io.Reader) *Reader {
|
||||
r := &Reader{rd: rd}
|
||||
return r
|
||||
}
|
||||
|
||||
// ReadValue reads the next Value from Reader.
|
||||
func (rd *Reader) ReadValue() (value Value, n int, err error) {
|
||||
value, _, n, err = rd.readValue(false, false)
|
||||
return
|
||||
}
|
||||
|
||||
// ReadMultiBulk reads the next multi bulk Value from Reader.
|
||||
// A multi bulk value is a RESP array that contains one or more bulk strings.
|
||||
// For more information on RESP arrays and strings please see http://redis.io/topics/protocol.
|
||||
func (rd *Reader) ReadMultiBulk() (value Value, telnet bool, n int, err error) {
|
||||
return rd.readValue(true, false)
|
||||
}
|
||||
|
||||
func (rd *Reader) readValue(multibulk, child bool) (val Value, telnet bool, n int, err error) {
|
||||
var rn int
|
||||
var c byte
|
||||
c, rn, err = rd.readByte()
|
||||
n += rn
|
||||
if err != nil {
|
||||
return nullValue, false, n, err
|
||||
}
|
||||
if c == '*' {
|
||||
val, n, err = rd.readArrayValue(multibulk)
|
||||
} else if multibulk && !child {
|
||||
telnet = true
|
||||
} else {
|
||||
switch c {
|
||||
default:
|
||||
if multibulk && child {
|
||||
return nullValue, telnet, n, &errProtocol{"expected '$', got '" + string(c) + "'"}
|
||||
}
|
||||
if child {
|
||||
return nullValue, telnet, n, &errProtocol{"unknown first byte"}
|
||||
}
|
||||
telnet = true
|
||||
case '-', '+':
|
||||
val, n, err = rd.readSimpleValue(c)
|
||||
case ':':
|
||||
val, n, err = rd.readIntegerValue()
|
||||
case '$':
|
||||
val, n, err = rd.readBulkValue()
|
||||
}
|
||||
}
|
||||
if telnet {
|
||||
rd.unreadByte(c)
|
||||
val, n, err = rd.readTelnetMultiBulk()
|
||||
if err == nil {
|
||||
telnet = true
|
||||
}
|
||||
}
|
||||
n += rn
|
||||
if err == io.EOF {
|
||||
return nullValue, telnet, n, io.ErrUnexpectedEOF
|
||||
}
|
||||
return val, telnet, n, err
|
||||
}
|
||||
|
||||
func (rd *Reader) readTelnetMultiBulk() (v Value, n int, err error) {
|
||||
var rn int
|
||||
values := make([]Value, 0, 8)
|
||||
var c byte
|
||||
var bline []byte
|
||||
var quote, mustspace bool
|
||||
for {
|
||||
c, rn, err = rd.readByte()
|
||||
n += rn
|
||||
if err != nil {
|
||||
return nullValue, n, err
|
||||
}
|
||||
if c == '\n' {
|
||||
if len(bline) > 0 && bline[len(bline)-1] == '\r' {
|
||||
bline = bline[:len(bline)-1]
|
||||
}
|
||||
break
|
||||
}
|
||||
if mustspace && c != ' ' {
|
||||
return nullValue, n, &errProtocol{"unbalanced quotes in request"}
|
||||
}
|
||||
if c == ' ' {
|
||||
if quote {
|
||||
bline = append(bline, c)
|
||||
} else {
|
||||
values = append(values, Value{typ: '$', str: bline})
|
||||
bline = nil
|
||||
}
|
||||
} else if c == '"' {
|
||||
if quote {
|
||||
mustspace = true
|
||||
} else {
|
||||
if len(bline) > 0 {
|
||||
return nullValue, n, &errProtocol{"unbalanced quotes in request"}
|
||||
}
|
||||
quote = true
|
||||
}
|
||||
} else {
|
||||
bline = append(bline, c)
|
||||
}
|
||||
}
|
||||
if quote {
|
||||
return nullValue, n, &errProtocol{"unbalanced quotes in request"}
|
||||
}
|
||||
if len(bline) > 0 {
|
||||
values = append(values, Value{typ: '$', str: bline})
|
||||
}
|
||||
return Value{typ: '*', array: values}, n, nil
|
||||
}
|
||||
|
||||
func (rd *Reader) readSimpleValue(typ byte) (val Value, n int, err error) {
|
||||
var line []byte
|
||||
line, n, err = rd.readLine()
|
||||
if err != nil {
|
||||
return nullValue, n, err
|
||||
}
|
||||
return Value{typ: Type(typ), str: line}, n, nil
|
||||
}
|
||||
|
||||
func (rd *Reader) readBulkValue() (val Value, n int, err error) {
|
||||
var rn int
|
||||
var l int
|
||||
l, rn, err = rd.readInt()
|
||||
n += rn
|
||||
if err != nil {
|
||||
if _, ok := err.(*errProtocol); ok {
|
||||
return nullValue, n, &errProtocol{"invalid bulk length"}
|
||||
}
|
||||
return nullValue, n, err
|
||||
}
|
||||
if l < 0 {
|
||||
return Value{typ: '$', null: true}, n, nil
|
||||
}
|
||||
if l > 512*1024*1024 {
|
||||
return nullValue, n, &errProtocol{"invalid bulk length"}
|
||||
}
|
||||
var b []byte
|
||||
b, rn, err = rd.readBytes(l + 2)
|
||||
n += rn
|
||||
if err != nil {
|
||||
return nullValue, n, err
|
||||
}
|
||||
if b[l] != '\r' || b[l+1] != '\n' {
|
||||
return nullValue, n, &errProtocol{"invalid bulk line ending"}
|
||||
}
|
||||
return Value{typ: '$', str: b[:l]}, n, nil
|
||||
}
|
||||
|
||||
func (rd *Reader) readArrayValue(multibulk bool) (val Value, n int, err error) {
|
||||
var rn int
|
||||
var l int
|
||||
l, rn, err = rd.readInt()
|
||||
n += rn
|
||||
if err != nil || l > 1024*1024 {
|
||||
if _, ok := err.(*errProtocol); ok {
|
||||
if multibulk {
|
||||
return nullValue, n, &errProtocol{"invalid multibulk length"}
|
||||
}
|
||||
return nullValue, n, &errProtocol{"invalid array length"}
|
||||
}
|
||||
return nullValue, n, err
|
||||
}
|
||||
if l < 0 {
|
||||
return Value{typ: '*', null: true}, n, nil
|
||||
}
|
||||
var aval Value
|
||||
vals := make([]Value, l)
|
||||
for i := 0; i < l; i++ {
|
||||
aval, _, rn, err = rd.readValue(multibulk, true)
|
||||
n += rn
|
||||
if err != nil {
|
||||
return nullValue, n, err
|
||||
}
|
||||
vals[i] = aval
|
||||
}
|
||||
return Value{typ: '*', array: vals}, n, nil
|
||||
}
|
||||
|
||||
func (rd *Reader) readIntegerValue() (val Value, n int, err error) {
|
||||
var l int
|
||||
l, n, err = rd.readInt()
|
||||
if err != nil {
|
||||
if _, ok := err.(*errProtocol); ok {
|
||||
return nullValue, n, &errProtocol{"invalid integer"}
|
||||
}
|
||||
return nullValue, n, err
|
||||
}
|
||||
return Value{typ: ':', integer: l}, n, nil
|
||||
}
|
||||
|
||||
func (rd *Reader) readInt() (x int, n int, err error) {
|
||||
var rn int
|
||||
var c byte
|
||||
neg := 1
|
||||
c, rn, err = rd.readByte()
|
||||
n += rn
|
||||
if err != nil {
|
||||
return 0, n, err
|
||||
}
|
||||
if c == '-' {
|
||||
neg = -1
|
||||
c, rn, err = rd.readByte()
|
||||
n += rn
|
||||
if err != nil {
|
||||
return 0, n, err
|
||||
}
|
||||
}
|
||||
var length int
|
||||
for {
|
||||
switch c {
|
||||
default:
|
||||
return 0, n, &errProtocol{"invalid length"}
|
||||
case '\r':
|
||||
c, rn, err = rd.readByte()
|
||||
n += rn
|
||||
if err != nil {
|
||||
return 0, n, err
|
||||
}
|
||||
if c != '\n' {
|
||||
return 0, n, &errProtocol{"invalid length"}
|
||||
}
|
||||
return length * neg, n, nil
|
||||
case '0', '1', '2', '3', '4', '5', '6', '7', '8', '9':
|
||||
length = (length * 10) + int(c-'0')
|
||||
}
|
||||
c, rn, err = rd.readByte()
|
||||
n += rn
|
||||
if err != nil {
|
||||
return 0, n, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rd *Reader) readLine() (b []byte, n int, err error) {
|
||||
var lc byte
|
||||
p := rd.p
|
||||
l := rd.l
|
||||
for {
|
||||
// read byte
|
||||
for l == 0 {
|
||||
if err := rd.fillBuffer(true); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
l = rd.l - (p - rd.p)
|
||||
}
|
||||
c := rd.buf[p]
|
||||
p++
|
||||
l--
|
||||
n++
|
||||
if c == '\n' && lc == '\r' {
|
||||
b = rd.buf[rd.p : rd.p+n-2]
|
||||
rd.p = p
|
||||
rd.l -= n
|
||||
return b, n, nil
|
||||
}
|
||||
lc = c
|
||||
}
|
||||
}
|
||||
|
||||
func (rd *Reader) readBytes(count int) (b []byte, n int, err error) {
|
||||
if count < 0 {
|
||||
return nil, 0, errors.New("invalid argument")
|
||||
}
|
||||
for rd.l < count {
|
||||
if err := rd.fillBuffer(false); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
}
|
||||
b = rd.buf[rd.p : rd.p+count]
|
||||
rd.p += count
|
||||
rd.l -= count
|
||||
return b, count, nil
|
||||
}
|
||||
|
||||
func (rd *Reader) readByte() (c byte, n int, err error) {
|
||||
for rd.l < 1 {
|
||||
if err := rd.fillBuffer(false); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
}
|
||||
c = rd.buf[rd.p]
|
||||
rd.p++
|
||||
rd.l--
|
||||
return c, 1, nil
|
||||
}
|
||||
|
||||
func (rd *Reader) unreadByte(c byte) {
|
||||
if rd.p > 0 {
|
||||
rd.p--
|
||||
rd.l++
|
||||
rd.buf[rd.p] = c
|
||||
return
|
||||
}
|
||||
buf := make([]byte, rd.l+1)
|
||||
buf[0] = c
|
||||
copy(buf[1:], rd.buf[:rd.l])
|
||||
rd.l++
|
||||
rd.s = rd.l
|
||||
}
|
||||
|
||||
func (rd *Reader) fillBuffer(ignoreRebuffering bool) error {
|
||||
if rd.rerr != nil {
|
||||
return rd.rerr
|
||||
}
|
||||
buf := make([]byte, bufsz)
|
||||
n, err := rd.rd.Read(buf)
|
||||
rd.rerr = err
|
||||
if n > 0 {
|
||||
if !ignoreRebuffering && rd.l == 0 {
|
||||
rd.l = n
|
||||
rd.s = n
|
||||
rd.p = 0
|
||||
rd.buf = buf
|
||||
} else {
|
||||
rd.buf = append(rd.buf, buf[:n]...)
|
||||
rd.s += n
|
||||
rd.l += n
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return rd.rerr
|
||||
}
|
||||
|
||||
// AnyValue returns a RESP value from an interface. This function infers the types. Arrays are not allowed.
|
||||
func AnyValue(v interface{}) Value {
|
||||
switch v := v.(type) {
|
||||
default:
|
||||
return StringValue(fmt.Sprintf("%v", v))
|
||||
case nil:
|
||||
return NullValue()
|
||||
case int:
|
||||
return IntegerValue(int(v))
|
||||
case uint:
|
||||
return IntegerValue(int(v))
|
||||
case int8:
|
||||
return IntegerValue(int(v))
|
||||
case uint8:
|
||||
return IntegerValue(int(v))
|
||||
case int16:
|
||||
return IntegerValue(int(v))
|
||||
case uint16:
|
||||
return IntegerValue(int(v))
|
||||
case int32:
|
||||
return IntegerValue(int(v))
|
||||
case uint32:
|
||||
return IntegerValue(int(v))
|
||||
case int64:
|
||||
return IntegerValue(int(v))
|
||||
case uint64:
|
||||
return IntegerValue(int(v))
|
||||
case bool:
|
||||
return BoolValue(v)
|
||||
case float32:
|
||||
return FloatValue(float64(v))
|
||||
case float64:
|
||||
return FloatValue(float64(v))
|
||||
case []byte:
|
||||
return BytesValue(v)
|
||||
case string:
|
||||
return StringValue(v)
|
||||
}
|
||||
}
|
||||
|
||||
// SimpleStringValue returns a RESP simple string. A simple string has no new lines. The carriage return and new line characters are replaced with spaces.
|
||||
func SimpleStringValue(s string) Value { return Value{typ: '+', str: []byte(formSingleLine(s))} }
|
||||
|
||||
// BytesValue returns a RESP bulk string. A bulk string can represent any data.
|
||||
func BytesValue(b []byte) Value { return Value{typ: '$', str: b} }
|
||||
|
||||
// StringValue returns a RESP bulk string. A bulk string can represent any data.
|
||||
func StringValue(s string) Value { return Value{typ: '$', str: []byte(s)} }
|
||||
|
||||
// NullValue returns a RESP null bulk string.
|
||||
func NullValue() Value { return Value{typ: '$', null: true} }
|
||||
|
||||
// ErrorValue returns a RESP error.
|
||||
func ErrorValue(err error) Value {
|
||||
if err == nil {
|
||||
return Value{typ: '-'}
|
||||
}
|
||||
return Value{typ: '-', str: []byte(err.Error())}
|
||||
}
|
||||
|
||||
// IntegerValue returns a RESP integer.
|
||||
func IntegerValue(i int) Value { return Value{typ: ':', integer: i} }
|
||||
|
||||
// BoolValue returns a RESP integer representation of a bool.
|
||||
func BoolValue(t bool) Value {
|
||||
if t {
|
||||
return Value{typ: ':', integer: 1}
|
||||
}
|
||||
return Value{typ: ':', integer: 0}
|
||||
}
|
||||
|
||||
// FloatValue returns a RESP bulk string representation of a float.
|
||||
func FloatValue(f float64) Value { return StringValue(strconv.FormatFloat(f, 'f', -1, 64)) }
|
||||
|
||||
// ArrayValue returns a RESP array.
|
||||
func ArrayValue(vals []Value) Value { return Value{typ: '*', array: vals} }
|
||||
|
||||
func formSingleLine(s string) string {
|
||||
bs1 := []byte(s)
|
||||
for i := 0; i < len(bs1); i++ {
|
||||
switch bs1[i] {
|
||||
case '\r', '\n':
|
||||
bs2 := make([]byte, len(bs1))
|
||||
copy(bs2, bs1)
|
||||
bs2[i] = ' '
|
||||
i++
|
||||
for ; i < len(bs2); i++ {
|
||||
switch bs1[i] {
|
||||
case '\r', '\n':
|
||||
bs2[i] = ' '
|
||||
}
|
||||
}
|
||||
return string(bs2)
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// MultiBulkValue returns a RESP array which contains one or more bulk strings.
|
||||
// For more information on RESP arrays and strings please see http://redis.io/topics/protocol.
|
||||
func MultiBulkValue(commandName string, args ...interface{}) Value {
|
||||
vals := make([]Value, len(args)+1)
|
||||
vals[0] = StringValue(commandName)
|
||||
for i, arg := range args {
|
||||
switch arg := arg.(type) {
|
||||
default:
|
||||
vals[i+1] = StringValue(fmt.Sprintf("%v", arg))
|
||||
case []byte:
|
||||
vals[i+1] = StringValue(string(arg))
|
||||
case string:
|
||||
vals[i+1] = StringValue(arg)
|
||||
case nil:
|
||||
vals[i+1] = NullValue()
|
||||
}
|
||||
}
|
||||
return ArrayValue(vals)
|
||||
}
|
||||
|
||||
// Writer is a specialized RESP Value type writer.
|
||||
type Writer struct {
|
||||
wr io.Writer
|
||||
}
|
||||
|
||||
// NewWriter returns a new Writer.
|
||||
func NewWriter(wr io.Writer) *Writer {
|
||||
return &Writer{wr}
|
||||
}
|
||||
|
||||
// WriteValue writes a RESP Value.
|
||||
func (wr *Writer) WriteValue(v Value) error {
|
||||
b, err := v.MarshalRESP()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = wr.wr.Write(b)
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteSimpleString writes a RESP simple string. A simple string has no new lines. The carriage return and new line characters are replaced with spaces.
|
||||
func (wr *Writer) WriteSimpleString(s string) error { return wr.WriteValue(SimpleStringValue(s)) }
|
||||
|
||||
// WriteBytes writes a RESP bulk string. A bulk string can represent any data.
|
||||
func (wr *Writer) WriteBytes(b []byte) error { return wr.WriteValue(BytesValue(b)) }
|
||||
|
||||
// WriteString writes a RESP bulk string. A bulk string can represent any data.
|
||||
func (wr *Writer) WriteString(s string) error { return wr.WriteValue(StringValue(s)) }
|
||||
|
||||
// WriteNull writes a RESP null bulk string.
|
||||
func (wr *Writer) WriteNull() error { return wr.WriteValue(NullValue()) }
|
||||
|
||||
// WriteError writes a RESP error.
|
||||
func (wr *Writer) WriteError(err error) error { return wr.WriteValue(ErrorValue(err)) }
|
||||
|
||||
// WriteInteger writes a RESP integer.
|
||||
func (wr *Writer) WriteInteger(i int) error { return wr.WriteValue(IntegerValue(i)) }
|
||||
|
||||
// WriteArray writes a RESP array.
|
||||
func (wr *Writer) WriteArray(vals []Value) error { return wr.WriteValue(ArrayValue(vals)) }
|
||||
|
||||
// WriteMultiBulk writes a RESP array which contains one or more bulk strings.
|
||||
// For more information on RESP arrays and strings please see http://redis.io/topics/protocol.
|
||||
func (wr *Writer) WriteMultiBulk(commandName string, args ...interface{}) error {
|
||||
return wr.WriteValue(MultiBulkValue(commandName, args...))
|
||||
}
|
|
@ -0,0 +1,234 @@
|
|||
package resp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestIntegers(t *testing.T) {
|
||||
var n, rn int
|
||||
var v Value
|
||||
var err error
|
||||
data := []byte(":1234567\r\n:-90898\r\n:0\r\n")
|
||||
r := NewReader(bytes.NewBuffer(data))
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Integer() != 1234567 {
|
||||
t.Fatalf("invalid integer: expected %d, got %d", 1234567, v.Integer())
|
||||
}
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Integer() != -90898 {
|
||||
t.Fatalf("invalid integer: expected %d, got %d", -90898, v.Integer())
|
||||
}
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Integer() != 0 {
|
||||
t.Fatalf("invalid integer: expected %d, got %d", 0, v.Integer())
|
||||
}
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != io.EOF {
|
||||
t.Fatalf("invalid error: expected %v, got %v", io.EOF, err)
|
||||
}
|
||||
if n != len(data) {
|
||||
t.Fatalf("invalid read count: expected %d, got %d", len(data), n)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFloats(t *testing.T) {
|
||||
var n, rn int
|
||||
var v Value
|
||||
var err error
|
||||
data := []byte(":1234567\r\n+-90898\r\n$6\r\n12.345\r\n-90284.987\r\n")
|
||||
r := NewReader(bytes.NewBuffer(data))
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Float() != 1234567 {
|
||||
t.Fatalf("invalid integer: expected %v, got %v", 1234567, v.Float())
|
||||
}
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Float() != -90898 {
|
||||
t.Fatalf("invalid integer: expected %v, got %v", -90898, v.Float())
|
||||
}
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Float() != 12.345 {
|
||||
t.Fatalf("invalid integer: expected %v, got %v", 12.345, v.Float())
|
||||
}
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if v.Float() != 90284.987 {
|
||||
t.Fatalf("invalid integer: expected %v, got %v", 90284.987, v.Float())
|
||||
}
|
||||
v, rn, err = r.ReadValue()
|
||||
n += rn
|
||||
if err != io.EOF {
|
||||
t.Fatalf("invalid error: expected %v, got %v", io.EOF, err)
|
||||
}
|
||||
if n != len(data) {
|
||||
t.Fatalf("invalid read count: expected %d, got %d", len(data), n)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLotsaRandomness does generates N resp messages and reads the values though a Reader.
|
||||
// It then marshals the values back to strings and compares to the original.
|
||||
// All data and resp types are random.
|
||||
|
||||
func TestLotsaRandomness(t *testing.T) {
|
||||
n := 10000
|
||||
var anys []string
|
||||
var buf bytes.Buffer
|
||||
for i := 0; i < n; i++ {
|
||||
any := randRESPAny()
|
||||
anys = append(anys, any)
|
||||
buf.WriteString(any)
|
||||
}
|
||||
r := NewReader(bytes.NewBuffer(buf.Bytes()))
|
||||
for i := 0; i < n; i++ {
|
||||
v, _, err := r.ReadValue()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp, err := v.MarshalRESP()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if string(resp) != anys[i] {
|
||||
t.Fatalf("resp failed to remarshal #%d\n-- original --\n%s\n-- remarshalled --\n%s\n-- done --", i, anys[i], string(resp))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func randRESPInteger() string {
|
||||
return fmt.Sprintf(":%d\r\n", (randInt()%1000000)-500000)
|
||||
}
|
||||
func randRESPSimpleString() string {
|
||||
return "+" + strings.Replace(randString(), "\r\n", "", -1) + "\r\n"
|
||||
}
|
||||
func randRESPError() string {
|
||||
return "-" + strings.Replace(randString(), "\r\n", "", -1) + "\r\n"
|
||||
}
|
||||
func randRESPBulkString() string {
|
||||
s := randString()
|
||||
if len(s)%1024 == 0 {
|
||||
return "$-1\r\n"
|
||||
}
|
||||
return "$" + strconv.FormatInt(int64(len(s)), 10) + "\r\n" + s + "\r\n"
|
||||
}
|
||||
func randRESPArray() string {
|
||||
n := randInt() % 10
|
||||
if n%10 == 0 {
|
||||
return "$-1\r\n"
|
||||
}
|
||||
s := "*" + strconv.FormatInt(int64(n), 10) + "\r\n"
|
||||
for i := 0; i < n; i++ {
|
||||
rn := randInt() % 100
|
||||
if rn == 0 {
|
||||
s += randRESPArray()
|
||||
} else {
|
||||
switch (rn - 1) % 4 {
|
||||
case 0:
|
||||
s += randRESPInteger()
|
||||
case 1:
|
||||
s += randRESPSimpleString()
|
||||
case 2:
|
||||
s += randRESPError()
|
||||
case 3:
|
||||
s += randRESPBulkString()
|
||||
}
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func randInt() int {
|
||||
n := int(binary.LittleEndian.Uint64(randBytes(8)))
|
||||
if n < 0 {
|
||||
n *= -1
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func randBytes(n int) []byte {
|
||||
b := make([]byte, n)
|
||||
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
||||
panic("random error: " + err.Error())
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func randString() string {
|
||||
return string(randBytes(randInt() % 1024))
|
||||
}
|
||||
|
||||
func randRESPAny() string {
|
||||
switch randInt() % 5 {
|
||||
case 0:
|
||||
return randRESPInteger()
|
||||
case 1:
|
||||
return randRESPSimpleString()
|
||||
case 2:
|
||||
return randRESPError()
|
||||
case 3:
|
||||
return randRESPBulkString()
|
||||
case 4:
|
||||
return randRESPArray()
|
||||
}
|
||||
panic("?")
|
||||
}
|
||||
|
||||
func BenchmarkRead(b *testing.B) {
|
||||
n := 1000
|
||||
var buf bytes.Buffer
|
||||
for k := 0; k < n; k++ {
|
||||
buf.WriteString(randRESPAny())
|
||||
}
|
||||
bb := buf.Bytes()
|
||||
b.ResetTimer()
|
||||
var j int
|
||||
var r *Reader
|
||||
//start := time.Now()
|
||||
var k int
|
||||
for i := 0; i < b.N; i++ {
|
||||
if j == 0 {
|
||||
r = NewReader(bytes.NewBuffer(bb))
|
||||
j = n
|
||||
}
|
||||
_, _, err := r.ReadValue()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
j--
|
||||
k++
|
||||
}
|
||||
//fmt.Printf("\n%f\n", float64(k)/(float64(time.Now().Sub(start))/float64(time.Second)))
|
||||
}
|
|
@ -0,0 +1,135 @@
|
|||
package resp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Server represents a RESP server which handles reading RESP Values.
|
||||
type Server struct {
|
||||
mu sync.RWMutex
|
||||
handlers map[string]func(conn *Conn, args []Value) bool
|
||||
accept func(conn *Conn) bool
|
||||
}
|
||||
|
||||
// Conn represents a RESP network connection.
|
||||
type Conn struct {
|
||||
*Reader
|
||||
*Writer
|
||||
base net.Conn
|
||||
RemoteAddr string
|
||||
}
|
||||
|
||||
// NewConn returns a Conn.
|
||||
func NewConn(conn net.Conn) *Conn {
|
||||
return &Conn{
|
||||
Reader: NewReader(conn),
|
||||
Writer: NewWriter(conn),
|
||||
base: conn,
|
||||
RemoteAddr: conn.RemoteAddr().String(),
|
||||
}
|
||||
}
|
||||
|
||||
// NewServer returns a new Server.
|
||||
func NewServer() *Server {
|
||||
return &Server{
|
||||
handlers: make(map[string]func(conn *Conn, args []Value) bool),
|
||||
}
|
||||
}
|
||||
|
||||
// HandleFunc registers the handler function for the given command.
|
||||
// The conn parameter is a Conn type and it can be used to read and write further RESP messages from and to the connection.
|
||||
// Returning false will close the connection.
|
||||
func (s *Server) HandleFunc(command string, handler func(conn *Conn, args []Value) bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.handlers[strings.ToUpper(command)] = handler
|
||||
}
|
||||
|
||||
// AcceptFunc registers a function for accepting connections.
|
||||
// Calling this function is optional and it allows for total control over reading and writing RESP Values from and to the connections.
|
||||
// Returning false will close the connection.
|
||||
func (s *Server) AcceptFunc(accept func(conn *Conn) bool) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.accept = accept
|
||||
}
|
||||
|
||||
// ListenAndServe listens on the TCP network address addr for incoming connections.
|
||||
func (s *Server) ListenAndServe(addr string) error {
|
||||
l, err := net.Listen("tcp", addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer l.Close()
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
err = s.handleConn(conn)
|
||||
if err != nil {
|
||||
if _, ok := err.(*errProtocol); ok {
|
||||
io.WriteString(conn, "-ERR "+formSingleLine(err.Error())+"\r\n")
|
||||
} else {
|
||||
io.WriteString(conn, "-ERR unknown error\r\n")
|
||||
}
|
||||
}
|
||||
conn.Close()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleConn(nconn net.Conn) error {
|
||||
conn := NewConn(nconn)
|
||||
s.mu.RLock()
|
||||
accept := s.accept
|
||||
s.mu.RUnlock()
|
||||
if accept != nil {
|
||||
if !accept(conn) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
for {
|
||||
v, _, _, err := conn.ReadMultiBulk()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
values := v.Array()
|
||||
if len(values) == 0 {
|
||||
continue
|
||||
}
|
||||
lccommandName := values[0].String()
|
||||
commandName := strings.ToUpper(lccommandName)
|
||||
s.mu.RLock()
|
||||
h := s.handlers[commandName]
|
||||
s.mu.RUnlock()
|
||||
switch commandName {
|
||||
case "QUIT":
|
||||
if h == nil {
|
||||
conn.WriteSimpleString("OK")
|
||||
return nil
|
||||
}
|
||||
case "PING":
|
||||
if h == nil {
|
||||
if err := conn.WriteSimpleString("PONG"); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
if h == nil {
|
||||
if err := conn.WriteError(errors.New("ERR unknown command '" + lccommandName + "'")); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if !h(conn, values) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
package resp
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestServer(t *testing.T) {
|
||||
// Use the example server in example_test
|
||||
go func() {
|
||||
ExampleServer()
|
||||
}()
|
||||
if os.Getenv("WAIT_ON_TEST_SERVER") == "1" {
|
||||
select {}
|
||||
}
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
|
||||
n := 75
|
||||
|
||||
// Open N connections and do a bunch of stuff.
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(n)
|
||||
for i := 0; i < n; i++ {
|
||||
go func(i int) {
|
||||
defer func() {
|
||||
wg.Done()
|
||||
}()
|
||||
nconn, err := net.Dial("tcp", ":6380")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer nconn.Close()
|
||||
conn := NewConn(nconn)
|
||||
|
||||
// PING
|
||||
if err := conn.WriteMultiBulk("PING"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
val, _, err := conn.ReadValue()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if val.String() != "PONG" {
|
||||
t.Fatalf("expecting 'PONG', got '%s'", val)
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("key:%d", i)
|
||||
|
||||
// SET
|
||||
if err := conn.WriteMultiBulk("SET", key, 123.4); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
val, _, err = conn.ReadValue()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if val.String() != "OK" {
|
||||
t.Fatalf("expecting 'OK', got '%s'", val)
|
||||
}
|
||||
|
||||
// GET
|
||||
if err := conn.WriteMultiBulk("GET", key); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
val, _, err = conn.ReadValue()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if val.Float() != 123.4 {
|
||||
t.Fatalf("expecting '123.4', got '%s'", val)
|
||||
}
|
||||
|
||||
// QUIT
|
||||
if err := conn.WriteMultiBulk("QUIT"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
val, _, err = conn.ReadValue()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if val.String() != "OK" {
|
||||
t.Fatalf("expecting 'OK', got '%s'", val)
|
||||
}
|
||||
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
Loading…
Reference in New Issue