Merge pull request #169 from go-redis/feature/geo

Feature/geo
This commit is contained in:
Vladimir Mihailenco 2015-10-07 17:16:35 +03:00
commit 5d95a32e25
12 changed files with 522 additions and 183 deletions

View File

@ -6,7 +6,7 @@ services:
go: go:
- 1.3 - 1.3
- 1.4 - 1.4
- tip - 1.5
install: install:
- go get gopkg.in/bsm/ratelimit.v1 - go get gopkg.in/bsm/ratelimit.v1

View File

@ -1,9 +1,9 @@
all: testdeps all: testdeps
go test ./... -v=1 -cpu=1,2,4 go test ./... -test.v -test.cpu=1,2,4
go test ./... -short -race go test ./... -test.short -test.race
test: testdeps test: testdeps
go test ./... -v=1 go test ./... -test.v=1
testdeps: .test/redis/src/redis-server testdeps: .test/redis/src/redis-server
@ -11,7 +11,7 @@ testdeps: .test/redis/src/redis-server
.test/redis: .test/redis:
mkdir -p $@ mkdir -p $@
wget -qO- https://github.com/antirez/redis/archive/3.0.3.tar.gz | tar xvz --strip-components=1 -C $@ wget -qO- https://github.com/antirez/redis/archive/unstable.tar.gz | tar xvz --strip-components=1 -C $@
.test/redis/src/redis-server: .test/redis .test/redis/src/redis-server: .test/redis
cd $< && make all cd $< && make all

View File

