abstract the read / write in server client

This commit is contained in:
silentsai 2014-07-31 14:38:20 +08:00
parent 895613ca5b
commit 14767ecadb
12 changed files with 492 additions and 307 deletions

View File

@ -123,7 +123,7 @@ func (app *App) Run() {
continue continue
} }
newClient(conn, app) newTcpClient(conn, app)
} }
} }

View File

@ -1,7 +1,6 @@
package server package server
import ( import (
"bufio"
"bytes" "bytes"
"errors" "errors"
"github.com/siddontang/go-log/log" "github.com/siddontang/go-log/log"
@ -9,7 +8,6 @@ import (
"io" "io"
"net" "net"
"runtime" "runtime"
"strconv"
"strings" "strings"
"time" "time"
) )
@ -21,40 +19,56 @@ type client struct {
ldb *ledis.Ledis ldb *ledis.Ledis
db *ledis.DB db *ledis.DB
c net.Conn
rb *bufio.Reader ctx clientContext
wb *bufio.Writer resp responseWriter
req requestReader
cmd string cmd string
args [][]byte args [][]byte
reqC chan error reqC chan error
syncBuf bytes.Buffer
compressBuf []byte compressBuf []byte
syncBuf bytes.Buffer
logBuf bytes.Buffer logBuf bytes.Buffer
} }
func newClient(c net.Conn, app *App) { type clientContext interface {
co := new(client) addr() string
release()
}
co.app = app type requestReader interface {
co.ldb = app.ldb // readLine func() ([]byte, error)
//use default db read() ([][]byte, error)
co.db, _ = app.ldb.Select(0) }
co.c = c
co.rb = bufio.NewReaderSize(c, 256) type responseWriter interface {
co.wb = bufio.NewWriterSize(c, 256) writeError(error)
writeStatus(string)
writeInteger(int64)
writeBulk([]byte)
writeArray([]interface{})
writeSliceArray([][]byte)
writeFVPairArray([]ledis.FVPair)
writeScorePairArray([]ledis.ScorePair, bool)
writeBulkFrom(int64, io.Reader)
flush()
}
co.reqC = make(chan error, 1) func newClient(app *App) *client {
c := new(client)
co.compressBuf = make([]byte, 256) c.app = app
c.ldb = app.ldb
c.db, _ = app.ldb.Select(0)
go co.run() c.reqC = make(chan error, 1)
c.compressBuf = make([]byte, 256)
return c
} }
func (c *client) run() { func (c *client) run() {
@ -67,11 +81,11 @@ func (c *client) run() {
log.Fatal("client run panic %s:%v", buf, e) log.Fatal("client run panic %s:%v", buf, e)
} }
c.c.Close() c.ctx.release()
}() }()
for { for {
req, err := c.readRequest() req, err := c.req.read()
if err != nil { if err != nil {
return return
} }
@ -80,65 +94,6 @@ func (c *client) run() {
} }
} }
func (c *client) readLine() ([]byte, error) {
return ReadLine(c.rb)
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (c *client) readRequest() ([][]byte, error) {
l, err := c.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
var nparams int
if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if nparams <= 0 {
return nil, errReadRequest
}
req := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
if l, err = c.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if n == -1 {
req = append(req, nil)
} else {
buf := make([]byte, n)
if _, err = io.ReadFull(c.rb, buf); err != nil {
return nil, err
}
if l, err = c.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
}
req = append(req, buf)
}
} else {
return nil, errReadRequest
}
}
return req, nil
}
func (c *client) handleRequest(req [][]byte) { func (c *client) handleRequest(req [][]byte) {
var err error var err error
@ -179,137 +134,27 @@ func (c *client) handleRequest(req [][]byte) {
} }
} }
c.app.access.Log(c.c.RemoteAddr().String(), duration.Nanoseconds()/1000000, c.logBuf.Bytes(), err) c.app.access.Log(c.ctx.addr(), duration.Nanoseconds()/1000000, c.logBuf.Bytes(), err)
} }
if err != nil { if err != nil {
c.writeError(err) c.resp.writeError(err)
} }
c.wb.Flush() c.resp.flush()
} }
func (c *client) writeError(err error) { func newTcpClient(conn net.Conn, app *App) {
c.wb.Write(ledis.Slice("-ERR")) c := newClient(app)
if err != nil {
c.wb.WriteByte(' ') c.ctx = newTcpContext(conn)
c.wb.Write(ledis.Slice(err.Error())) c.req = newTcpReader(conn)
} c.resp = newTcpWriter(conn)
c.wb.Write(Delims)
go c.run()
} }
func (c *client) writeStatus(status string) { // func newHttpClient(w http.ResponseWriter, r *http.Request, app *App) {
c.wb.WriteByte('+') // c := newClient(app)
c.wb.Write(ledis.Slice(status)) // go c.run()
c.wb.Write(Delims) // }
}
func (c *client) writeInteger(n int64) {
c.wb.WriteByte(':')
c.wb.Write(ledis.StrPutInt64(n))
c.wb.Write(Delims)
}
func (c *client) writeBulk(b []byte) {
c.wb.WriteByte('$')
if b == nil {
c.wb.Write(NullBulk)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(b))))
c.wb.Write(Delims)
c.wb.Write(b)
}
c.wb.Write(Delims)
}
func (c *client) writeArray(ay []interface{}) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
switch v := ay[i].(type) {
case []interface{}:
c.writeArray(v)
case []byte:
c.writeBulk(v)
case nil:
c.writeBulk(nil)
case int64:
c.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}
func (c *client) writeSliceArray(ay [][]byte) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i])
}
}
}
func (c *client) writeFVPairArray(ay []ledis.FVPair) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Field)
c.writeBulk(ay[i].Value)
}
}
}
func (c *client) writeScorePairArray(ay []ledis.ScorePair, withScores bool) {
c.wb.WriteByte('*')
if ay == nil {
c.wb.Write(NullArray)
c.wb.Write(Delims)
} else {
if withScores {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay) * 2)))
c.wb.Write(Delims)
} else {
c.wb.Write(ledis.Slice(strconv.Itoa(len(ay))))
c.wb.Write(Delims)
}
for i := 0; i < len(ay); i++ {
c.writeBulk(ay[i].Member)
if withScores {
c.writeBulk(ledis.StrPutInt64(ay[i].Score))
}
}
}
}
func (c *client) writeBulkFrom(n int64, rb io.Reader) {
c.wb.WriteByte('$')
c.wb.Write(ledis.Slice(strconv.FormatInt(n, 10)))
c.wb.Write(Delims)
io.Copy(c.wb, rb)
c.wb.Write(Delims)
}