@ -100,7 +100,7 @@ func (pipe *ClusterPipeline) execClusterCmds(
var firstCmdErr error var firstCmdErr error
for i, cmd := range cmds { for i, cmd := range cmds {
err := cmd.parseReply(cn) err := cmd.readReply(cn)
if err == nil { if err == nil {
continue continue
} }

View File

@ -28,7 +28,7 @@ var (
type Cmder interface { type Cmder interface {
args() []interface{} args() []interface{}
parseReply(*conn) error readReply(*conn) error
setErr(error) setErr(error)
reset() reset()
@ -152,14 +152,20 @@ func (cmd *Cmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *Cmd) parseReply(cn *conn) error { func (cmd *Cmd) readReply(cn *conn) error {
cmd.val, cmd.err = parseReply(cn, parseSlice) val, err := readReply(cn, sliceParser)
if err != nil {
cmd.err = err
return cmd.err
}
if v, ok := val.([]byte); ok {
// Convert to string to preserve old behaviour. // Convert to string to preserve old behaviour.
// TODO: remove in v4 // TODO: remove in v4
if v, ok := cmd.val.([]byte); ok {
cmd.val = string(v) cmd.val = string(v)
} else {
cmd.val = val
} }
return cmd.err return nil
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -191,8 +197,8 @@ func (cmd *SliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *SliceCmd) parseReply(cn *conn) error { func (cmd *SliceCmd) readReply(cn *conn) error {
v, err := parseReply(cn, parseSlice) v, err := readReply(cn, sliceParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -234,8 +240,8 @@ func (cmd *StatusCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StatusCmd) parseReply(cn *conn) error { func (cmd *StatusCmd) readReply(cn *conn) error {
v, err := parseReply(cn, nil) v, err := readReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -273,8 +279,8 @@ func (cmd *IntCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *IntCmd) parseReply(cn *conn) error { func (cmd *IntCmd) readReply(cn *conn) error {
v, err := parseReply(cn, nil) v, err := readReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -316,8 +322,8 @@ func (cmd *DurationCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *DurationCmd) parseReply(cn *conn) error { func (cmd *DurationCmd) readReply(cn *conn) error {
v, err := parseReply(cn, nil) v, err := readReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -357,8 +363,8 @@ func (cmd *BoolCmd) String() string {
var ok = []byte("OK") var ok = []byte("OK")
func (cmd *BoolCmd) parseReply(cn *conn) error { func (cmd *BoolCmd) readReply(cn *conn) error {
v, err := parseReply(cn, nil) v, err := readReply(cn, nil)
// `SET key value NX` returns nil when key already exists, which // `SET key value NX` returns nil when key already exists, which
// is inconsistent with `SETNX key value`. // is inconsistent with `SETNX key value`.
// TODO: is this okay? // TODO: is this okay?
@ -443,8 +449,8 @@ func (cmd *StringCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StringCmd) parseReply(cn *conn) error { func (cmd *StringCmd) readReply(cn *conn) error {
v, err := parseReply(cn, nil) v, err := readReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -482,8 +488,8 @@ func (cmd *FloatCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *FloatCmd) parseReply(cn *conn) error { func (cmd *FloatCmd) readReply(cn *conn) error {
v, err := parseReply(cn, nil) v, err := readReply(cn, nil)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -522,8 +528,8 @@ func (cmd *StringSliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StringSliceCmd) parseReply(cn *conn) error { func (cmd *StringSliceCmd) readReply(cn *conn) error {
v, err := parseReply(cn, parseStringSlice) v, err := readReply(cn, stringSliceParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -561,8 +567,8 @@ func (cmd *BoolSliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *BoolSliceCmd) parseReply(cn *conn) error { func (cmd *BoolSliceCmd) readReply(cn *conn) error {
v, err := parseReply(cn, parseBoolSlice) v, err := readReply(cn, boolSliceParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -600,8 +606,8 @@ func (cmd *StringStringMapCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *StringStringMapCmd) parseReply(cn *conn) error { func (cmd *StringStringMapCmd) readReply(cn *conn) error {
v, err := parseReply(cn, parseStringStringMap) v, err := readReply(cn, stringStringMapParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -639,8 +645,8 @@ func (cmd *StringIntMapCmd) reset() {
cmd.err = nil cmd.err = nil
} }
func (cmd *StringIntMapCmd) parseReply(cn *conn) error { func (cmd *StringIntMapCmd) readReply(cn *conn) error {
v, err := parseReply(cn, parseStringIntMap) v, err := readReply(cn, stringIntMapParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -678,8 +684,8 @@ func (cmd *ZSliceCmd) String() string {
return cmdString(cmd, cmd.val) return cmdString(cmd, cmd.val)
} }
func (cmd *ZSliceCmd) parseReply(cn *conn) error { func (cmd *ZSliceCmd) readReply(cn *conn) error {
v, err := parseReply(cn, parseZSlice) v, err := readReply(cn, zSliceParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -719,8 +725,8 @@ func (cmd *ScanCmd) String() string {
return cmdString(cmd, cmd.keys) return cmdString(cmd, cmd.keys)
} }
func (cmd *ScanCmd) parseReply(cn *conn) error { func (cmd *ScanCmd) readReply(cn *conn) error {
vi, err := parseReply(cn, parseSlice) vi, err := readReply(cn, sliceParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return cmd.err return cmd.err
@ -743,7 +749,8 @@ func (cmd *ScanCmd) parseReply(cn *conn) error {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
type ClusterSlotInfo struct { type ClusterSlotInfo struct {
Start, End int Start int
End int
Addrs []string Addrs []string
} }
@ -774,8 +781,8 @@ func (cmd *ClusterSlotCmd) reset() {
cmd.err = nil cmd.err = nil
} }
func (cmd *ClusterSlotCmd) parseReply(cn *conn) error { func (cmd *ClusterSlotCmd) readReply(cn *conn) error {
v, err := parseReply(cn, parseClusterSlotInfoSlice) v, err := readReply(cn, clusterSlotInfoSliceParser)
if err != nil { if err != nil {
cmd.err = err cmd.err = err
return err return err
@ -783,3 +790,65 @@ func (cmd *ClusterSlotCmd) parseReply(cn *conn) error {
cmd.val = v.([]ClusterSlotInfo) cmd.val = v.([]ClusterSlotInfo)
return nil return nil
} }
//------------------------------------------------------------------------------
// GeoLocation is used with GeoAdd to add geospatial location.
type GeoLocation struct {
Name string
Longitude, Latitude, Distance float64
GeoHash int64
}
// GeoRadiusQuery is used with GeoRadius to query geospatial index.
type GeoRadiusQuery struct {
Key string
Longitude float64
Latitude float64
Radius float64
// Can be m, km, ft, or mi. Default is km.
Unit string
WithCoordinates bool
WithDistance bool
WithGeoHash bool
Count int
// Can be ASC or DESC. Default is no sort order.
Sort string
}
type GeoLocationCmd struct {
baseCmd
locations []GeoLocation
}
func NewGeoLocationCmd(args ...interface{}) *GeoLocationCmd {
return &GeoLocationCmd{baseCmd: baseCmd{_args: args, _clusterKeyPos: 1}}
}
func (cmd *GeoLocationCmd) reset() {
cmd.locations = nil
cmd.err = nil
}
func (cmd *GeoLocationCmd) Val() []GeoLocation {
return cmd.locations
}
func (cmd *GeoLocationCmd) Result() ([]GeoLocation, error) {
return cmd.locations, cmd.err
}
func (cmd *GeoLocationCmd) String() string {
return cmdString(cmd, cmd.locations)
}
func (cmd *GeoLocationCmd) readReply(cn *conn) error {
reply, err := readReply(cn, geoLocationSliceParser)
if err != nil {
cmd.err = err
return err
}
cmd.locations = reply.([]GeoLocation)
return nil
}

View File

@ -1671,3 +1671,52 @@ func (c *commandable) ClusterAddSlotsRange(min, max int) *StatusCmd {
} }
return c.ClusterAddSlots(slots...) return c.ClusterAddSlots(slots...)
} }
//------------------------------------------------------------------------------
func (c *commandable) GeoAdd(key string, geoLocation ...*GeoLocation) *IntCmd {
args := make([]interface{}, 2+3*len(geoLocation))
args[0] = "GEOADD"
args[1] = key
for i, eachLoc := range geoLocation {
args[2+3*i] = eachLoc.Longitude
args[2+3*i+1] = eachLoc.Latitude
args[2+3*i+2] = eachLoc.Name
}
cmd := NewIntCmd(args...)
c.Process(cmd)
return cmd
}
func (c *commandable) GeoRadius(query *GeoRadiusQuery) *GeoLocationCmd {
args := make([]interface{}, 6)
args[0] = "GEORADIUS"
args[1] = query.Key
args[2] = query.Longitude
args[3] = query.Latitude
args[4] = query.Radius
if query.Unit != "" {
args[5] = query.Unit
} else {
args[5] = "km"
}
if query.WithCoordinates {
args = append(args, "WITHCOORD")
}
if query.WithDistance {
args = append(args, "WITHDIST")
}
if query.WithGeoHash {
args = append(args, "WITHHASH")
}
if query.Count > 0 {
args = append(args, "COUNT", query.Count)
}
if query.Sort != "" {
args = append(args, query.Sort)
}
cmd := NewGeoLocationCmd(args...)
c.Process(cmd)
return cmd
}

View File

@ -193,7 +193,7 @@ var _ = Describe("Commands", func() {
dump := client.Dump("key") dump := client.Dump("key")
Expect(dump.Err()).NotTo(HaveOccurred()) Expect(dump.Err()).NotTo(HaveOccurred())
Expect(dump.Val()).To(Equal("\x00\x05hello\x06\x00\xf5\x9f\xb7\xf6\x90a\x1c\x99")) Expect(dump.Val()).NotTo(BeEmpty())
}) })
It("should Exists", func() { It("should Exists", func() {
@ -2521,6 +2521,66 @@ var _ = Describe("Commands", func() {
}) })
Describe("Geo add and radius search", func() {
It("should add one geo location", func() {
geoAdd := client.GeoAdd("Sicily", &redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"})
Expect(geoAdd.Err()).NotTo(HaveOccurred())
Expect(geoAdd.Val()).To(Equal(int64(1)))
})
It("should add multiple geo locations", func() {
geoAdd := client.GeoAdd("Sicily", &redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"},
&redis.GeoLocation{Longitude: 15.087269, Latitude: 37.502669, Name: "Catania"})
Expect(geoAdd.Err()).NotTo(HaveOccurred())
Expect(geoAdd.Val()).To(Equal(int64(2)))
})
It("should search geo radius", func() {
geoAdd := client.GeoAdd("Sicily", &redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"},
&redis.GeoLocation{Longitude: 15.087269, Latitude: 37.502669, Name: "Catania"})
Expect(geoAdd.Err()).NotTo(HaveOccurred())
Expect(geoAdd.Val()).To(Equal(int64(2)))
geoRadius := client.GeoRadius(&redis.GeoRadiusQuery{Key: "Sicily", Longitude: 15, Latitude: 37, Radius: 200})
Expect(geoRadius.Err()).NotTo(HaveOccurred())
Expect(geoRadius.Val()[0].Name).To(Equal("Palermo"))
Expect(geoRadius.Val()[1].Name).To(Equal("Catania"))
})
It("should search geo radius with options", func() {
locations := []*redis.GeoLocation{&redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"},
&redis.GeoLocation{Longitude: 15.087269, Latitude: 37.502669, Name: "Catania"}}
geoAdd := client.GeoAdd("Sicily", locations...)
Expect(geoAdd.Err()).NotTo(HaveOccurred())
Expect(geoAdd.Val()).To(Equal(int64(2)))
geoRadius := client.GeoRadius(&redis.GeoRadiusQuery{Key: "Sicily", Longitude: 15, Latitude: 37, Radius: 200, Unit: "km", WithGeoHash: true, WithCoordinates: true, WithDistance: true, Count: 2, Sort: "ASC"})
Expect(geoRadius.Err()).NotTo(HaveOccurred())
Expect(geoRadius.Val()[1].Name).To(Equal("Palermo"))
Expect(geoRadius.Val()[1].Distance).To(Equal(190.4424))
Expect(geoRadius.Val()[1].GeoHash).To(Equal(int64(3479099956230698)))
Expect(geoRadius.Val()[1].Longitude).To(Equal(13.361389338970184))
Expect(geoRadius.Val()[1].Latitude).To(Equal(38.115556395496299))
Expect(geoRadius.Val()[0].Name).To(Equal("Catania"))
Expect(geoRadius.Val()[0].Distance).To(Equal(56.4413))
Expect(geoRadius.Val()[0].GeoHash).To(Equal(int64(3479447370796909)))
Expect(geoRadius.Val()[0].Longitude).To(Equal(15.087267458438873))
Expect(geoRadius.Val()[0].Latitude).To(Equal(37.50266842333162))
})
It("should search geo radius with no results", func() {
geoAdd := client.GeoAdd("Sicily", &redis.GeoLocation{Longitude: 13.361389, Latitude: 38.115556, Name: "Palermo"},
&redis.GeoLocation{Longitude: 15.087269, Latitude: 37.502669, Name: "Catania"})
Expect(geoAdd.Err()).NotTo(HaveOccurred())
Expect(geoAdd.Val()).To(Equal(int64(2)))
geoRadius := client.GeoRadius(&redis.GeoRadiusQuery{Key: "Sicily", Longitude: 99, Latitude: 37, Radius: 200, Unit: "km", WithGeoHash: true, WithCoordinates: true, WithDistance: true})
Expect(geoRadius.Err()).NotTo(HaveOccurred())
Expect(len(geoRadius.Val())).To(Equal(0))
})
})
Describe("marshaling/unmarshaling", func() { Describe("marshaling/unmarshaling", func() {
type convTest struct { type convTest struct {

View File

@ -116,7 +116,7 @@ func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
// Parse queued replies. // Parse queued replies.
for i := 0; i < cmdsLen; i++ { for i := 0; i < cmdsLen; i++ {
if err := statusCmd.parseReply(cn); err != nil { if err := statusCmd.readReply(cn); err != nil {
setCmdsErr(cmds[1:len(cmds)-1], err) setCmdsErr(cmds[1:len(cmds)-1], err)
return err return err
} }
@ -144,7 +144,7 @@ func (c *Multi) execCmds(cn *conn, cmds []Cmder) error {
// Loop starts from 1 to omit MULTI cmd. // Loop starts from 1 to omit MULTI cmd.
for i := 1; i < cmdsLen; i++ { for i := 1; i < cmdsLen; i++ {
cmd := cmds[i] cmd := cmds[i]
if err := cmd.parseReply(cn); err != nil { if err := cmd.readReply(cn); err != nil {
if firstCmdErr == nil { if firstCmdErr == nil {
firstCmdErr = err firstCmdErr = err
} }

385
parser.go
View File

@ -8,6 +8,14 @@ import (
"strconv" "strconv"
) )
const (
errorReply = '-'
statusReply = '+'
intReply = ':'
stringReply = '$'
arrayReply = '*'
)
type multiBulkParser func(cn *conn, n int64) (interface{}, error) type multiBulkParser func(cn *conn, n int64) (interface{}, error)
var ( var (
@ -239,24 +247,34 @@ func readN(cn *conn, n int) ([]byte, error) {
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func parseReply(cn *conn, p multiBulkParser) (interface{}, error) { func parseErrorReply(cn *conn, line []byte) error {
return errorf(string(line[1:]))
}
func parseIntReply(cn *conn, line []byte) (int64, error) {
n, err := strconv.ParseInt(bytesToString(line[1:]), 10, 64)
if err != nil {
return 0, err
}
return n, nil
}
func readIntReply(cn *conn) (int64, error) {
line, err := readLine(cn) line, err := readLine(cn)
if err != nil { if err != nil {
return nil, err return 0, err
} }
switch line[0] { switch line[0] {
case '-': case errorReply:
return nil, errorf(string(line[1:])) return 0, parseErrorReply(cn, line)
case '+': case intReply:
return line[1:], nil return parseIntReply(cn, line)
case ':': default:
v, err := strconv.ParseInt(bytesToString(line[1:]), 10, 64) return 0, fmt.Errorf("readIntReply: can't parse %.100q", line)
if err != nil {
return nil, err
} }
return v, nil }
case '$':
func parseBytesReply(cn *conn, line []byte) ([]byte, error) {
if len(line) == 3 && line[1] == '-' && line[2] == '1' { if len(line) == 3 && line[1] == '-' && line[2] == '1' {
return nil, Nil return nil, Nil
} }
@ -270,26 +288,116 @@ func parseReply(cn *conn, p multiBulkParser) (interface{}, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return b[:replyLen], nil return b[:replyLen], nil
case '*': }
func readBytesReply(cn *conn) ([]byte, error) {
line, err := readLine(cn)
if err != nil {
return nil, err
}
switch line[0] {
case errorReply:
return nil, parseErrorReply(cn, line)
case stringReply:
return parseBytesReply(cn, line)
default:
return nil, fmt.Errorf("readBytesReply: can't parse %.100q", line)
}
}
func readStringReply(cn *conn) (string, error) {
b, err := readBytesReply(cn)
if err != nil {
return "", err
}
return string(b), nil
}
func readFloatReply(cn *conn) (float64, error) {
b, err := readBytesReply(cn)
if err != nil {
return 0, err
}
return strconv.ParseFloat(bytesToString(b), 64)
}
func parseArrayHeader(cn *conn, line []byte) (int64, error) {
if len(line) == 3 && line[1] == '-' && line[2] == '1' { if len(line) == 3 && line[1] == '-' && line[2] == '1' {
return nil, Nil return 0, Nil
} }
repliesNum, err := strconv.ParseInt(bytesToString(line[1:]), 10, 64) n, err := strconv.ParseInt(bytesToString(line[1:]), 10, 64)
if err != nil {
return 0, err
}
return n, nil
}
func parseArrayReply(cn *conn, p multiBulkParser, line []byte) (interface{}, error) {
n, err := parseArrayHeader(cn, line)
if err != nil {
return nil, err
}
return p(cn, n)
}
func readArrayHeader(cn *conn) (int64, error) {
line, err := readLine(cn)
if err != nil {
return 0, err
}
switch line[0] {
case errorReply:
return 0, parseErrorReply(cn, line)
case arrayReply:
return parseArrayHeader(cn, line)
default:
return 0, fmt.Errorf("readArrayReply: can't parse %.100q", line)
}
}
func readArrayReply(cn *conn, p multiBulkParser) (interface{}, error) {
line, err := readLine(cn)
if err != nil {
return nil, err
}
switch line[0] {
case errorReply:
return nil, parseErrorReply(cn, line)
case arrayReply:
return parseArrayReply(cn, p, line)
default:
return nil, fmt.Errorf("readArrayReply: can't parse %.100q", line)
}
}
func readReply(cn *conn, p multiBulkParser) (interface{}, error) {
line, err := readLine(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return p(cn, repliesNum) switch line[0] {
case errorReply:
return nil, parseErrorReply(cn, line)
case statusReply:
return line[1:], nil
case intReply:
return parseIntReply(cn, line)
case stringReply:
return parseBytesReply(cn, line)
case arrayReply:
return parseArrayReply(cn, p, line)
} }
return nil, fmt.Errorf("redis: can't parse %q", line) return nil, fmt.Errorf("redis: can't parse %q", line)
} }
func parseSlice(cn *conn, n int64) (interface{}, error) { func sliceParser(cn *conn, n int64) (interface{}, error) {
vals := make([]interface{}, 0, n) vals := make([]interface{}, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
v, err := parseReply(cn, parseSlice) v, err := readReply(cn, sliceParser)
if err == Nil { if err == Nil {
vals = append(vals, nil) vals = append(vals, nil)
} else if err != nil { } else if err != nil {
@ -306,171 +414,224 @@ func parseSlice(cn *conn, n int64) (interface{}, error) {
return vals, nil return vals, nil
} }
func parseStringSlice(cn *conn, n int64) (interface{}, error) { func intSliceParser(cn *conn, n int64) (interface{}, error) {
vals := make([]string, 0, n) ints := make([]int64, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
viface, err := parseReply(cn, nil) n, err := readIntReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
v, ok := viface.([]byte) ints = append(ints, n)
if !ok {
return nil, fmt.Errorf("got %T, expected string", viface)
} }
vals = append(vals, string(v)) return ints, nil
}
return vals, nil
} }
func parseBoolSlice(cn *conn, n int64) (interface{}, error) { func boolSliceParser(cn *conn, n int64) (interface{}, error) {
vals := make([]bool, 0, n) bools := make([]bool, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
viface, err := parseReply(cn, nil) n, err := readIntReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
v, ok := viface.(int64) bools = append(bools, n == 1)
if !ok {
return nil, fmt.Errorf("got %T, expected int64", viface)
} }
vals = append(vals, v == 1) return bools, nil
}
return vals, nil
} }
func parseStringStringMap(cn *conn, n int64) (interface{}, error) { func stringSliceParser(cn *conn, n int64) (interface{}, error) {
ss := make([]string, 0, n)
for i := int64(0); i < n; i++ {
s, err := readStringReply(cn)
if err != nil {
return nil, err
}
ss = append(ss, s)
}
return ss, nil
}
func floatSliceParser(cn *conn, n int64) (interface{}, error) {
nn := make([]float64, 0, n)
for i := int64(0); i < n; i++ {
n, err := readFloatReply(cn)
if err != nil {
return nil, err
}
nn = append(nn, n)
}
return nn, nil
}
func stringStringMapParser(cn *conn, n int64) (interface{}, error) {
m := make(map[string]string, n/2) m := make(map[string]string, n/2)
for i := int64(0); i < n; i += 2 { for i := int64(0); i < n; i += 2 {
keyIface, err := parseReply(cn, nil) key, err := readStringReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
keyBytes, ok := keyIface.([]byte)
if !ok {
return nil, fmt.Errorf("got %T, expected []byte", keyIface)
}
key := string(keyBytes)
valueIface, err := parseReply(cn, nil) value, err := readStringReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
valueBytes, ok := valueIface.([]byte)
if !ok {
return nil, fmt.Errorf("got %T, expected []byte", valueIface)
}
m[key] = string(valueBytes) m[key] = value
} }
return m, nil return m, nil
} }
func parseStringIntMap(cn *conn, n int64) (interface{}, error) { func stringIntMapParser(cn *conn, n int64) (interface{}, error) {
m := make(map[string]int64, n/2) m := make(map[string]int64, n/2)
for i := int64(0); i < n; i += 2 { for i := int64(0); i < n; i += 2 {
keyiface, err := parseReply(cn, nil) key, err := readStringReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
key, ok := keyiface.([]byte)
if !ok {
return nil, fmt.Errorf("got %T, expected string", keyiface)
}
valueiface, err := parseReply(cn, nil) n, err := readIntReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
switch value := valueiface.(type) {
case int64: m[key] = n
m[string(key)] = value
case string:
m[string(key)], err = strconv.ParseInt(value, 10, 64)
if err != nil {
return nil, fmt.Errorf("got %v, expected number", value)
}
default:
return nil, fmt.Errorf("got %T, expected number or string", valueiface)
}
} }
return m, nil return m, nil
} }
func parseZSlice(cn *conn, n int64) (interface{}, error) { func zSliceParser(cn *conn, n int64) (interface{}, error) {
zz := make([]Z, n/2) zz := make([]Z, n/2)
for i := int64(0); i < n; i += 2 { for i := int64(0); i < n; i += 2 {
var err error
z := &zz[i/2] z := &zz[i/2]
memberiface, err := parseReply(cn, nil) z.Member, err = readStringReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
member, ok := memberiface.([]byte)
if !ok {
return nil, fmt.Errorf("got %T, expected string", memberiface)
}
z.Member = string(member)
scoreiface, err := parseReply(cn, nil) z.Score, err = readFloatReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
scoreb, ok := scoreiface.([]byte)
if !ok {
return nil, fmt.Errorf("got %T, expected string", scoreiface)
}
score, err := strconv.ParseFloat(bytesToString(scoreb), 64)
if err != nil {
return nil, err
}
z.Score = score
} }
return zz, nil return zz, nil
} }
func parseClusterSlotInfoSlice(cn *conn, n int64) (interface{}, error) { func clusterSlotInfoSliceParser(cn *conn, n int64) (interface{}, error) {
infos := make([]ClusterSlotInfo, 0, n) infos := make([]ClusterSlotInfo, 0, n)
for i := int64(0); i < n; i++ { for i := int64(0); i < n; i++ {
viface, err := parseReply(cn, parseSlice) n, err := readArrayHeader(cn)
if err != nil {
return nil, err
}
if n < 2 {
return nil, fmt.Errorf("got %d elements in cluster info, expected at least 2", n)
}
start, err := readIntReply(cn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
item, ok := viface.([]interface{}) end, err := readIntReply(cn)
if !ok { if err != nil {
return nil, fmt.Errorf("got %T, expected []interface{}", viface) return nil, err
} else if len(item) < 3 {
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
} }
start, ok := item[0].(int64) addrsn := n - 2
if !ok || start < 0 || start > hashSlots { info := ClusterSlotInfo{
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item) Start: int(start),
} End: int(end),
end, ok := item[1].(int64) Addrs: make([]string, addrsn),
if !ok || end < 0 || end > hashSlots {
return nil, fmt.Errorf("got %v, expected {int64, int64, string...}", item)
} }
info := ClusterSlotInfo{int(start), int(end), make([]string, len(item)-2)} for i := int64(0); i < addrsn; i++ {
for n, ipair := range item[2:] { n, err := readArrayHeader(cn)
pair, ok := ipair.([]interface{}) if err != nil {
if !ok || len(pair) != 2 { return nil, err
return nil, fmt.Errorf("got %v, expected []interface{host, port}", viface) }
if n != 2 {
return nil, fmt.Errorf("got %d elements in cluster info address, expected 2", n)
} }
ip, ok := pair[0].(string) ip, err := readStringReply(cn)
if !ok || len(ip) < 1 { if err != nil {
return nil, fmt.Errorf("got %v, expected IP PORT pair", pair) return nil, err
}
port, ok := pair[1].(int64)
if !ok || port < 1 {
return nil, fmt.Errorf("got %v, expected IP PORT pair", pair)
} }
info.Addrs[n] = net.JoinHostPort(ip, strconv.FormatInt(port, 10)) port, err := readIntReply(cn)
if err != nil {
return nil, err
} }
info.Addrs[i] = net.JoinHostPort(ip, strconv.FormatInt(port, 10))
}
infos = append(infos, info) infos = append(infos, info)
} }
return infos, nil return infos, nil
} }
func geoLocationParser(cn *conn, n int64) (interface{}, error) {
loc := &GeoLocation{}
var err error
loc.Name, err = readStringReply(cn)
if err != nil {
return nil, err
}
if n >= 2 {
loc.Distance, err = readFloatReply(cn)
if err != nil {
return nil, err
}
}
if n >= 3 {
loc.GeoHash, err = readIntReply(cn)
if err != nil {
return nil, err
}
}
if n >= 4 {
n, err := readArrayHeader(cn)
if err != nil {
return nil, err
}
if n != 2 {
return nil, fmt.Errorf("got %d coordinates, expected 2", n)
}
loc.Longitude, err = readFloatReply(cn)
if err != nil {
return nil, err
}
loc.Latitude, err = readFloatReply(cn)
if err != nil {
return nil, err
}
}
return loc, nil
}
func geoLocationSliceParser(cn *conn, n int64) (interface{}, error) {
locs := make([]GeoLocation, 0, n)
for i := int64(0); i < n; i++ {
v, err := readReply(cn, geoLocationParser)
if err != nil {
return nil, err
}
switch vv := v.(type) {
case []byte:
locs = append(locs, GeoLocation{
Name: string(vv),
})
case *GeoLocation:
locs = append(locs, *vv)
default:
return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
}
}
return locs, nil
}

View File

@ -23,7 +23,7 @@ func BenchmarkParseReplyString(b *testing.B) {
} }
func BenchmarkParseReplySlice(b *testing.B) { func BenchmarkParseReplySlice(b *testing.B) {
benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", parseSlice, false) benchmarkParseReply(b, "*2\r\n$5\r\nhello\r\n$5\r\nworld\r\n", sliceParser, false)
} }
func benchmarkParseReply(b *testing.B, reply string, p multiBulkParser, wanterr bool) { func benchmarkParseReply(b *testing.B, reply string, p multiBulkParser, wanterr bool) {
@ -39,7 +39,7 @@ func benchmarkParseReply(b *testing.B, reply string, p multiBulkParser, wanterr
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := parseReply(cn, p) _, err := readReply(cn, p)
if !wanterr && err != nil { if !wanterr && err != nil {
b.Fatal(err) b.Fatal(err)
} }

View File

@ -97,7 +97,7 @@ func execCmds(cn *conn, cmds []Cmder) ([]Cmder, error) {
var firstCmdErr error var firstCmdErr error
var failedCmds []Cmder var failedCmds []Cmder
for _, cmd := range cmds { for _, cmd := range cmds {
err := cmd.parseReply(cn) err := cmd.readReply(cn)
if err == nil { if err == nil {
continue continue
} }

View File

@ -215,7 +215,7 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
cn.ReadTimeout = timeout cn.ReadTimeout = timeout
cmd := NewSliceCmd() cmd := NewSliceCmd()
if err := cmd.parseReply(cn); err != nil { if err := cmd.readReply(cn); err != nil {
return nil, err return nil, err
} }
return newMessage(cmd.Val()) return newMessage(cmd.Val())

View File

@ -69,7 +69,7 @@ func (c *baseClient) process(cmd Cmder) {
return return
} }
err = cmd.parseReply(cn) err = cmd.readReply(cn)
c.putConn(cn, err) c.putConn(cn, err)
if shouldRetry(err) { if shouldRetry(err) {
continue continue