View File

@ -14,7 +14,7 @@ func bgetCommand(c *client) error {
if v, err := c.db.BGet(args[0]); err != nil { if v, err := c.db.BGet(args[0]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.resp.writeBulk(v)
} }
return nil return nil
} }
@ -28,7 +28,7 @@ func bdeleteCommand(c *client) error {
if n, err := c.db.BDelete(args[0]); err != nil { if n, err := c.db.BDelete(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
} }
@ -56,7 +56,7 @@ func bsetbitCommand(c *client) error {
if ori, err := c.db.BSetBit(args[0], offset, uint8(val)); err != nil { if ori, err := c.db.BSetBit(args[0], offset, uint8(val)); err != nil {
return err return err
} else { } else {
c.writeInteger(int64(ori)) c.resp.writeInteger(int64(ori))
} }
return nil return nil
} }
@ -75,7 +75,7 @@ func bgetbitCommand(c *client) error {
if v, err := c.db.BGetBit(args[0], offset); err != nil { if v, err := c.db.BGetBit(args[0], offset); err != nil {
return err return err
} else { } else {
c.writeInteger(int64(v)) c.resp.writeInteger(int64(v))
} }
return nil return nil
} }
@ -116,7 +116,7 @@ func bmsetbitCommand(c *client) error {
if place, err := c.db.BMSetBit(key, pairs...); err != nil { if place, err := c.db.BMSetBit(key, pairs...); err != nil {
return err return err
} else { } else {
c.writeInteger(place) c.resp.writeInteger(place)
} }
return nil return nil
} }
@ -151,7 +151,7 @@ func bcountCommand(c *client) error {
if cnt, err := c.db.BCount(args[0], start, end); err != nil { if cnt, err := c.db.BCount(args[0], start, end); err != nil {
return err return err
} else { } else {
c.writeInteger(int64(cnt)) c.resp.writeInteger(int64(cnt))
} }
return nil return nil
} }
@ -183,7 +183,7 @@ func boptCommand(c *client) error {
if blen, err := c.db.BOperation(op, dstKey, srcKeys...); err != nil { if blen, err := c.db.BOperation(op, dstKey, srcKeys...); err != nil {
return err return err
} else { } else {
c.writeInteger(int64(blen)) c.resp.writeInteger(int64(blen))
} }
return nil return nil
} }
@ -202,7 +202,7 @@ func bexpireCommand(c *client) error {
if v, err := c.db.BExpire(args[0], duration); err != nil { if v, err := c.db.BExpire(args[0], duration); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -222,7 +222,7 @@ func bexpireatCommand(c *client) error {
if v, err := c.db.BExpireAt(args[0], when); err != nil { if v, err := c.db.BExpireAt(args[0], when); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -237,7 +237,7 @@ func bttlCommand(c *client) error {
if v, err := c.db.BTTL(args[0]); err != nil { if v, err := c.db.BTTL(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -252,7 +252,7 @@ func bpersistCommand(c *client) error {
if n, err := c.db.BPersist(args[0]); err != nil { if n, err := c.db.BPersist(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil

View File

@ -13,7 +13,7 @@ func hsetCommand(c *client) error {
if n, err := c.db.HSet(args[0], args[1], args[2]); err != nil { if n, err := c.db.HSet(args[0], args[1], args[2]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -28,7 +28,7 @@ func hgetCommand(c *client) error {
if v, err := c.db.HGet(args[0], args[1]); err != nil { if v, err := c.db.HGet(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.resp.writeBulk(v)
} }
return nil return nil
@ -48,7 +48,7 @@ func hexistsCommand(c *client) error {
n = 0 n = 0
} }
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
} }
@ -62,7 +62,7 @@ func hdelCommand(c *client) error {
if n, err := c.db.HDel(args[0], args[1:]...); err != nil { if n, err := c.db.HDel(args[0], args[1:]...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -77,7 +77,7 @@ func hlenCommand(c *client) error {
if n, err := c.db.HLen(args[0]); err != nil { if n, err := c.db.HLen(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -98,7 +98,7 @@ func hincrbyCommand(c *client) error {
if n, err = c.db.HIncrBy(args[0], args[1], delta); err != nil { if n, err = c.db.HIncrBy(args[0], args[1], delta); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
} }
@ -126,7 +126,7 @@ func hmsetCommand(c *client) error {
if err := c.db.HMset(key, kvs...); err != nil { if err := c.db.HMset(key, kvs...); err != nil {
return err return err
} else { } else {
c.writeStatus(OK) c.resp.writeStatus(OK)
} }
return nil return nil
@ -141,7 +141,7 @@ func hmgetCommand(c *client) error {
if v, err := c.db.HMget(args[0], args[1:]...); err != nil { if v, err := c.db.HMget(args[0], args[1:]...); err != nil {
return err return err
} else { } else {
c.writeSliceArray(v) c.resp.writeSliceArray(v)
} }
return nil return nil
@ -156,7 +156,7 @@ func hgetallCommand(c *client) error {
if v, err := c.db.HGetAll(args[0]); err != nil { if v, err := c.db.HGetAll(args[0]); err != nil {
return err return err
} else { } else {
c.writeFVPairArray(v) c.resp.writeFVPairArray(v)
} }
return nil return nil
@ -171,7 +171,7 @@ func hkeysCommand(c *client) error {
if v, err := c.db.HKeys(args[0]); err != nil { if v, err := c.db.HKeys(args[0]); err != nil {
return err return err
} else { } else {
c.writeSliceArray(v) c.resp.writeSliceArray(v)
} }
return nil return nil
@ -186,7 +186,7 @@ func hvalsCommand(c *client) error {
if v, err := c.db.HValues(args[0]); err != nil { if v, err := c.db.HValues(args[0]); err != nil {
return err return err
} else { } else {
c.writeSliceArray(v) c.resp.writeSliceArray(v)
} }
return nil return nil
@ -201,7 +201,7 @@ func hclearCommand(c *client) error {
if n, err := c.db.HClear(args[0]); err != nil { if n, err := c.db.HClear(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -216,7 +216,7 @@ func hmclearCommand(c *client) error {
if n, err := c.db.HMclear(args...); err != nil { if n, err := c.db.HMclear(args...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -236,7 +236,7 @@ func hexpireCommand(c *client) error {
if v, err := c.db.HExpire(args[0], duration); err != nil { if v, err := c.db.HExpire(args[0], duration); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -256,7 +256,7 @@ func hexpireAtCommand(c *client) error {
if v, err := c.db.HExpireAt(args[0], when); err != nil { if v, err := c.db.HExpireAt(args[0], when); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -271,7 +271,7 @@ func httlCommand(c *client) error {
if v, err := c.db.HTTL(args[0]); err != nil { if v, err := c.db.HTTL(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -286,7 +286,7 @@ func hpersistCommand(c *client) error {
if n, err := c.db.HPersist(args[0]); err != nil { if n, err := c.db.HPersist(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil

View File

@ -13,7 +13,7 @@ func getCommand(c *client) error {
if v, err := c.db.Get(args[0]); err != nil { if v, err := c.db.Get(args[0]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.resp.writeBulk(v)
} }
return nil return nil
} }
@ -27,7 +27,7 @@ func setCommand(c *client) error {
if err := c.db.Set(args[0], args[1]); err != nil { if err := c.db.Set(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeStatus(OK) c.resp.writeStatus(OK)
} }
return nil return nil
@ -42,7 +42,7 @@ func getsetCommand(c *client) error {
if v, err := c.db.GetSet(args[0], args[1]); err != nil { if v, err := c.db.GetSet(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.resp.writeBulk(v)
} }
return nil return nil
@ -57,7 +57,7 @@ func setnxCommand(c *client) error {
if n, err := c.db.SetNX(args[0], args[1]); err != nil { if n, err := c.db.SetNX(args[0], args[1]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -72,7 +72,7 @@ func existsCommand(c *client) error {
if n, err := c.db.Exists(args[0]); err != nil { if n, err := c.db.Exists(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -87,7 +87,7 @@ func incrCommand(c *client) error {
if n, err := c.db.Incr(c.args[0]); err != nil { if n, err := c.db.Incr(c.args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -102,7 +102,7 @@ func decrCommand(c *client) error {
if n, err := c.db.Decr(c.args[0]); err != nil { if n, err := c.db.Decr(c.args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -122,7 +122,7 @@ func incrbyCommand(c *client) error {
if n, err := c.db.IncryBy(c.args[0], delta); err != nil { if n, err := c.db.IncryBy(c.args[0], delta); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -142,7 +142,7 @@ func decrbyCommand(c *client) error {
if n, err := c.db.DecrBy(c.args[0], delta); err != nil { if n, err := c.db.DecrBy(c.args[0], delta); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -157,7 +157,7 @@ func delCommand(c *client) error {
if n, err := c.db.Del(args...); err != nil { if n, err := c.db.Del(args...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -178,7 +178,7 @@ func msetCommand(c *client) error {
if err := c.db.MSet(kvs...); err != nil { if err := c.db.MSet(kvs...); err != nil {
return err return err
} else { } else {
c.writeStatus(OK) c.resp.writeStatus(OK)
} }
return nil return nil
@ -197,7 +197,7 @@ func mgetCommand(c *client) error {
if v, err := c.db.MGet(args...); err != nil { if v, err := c.db.MGet(args...); err != nil {
return err return err
} else { } else {
c.writeSliceArray(v) c.resp.writeSliceArray(v)
} }
return nil return nil
@ -217,7 +217,7 @@ func expireCommand(c *client) error {
if v, err := c.db.Expire(args[0], duration); err != nil { if v, err := c.db.Expire(args[0], duration); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -237,7 +237,7 @@ func expireAtCommand(c *client) error {
if v, err := c.db.ExpireAt(args[0], when); err != nil { if v, err := c.db.ExpireAt(args[0], when); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -252,7 +252,7 @@ func ttlCommand(c *client) error {
if v, err := c.db.TTL(args[0]); err != nil { if v, err := c.db.TTL(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -267,7 +267,7 @@ func persistCommand(c *client) error {
if n, err := c.db.Persist(args[0]); err != nil { if n, err := c.db.Persist(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil

View File

@ -13,7 +13,7 @@ func lpushCommand(c *client) error {
if n, err := c.db.LPush(args[0], args[1:]...); err != nil { if n, err := c.db.LPush(args[0], args[1:]...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -28,7 +28,7 @@ func rpushCommand(c *client) error {
if n, err := c.db.RPush(args[0], args[1:]...); err != nil { if n, err := c.db.RPush(args[0], args[1:]...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -43,7 +43,7 @@ func lpopCommand(c *client) error {
if v, err := c.db.LPop(args[0]); err != nil { if v, err := c.db.LPop(args[0]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.resp.writeBulk(v)
} }
return nil return nil
@ -58,7 +58,7 @@ func rpopCommand(c *client) error {
if v, err := c.db.RPop(args[0]); err != nil { if v, err := c.db.RPop(args[0]); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.resp.writeBulk(v)
} }
return nil return nil
@ -73,7 +73,7 @@ func llenCommand(c *client) error {
if n, err := c.db.LLen(args[0]); err != nil { if n, err := c.db.LLen(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -93,7 +93,7 @@ func lindexCommand(c *client) error {
if v, err := c.db.LIndex(args[0], int32(index)); err != nil { if v, err := c.db.LIndex(args[0], int32(index)); err != nil {
return err return err
} else { } else {
c.writeBulk(v) c.resp.writeBulk(v)
} }
return nil return nil
@ -122,7 +122,7 @@ func lrangeCommand(c *client) error {
if v, err := c.db.LRange(args[0], int32(start), int32(stop)); err != nil { if v, err := c.db.LRange(args[0], int32(start), int32(stop)); err != nil {
return err return err
} else { } else {
c.writeSliceArray(v) c.resp.writeSliceArray(v)
} }
return nil return nil
@ -137,7 +137,7 @@ func lclearCommand(c *client) error {
if n, err := c.db.LClear(args[0]); err != nil { if n, err := c.db.LClear(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -152,7 +152,7 @@ func lmclearCommand(c *client) error {
if n, err := c.db.LMclear(args...); err != nil { if n, err := c.db.LMclear(args...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -172,7 +172,7 @@ func lexpireCommand(c *client) error {
if v, err := c.db.LExpire(args[0], duration); err != nil { if v, err := c.db.LExpire(args[0], duration); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -192,7 +192,7 @@ func lexpireAtCommand(c *client) error {
if v, err := c.db.LExpireAt(args[0], when); err != nil { if v, err := c.db.LExpireAt(args[0], when); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -207,7 +207,7 @@ func lttlCommand(c *client) error {
if v, err := c.db.LTTL(args[0]); err != nil { if v, err := c.db.LTTL(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -222,7 +222,7 @@ func lpersistCommand(c *client) error {
if n, err := c.db.LPersist(args[0]); err != nil { if n, err := c.db.LPersist(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil

View File

@ -35,7 +35,7 @@ func slaveofCommand(c *client) error {
return err return err
} }
c.writeStatus(OK) c.resp.writeStatus(OK)
return nil return nil
} }
@ -56,7 +56,7 @@ func fullsyncCommand(c *client) error {
dumpFile.Seek(0, os.SEEK_SET) dumpFile.Seek(0, os.SEEK_SET)
c.writeBulkFrom(n, dumpFile) c.resp.writeBulkFrom(n, dumpFile)
name := dumpFile.Name() name := dumpFile.Name()
dumpFile.Close() dumpFile.Close()
@ -112,7 +112,7 @@ func syncCommand(c *client) error {
return err return err
} }
c.writeBulk(buf) c.resp.writeBulk(buf)
} }
return nil return nil

View File

@ -39,7 +39,7 @@ func zaddCommand(c *client) error {
if n, err := c.db.ZAdd(key, params...); err != nil { if n, err := c.db.ZAdd(key, params...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -54,7 +54,7 @@ func zcardCommand(c *client) error {
if n, err := c.db.ZCard(args[0]); err != nil { if n, err := c.db.ZCard(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -68,12 +68,12 @@ func zscoreCommand(c *client) error {
if s, err := c.db.ZScore(args[0], args[1]); err != nil { if s, err := c.db.ZScore(args[0], args[1]); err != nil {
if err == ledis.ErrScoreMiss { if err == ledis.ErrScoreMiss {
c.writeBulk(nil) c.resp.writeBulk(nil)
} else { } else {
return err return err
} }
} else { } else {
c.writeBulk(ledis.StrPutInt64(s)) c.resp.writeBulk(ledis.StrPutInt64(s))
} }
return nil return nil
@ -88,7 +88,7 @@ func zremCommand(c *client) error {
if n, err := c.db.ZRem(args[0], args[1:]...); err != nil { if n, err := c.db.ZRem(args[0], args[1:]...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -110,7 +110,7 @@ func zincrbyCommand(c *client) error {
if v, err := c.db.ZIncrBy(key, delta, args[2]); err != nil { if v, err := c.db.ZIncrBy(key, delta, args[2]); err != nil {
return err return err
} else { } else {
c.writeBulk(ledis.StrPutInt64(v)) c.resp.writeBulk(ledis.StrPutInt64(v))
} }
return nil return nil
@ -190,14 +190,14 @@ func zcountCommand(c *client) error {
} }
if min > max { if min > max {
c.writeInteger(0) c.resp.writeInteger(0)
return nil return nil
} }
if n, err := c.db.ZCount(args[0], min, max); err != nil { if n, err := c.db.ZCount(args[0], min, max); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -212,9 +212,9 @@ func zrankCommand(c *client) error {
if n, err := c.db.ZRank(args[0], args[1]); err != nil { if n, err := c.db.ZRank(args[0], args[1]); err != nil {
return err return err
} else if n == -1 { } else if n == -1 {
c.writeBulk(nil) c.resp.writeBulk(nil)
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -229,9 +229,9 @@ func zrevrankCommand(c *client) error {
if n, err := c.db.ZRevRank(args[0], args[1]); err != nil { if n, err := c.db.ZRevRank(args[0], args[1]); err != nil {
return err return err
} else if n == -1 { } else if n == -1 {
c.writeBulk(nil) c.resp.writeBulk(nil)
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -253,7 +253,7 @@ func zremrangebyrankCommand(c *client) error {
if n, err := c.db.ZRemRangeByRank(key, start, stop); err != nil { if n, err := c.db.ZRemRangeByRank(key, start, stop); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -274,7 +274,7 @@ func zremrangebyscoreCommand(c *client) error {
if n, err := c.db.ZRemRangeByScore(key, min, max); err != nil { if n, err := c.db.ZRemRangeByScore(key, min, max); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -315,7 +315,7 @@ func zrangeGeneric(c *client, reverse bool) error {
if datas, err := c.db.ZRangeGeneric(key, start, stop, reverse); err != nil { if datas, err := c.db.ZRangeGeneric(key, start, stop, reverse); err != nil {
return err return err
} else { } else {
c.writeScorePairArray(datas, withScores) c.resp.writeScorePairArray(datas, withScores)
} }
return nil return nil
} }
@ -383,14 +383,14 @@ func zrangebyscoreGeneric(c *client, reverse bool) error {
if offset < 0 { if offset < 0 {
//for ledis, if offset < 0, a empty will return //for ledis, if offset < 0, a empty will return
//so here we directly return a empty array //so here we directly return a empty array
c.writeArray([]interface{}{}) c.resp.writeArray([]interface{}{})
return nil return nil
} }
if datas, err := c.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil { if datas, err := c.db.ZRangeByScoreGeneric(key, min, max, offset, count, reverse); err != nil {
return err return err
} else { } else {
c.writeScorePairArray(datas, withScores) c.resp.writeScorePairArray(datas, withScores)
} }
return nil return nil
@ -413,7 +413,7 @@ func zclearCommand(c *client) error {
if n, err := c.db.ZClear(args[0]); err != nil { if n, err := c.db.ZClear(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -428,7 +428,7 @@ func zmclearCommand(c *client) error {
if n, err := c.db.ZMclear(args...); err != nil { if n, err := c.db.ZMclear(args...); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil
@ -448,7 +448,7 @@ func zexpireCommand(c *client) error {
if v, err := c.db.ZExpire(args[0], duration); err != nil { if v, err := c.db.ZExpire(args[0], duration); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -468,7 +468,7 @@ func zexpireAtCommand(c *client) error {
if v, err := c.db.ZExpireAt(args[0], when); err != nil { if v, err := c.db.ZExpireAt(args[0], when); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -483,7 +483,7 @@ func zttlCommand(c *client) error {
if v, err := c.db.ZTTL(args[0]); err != nil { if v, err := c.db.ZTTL(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(v) c.resp.writeInteger(v)
} }
return nil return nil
@ -498,7 +498,7 @@ func zpersistCommand(c *client) error {
if n, err := c.db.ZPersist(args[0]); err != nil { if n, err := c.db.ZPersist(args[0]); err != nil {
return err return err
} else { } else {
c.writeInteger(n) c.resp.writeInteger(n)
} }
return nil return nil

View File

@ -21,7 +21,7 @@ func register(name string, f CommandFunc) {
} }
func pingCommand(c *client) error { func pingCommand(c *client) error {
c.writeStatus(PONG) c.resp.writeStatus(PONG)
return nil return nil
} }
@ -30,7 +30,7 @@ func echoCommand(c *client) error {
return ErrCmdParams return ErrCmdParams
} }
c.writeBulk(c.args[0]) c.resp.writeBulk(c.args[0])
return nil return nil
} }
@ -46,7 +46,7 @@ func selectCommand(c *client) error {
return err return err
} else { } else {
c.db = db c.db = db
c.writeStatus(OK) c.resp.writeStatus(OK)
} }
} }
return nil return nil

95
server/http/http_io.go Normal file
View File

@ -0,0 +1,95 @@
package http
import (
"github.com/siddontang/ledisdb/ledis"
"io"
"net/http"
)
type httpContext struct {
}
type httpReader struct {
req *http.Request
}
type httpWriter struct {
resp *http.ResponseWriter
}
// http context
func newHttpContext() *httpContext {
ctx := new(httpContext)
return ctx
}
func (ctx *httpContext) addr() string {
return ""
}
func (ctx *httpContext) release() {
}
// http reader
func newHttpReader(req *http.Request) *httpReader {
r := new(httpReader)
r.req = req
return r
}
func (r *httpReader) read() ([][]byte, error) {
return nil, nil
}
// http writer
func newHttpWriter(resp *http.ResponseWriter) *httpWriter {
w := new(httpWriter)
w.resp = resp
return w
}
func (w *httpWriter) writeError(err error) {
}
func (w *httpWriter) writeStatus(status string) {
}
func (w *httpWriter) writeInteger(n int64) {
}
func (w *httpWriter) writeBulk(b []byte) {
}
func (w *httpWriter) writeArray(lst []interface{}) {
}
func (w *httpWriter) writeSliceArray(lst [][]byte) {
}
func (w *httpWriter) writeFVPairArray(lst []ledis.FVPair) {
}
func (w *httpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
}
func (w *httpWriter) writeBulkFrom(n int64, rb io.Reader) {
}
func (w *httpWriter) flush() {
}

View File

@ -72,7 +72,7 @@ func (m *MasterInfo) Load(filePath string) error {
type master struct { type master struct {
sync.Mutex sync.Mutex
c net.Conn conn net.Conn
rb *bufio.Reader rb *bufio.Reader
app *App app *App
@ -114,9 +114,9 @@ func (m *master) Close() {
default: default:
} }
if m.c != nil { if m.conn != nil {
m.c.Close() m.conn.Close()
m.c = nil m.conn = nil
} }
m.wg.Wait() m.wg.Wait()
@ -135,17 +135,17 @@ func (m *master) connect() error {
return fmt.Errorf("no assign master addr") return fmt.Errorf("no assign master addr")
} }
if m.c != nil { if m.conn != nil {
m.c.Close() m.conn.Close()
m.c = nil m.conn = nil
} }
if c, err := net.Dial("tcp", m.info.Addr); err != nil { if conn, err := net.Dial("tcp", m.info.Addr); err != nil {
return err return err
} else { } else {
m.c = c m.conn = conn
m.rb = bufio.NewReaderSize(m.c, 4096) m.rb = bufio.NewReaderSize(m.conn, 4096)
} }
return nil return nil
} }
@ -248,7 +248,7 @@ var (
) )
func (m *master) fullSync() error { func (m *master) fullSync() error {
if _, err := m.c.Write(fullSyncCmd); err != nil { if _, err := m.conn.Write(fullSyncCmd); err != nil {
return err return err
} }
@ -291,7 +291,7 @@ func (m *master) sync() error {
cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr), cmd := ledis.Slice(fmt.Sprintf(syncCmdFormat, len(logIndexStr),
logIndexStr, len(logPosStr), logPosStr)) logIndexStr, len(logPosStr), logPosStr))
if _, err := m.c.Write(cmd); err != nil { if _, err := m.conn.Write(cmd); err != nil {
return err return err
} }

245
server/tcp_io.go Normal file
View File

@ -0,0 +1,245 @@
package server
import (
"bufio"
"errors"
"github.com/siddontang/ledisdb/ledis"
"io"
"net"
"strconv"
)
type tcpContext struct {
conn net.Conn
}
type tcpWriter struct {
buff *bufio.Writer
}
type tcpReader struct {
buff *bufio.Reader
}
// tcp context
func newTcpContext(conn net.Conn) *tcpContext {
ctx := new(tcpContext)
ctx.conn = conn
return ctx
}
func (ctx *tcpContext) addr() string {
return ctx.conn.RemoteAddr().String()
}
func (ctx *tcpContext) release() {
if ctx.conn != nil {
ctx.conn.Close()
ctx.conn = nil
}
}
// tcp reader
func newTcpReader(conn net.Conn) *tcpReader {
r := new(tcpReader)
r.buff = bufio.NewReaderSize(conn, 256)
return r
}
func (r *tcpReader) readLine() ([]byte, error) {
return ReadLine(r.buff)
}
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
func (r *tcpReader) read() ([][]byte, error) {
l, err := r.readLine()
if err != nil {
return nil, err
} else if len(l) == 0 || l[0] != '*' {
return nil, errReadRequest
}
var nparams int
if nparams, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if nparams <= 0 {
return nil, errReadRequest
}
reqData := make([][]byte, 0, nparams)
var n int
for i := 0; i < nparams; i++ {
if l, err = r.readLine(); err != nil {
return nil, err
}
if len(l) == 0 {
return nil, errReadRequest
} else if l[0] == '$' {
//handle resp string
if n, err = strconv.Atoi(ledis.String(l[1:])); err != nil {
return nil, err
} else if n == -1 {
reqData = append(reqData, nil)
} else {
buf := make([]byte, n)
if _, err = io.ReadFull(r.buff, buf); err != nil {
return nil, err
}
if l, err = r.readLine(); err != nil {
return nil, err
} else if len(l) != 0 {
return nil, errors.New("bad bulk string format")
}
reqData = append(reqData, buf)
}
} else {
return nil, errReadRequest
}
}
return reqData, nil
}
// tcp writer
func newTcpWriter(conn net.Conn) *tcpWriter {
w := new(tcpWriter)
w.buff = bufio.NewWriterSize(conn, 256)
return w
}
func (w *tcpWriter) writeError(err error) {
w.buff.Write(ledis.Slice("-ERR"))
if err != nil {
w.buff.WriteByte(' ')
w.buff.Write(ledis.Slice(err.Error()))
}
w.buff.Write(Delims)
}
func (w *tcpWriter) writeStatus(status string) {
w.buff.WriteByte('+')
w.buff.Write(ledis.Slice(status))
w.buff.Write(Delims)
}
func (w *tcpWriter) writeInteger(n int64) {
w.buff.WriteByte(':')
w.buff.Write(ledis.StrPutInt64(n))
w.buff.Write(Delims)
}
func (w *tcpWriter) writeBulk(b []byte) {
w.buff.WriteByte('$')
if b == nil {
w.buff.Write(NullBulk)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(b))))
w.buff.Write(Delims)
w.buff.Write(b)
}
w.buff.Write(Delims)
}
func (w *tcpWriter) writeArray(lst []interface{}) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
switch v := lst[i].(type) {
case []interface{}:
w.writeArray(v)
case []byte:
w.writeBulk(v)
case nil:
w.writeBulk(nil)
case int64:
w.writeInteger(v)
default:
panic("invalid array type")
}
}
}
}
func (w *tcpWriter) writeSliceArray(lst [][]byte) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i])
}
}
}
func (w *tcpWriter) writeFVPairArray(lst []ledis.FVPair) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2)))
w.buff.Write(Delims)
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i].Field)
w.writeBulk(lst[i].Value)
}
}
}
func (w *tcpWriter) writeScorePairArray(lst []ledis.ScorePair, withScores bool) {
w.buff.WriteByte('*')
if lst == nil {
w.buff.Write(NullArray)
w.buff.Write(Delims)
} else {
if withScores {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst) * 2)))
w.buff.Write(Delims)
} else {
w.buff.Write(ledis.Slice(strconv.Itoa(len(lst))))
w.buff.Write(Delims)
}
for i := 0; i < len(lst); i++ {
w.writeBulk(lst[i].Member)
if withScores {
w.writeBulk(ledis.StrPutInt64(lst[i].Score))
}
}
}
}
func (w *tcpWriter) writeBulkFrom(n int64, rb io.Reader) {
w.buff.WriteByte('$')
w.buff.Write(ledis.Slice(strconv.FormatInt(n, 10)))
w.buff.Write(Delims)
io.Copy(w.buff, rb)
w.buff.Write(Delims)
}
func (w *tcpWriter) flush() {
w.buff.Flush()
}