Merge branch 'develop'

This commit is contained in:
siddontang 2014-10-23 13:55:56 +08:00
commit b1689b21fa
8 changed files with 144 additions and 145 deletions

2
Godeps/Godeps.json generated
View File

@ -48,7 +48,7 @@
}, },
{ {
"ImportPath": "github.com/siddontang/goleveldb/leveldb", "ImportPath": "github.com/siddontang/goleveldb/leveldb",
"Rev": "614b6126cf79eee8be803e7e65a3ef24fc12e44a" "Rev": "71404b29ccd98b94ec2278afa806d59a11cd0d28"
}, },
{ {
"ImportPath": "github.com/szferi/gomdb", "ImportPath": "github.com/szferi/gomdb",

View File

@ -20,22 +20,23 @@ type Client struct {
sync.Mutex sync.Mutex
cfg *Config cfg *Config
proto string
conns *list.List conns *list.List
} }
func getProto(addr string) string {
if strings.Contains(addr, "/") {
return "unix"
} else {
return "tcp"
}
}
func NewClient(cfg *Config) *Client { func NewClient(cfg *Config) *Client {
c := new(Client) c := new(Client)
c.cfg = cfg c.cfg = cfg
if strings.Contains(cfg.Addr, "/") {
c.proto = "unix"
} else {
c.proto = "tcp"
}
c.conns = list.New() c.conns = list.New()
return c return c
@ -71,7 +72,7 @@ func (c *Client) get() *Conn {
if c.conns.Len() == 0 { if c.conns.Len() == 0 {
c.Unlock() c.Unlock()
return c.newConn() return c.newConn(c.cfg.Addr)
} else { } else {
e := c.conns.Front() e := c.conns.Front()
co := e.Value.(*Conn) co := e.Value.(*Conn)

View File

@ -19,10 +19,15 @@ func (err Error) Error() string { return string(err) }
type Conn struct { type Conn struct {
client *Client client *Client
addr string
c net.Conn c net.Conn
br *bufio.Reader br *bufio.Reader
bw *bufio.Writer bw *bufio.Writer
rSize int
wSize int
lastActive time.Time lastActive time.Time
// Scratch space for formatting argument length. // Scratch space for formatting argument length.
@ -33,25 +38,57 @@ type Conn struct {
numScratch [40]byte numScratch [40]byte
} }
func NewConn(addr string) *Conn {
co := new(Conn)
co.addr = addr
co.rSize = 4096
co.wSize = 4096
return co
}
func NewConnSize(addr string, readSize int, writeSize int) *Conn {
co := NewConn(addr)
co.rSize = readSize
co.wSize = writeSize
return co
}
func (c *Conn) Close() { func (c *Conn) Close() {
if c.client != nil {
c.client.put(c) c.client.put(c)
} else {
c.finalize()
}
} }
func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) { func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
if err := c.connect(); err != nil { if err := c.Send(cmd, args...); err != nil {
return nil, err return nil, err
} }
return c.Receive()
}
func (c *Conn) Send(cmd string, args ...interface{}) error {
if err := c.connect(); err != nil {
return err
}
if err := c.writeCommand(cmd, args); err != nil { if err := c.writeCommand(cmd, args); err != nil {
c.finalize() c.finalize()
return nil, err return err
} }
if err := c.bw.Flush(); err != nil { if err := c.bw.Flush(); err != nil {
c.finalize() c.finalize()
return nil, err return err
} }
return nil
}
func (c *Conn) Receive() (interface{}, error) {
if reply, err := c.readReply(); err != nil { if reply, err := c.readReply(); err != nil {
c.finalize() c.finalize()
return nil, err return nil, err
@ -64,6 +101,16 @@ func (c *Conn) Do(cmd string, args ...interface{}) (interface{}, error) {
} }
} }
func (c *Conn) ReceiveBulkTo(w io.Writer) error {
err := c.readBulkReplyTo(w)
if err != nil {
if _, ok := err.(Error); !ok {
c.finalize()
}
}
return err
}
func (c *Conn) finalize() { func (c *Conn) finalize() {
if c.c != nil { if c.c != nil {
c.c.Close() c.c.Close()
@ -77,7 +124,7 @@ func (c *Conn) connect() error {
} }
var err error var err error
c.c, err = net.Dial(c.client.proto, c.client.cfg.Addr) c.c, err = net.Dial(getProto(c.addr), c.addr)
if err != nil { if err != nil {
return err return err
} }
@ -85,13 +132,13 @@ func (c *Conn) connect() error {
if c.br != nil { if c.br != nil {
c.br.Reset(c.c) c.br.Reset(c.c)
} else { } else {
c.br = bufio.NewReader(c.c) c.br = bufio.NewReaderSize(c.c, c.rSize)
} }
if c.bw != nil { if c.bw != nil {
c.bw.Reset(c.c) c.bw.Reset(c.c)
} else { } else {
c.bw = bufio.NewWriter(c.c) c.bw = bufio.NewWriterSize(c.c, c.wSize)
} }
return nil return nil
@ -244,6 +291,41 @@ var (
pongReply interface{} = "PONG" pongReply interface{} = "PONG"
) )
func (c *Conn) readBulkReplyTo(w io.Writer) error {
line, err := c.readLine()
if err != nil {
return err
}
if len(line) == 0 {
return errors.New("ledis: short response line")
}
switch line[0] {
case '-':
return Error(string(line[1:]))
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return err
}
var nn int64
if nn, err = io.CopyN(w, c.br, int64(n)); err != nil {
return err
} else if nn != int64(n) {
return io.ErrShortWrite
}
if line, err := c.readLine(); err != nil {
return err
} else if len(line) != 0 {
return errors.New("ledis: bad bulk string format")
}
return nil
default:
return fmt.Errorf("ledis: not invalid bulk string type, but %c", line[0])
}
}
func (c *Conn) readReply() (interface{}, error) { func (c *Conn) readReply() (interface{}, error) {
line, err := c.readLine() line, err := c.readLine()
if err != nil { if err != nil {
@ -301,8 +383,8 @@ func (c *Conn) readReply() (interface{}, error) {
return nil, errors.New("ledis: unexpected response line") return nil, errors.New("ledis: unexpected response line")
} }
func (c *Client) newConn() *Conn { func (c *Client) newConn(addr string) *Conn {
co := new(Conn) co := NewConn(addr)
co.client = c co.client = c
return co return co

View File

@ -18,7 +18,7 @@ var clients = flag.Int("c", 50, "number of clients")
var reverse = flag.Bool("rev", false, "enable zset rev benchmark") var reverse = flag.Bool("rev", false, "enable zset rev benchmark")
var round = flag.Int("r", 1, "benchmark round number") var round = flag.Int("r", 1, "benchmark round number")
var del = flag.Bool("del", true, "enable del benchmark") var del = flag.Bool("del", true, "enable del benchmark")
var valueSize = flag.Int("vsize", 100, "kv value size")
var wg sync.WaitGroup var wg sync.WaitGroup
var client *ledis.Client var client *ledis.Client
@ -65,7 +65,7 @@ var kvDelBase int64 = 0
func benchSet() { func benchSet() {
f := func() { f := func() {
value := make([]byte, 100) value := make([]byte, *valueSize)
crand.Read(value) crand.Read(value)
n := atomic.AddInt64(&kvSetBase, 1) n := atomic.AddInt64(&kvSetBase, 1)
waitBench("set", n, value) waitBench("set", n, value)
@ -103,7 +103,7 @@ func benchDel() {
func benchPushList() { func benchPushList() {
f := func() { f := func() {
value := make([]byte, 10) value := make([]byte, 100)
crand.Read(value) crand.Read(value)
waitBench("rpush", "mytestlist", value) waitBench("rpush", "mytestlist", value)
} }

View File

@ -1,11 +1,9 @@
package main package main
import ( import (
"bufio"
"flag" "flag"
"fmt" "fmt"
"github.com/siddontang/ledisdb/server" "github.com/siddontang/ledisdb/client/go/ledis"
"net"
"os" "os"
) )
@ -14,12 +12,9 @@ var port = flag.Int("port", 6380, "ledis server port")
var sock = flag.String("sock", "", "ledis unix socket domain") var sock = flag.String("sock", "", "ledis unix socket domain")
var dumpFile = flag.String("o", "./ledis.dump", "dump file to save") var dumpFile = flag.String("o", "./ledis.dump", "dump file to save")
var fullSyncCmd = []byte("*2\r\n$8\r\nfullsync\r\n$3\r\nnew\r\n") //fullsync
func main() { func main() {
flag.Parse() flag.Parse()
var c net.Conn
var err error var err error
var f *os.File var f *os.File
@ -30,30 +25,25 @@ func main() {
defer f.Close() defer f.Close()
var addr string
if len(*sock) != 0 { if len(*sock) != 0 {
c, err = net.Dial("unix", *sock) addr = *sock
} else { } else {
addr := fmt.Sprintf("%s:%d", *host, *port) addr = fmt.Sprintf("%s:%d", *host, *port)
c, err = net.Dial("tcp", addr)
} }
if err != nil { c := ledis.NewConnSize(addr, 16*1024, 4096)
println(err.Error())
return
}
defer c.Close() defer c.Close()
println("dump begin") println("dump begin")
if _, err = c.Write(fullSyncCmd); err != nil { if err = c.Send("fullsync"); err != nil {
println(err.Error()) println(err.Error())
return return
} }
rb := bufio.NewReaderSize(c, 16*1024) if err = c.ReceiveBulkTo(f); err != nil {
if err = server.ReadBulkTo(rb, f); err != nil {
println(err.Error()) println(err.Error())
return return
} }

View File

@ -21,7 +21,7 @@ func TestDump(t *testing.T) {
cfgS := config.NewConfigDefault() cfgS := config.NewConfigDefault()
cfgS.DataDir = "/tmp/test_ledis_slave" cfgS.DataDir = "/tmp/test_ledis_slave"
os.RemoveAll(cfgM.DataDir) os.RemoveAll(cfgS.DataDir)
var slave *Ledis var slave *Ledis
if slave, err = Open(cfgS); err != nil { if slave, err = Open(cfgS); err != nil {

View File

@ -1,19 +1,17 @@
package server package server
import ( import (
"bufio"
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
"github.com/siddontang/go/hack"
"github.com/siddontang/go/log" "github.com/siddontang/go/log"
"github.com/siddontang/go/num" "github.com/siddontang/go/num"
goledis "github.com/siddontang/ledisdb/client/go/ledis"
"github.com/siddontang/ledisdb/ledis" "github.com/siddontang/ledisdb/ledis"
"github.com/siddontang/ledisdb/rpl" "github.com/siddontang/ledisdb/rpl"
"net" "net"
"os" "os"
"path" "path"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -26,8 +24,7 @@ var (
type master struct { type master struct {
sync.Mutex sync.Mutex
conn net.Conn conn *goledis.Conn
rb *bufio.Reader
app *App app *App
@ -49,16 +46,12 @@ func newMaster(app *App) *master {
return m return m
} }
var (
quitCmd = []byte("*1\r\n$4\r\nquit\r\n")
)
func (m *master) Close() { func (m *master) Close() {
ledis.AsyncNotify(m.quit) ledis.AsyncNotify(m.quit)
if m.conn != nil { if m.conn != nil {
//for replication, we send quit command to close gracefully //for replication, we send quit command to close gracefully
m.conn.Write(quitCmd) m.conn.Send("quit")
m.conn.Close() m.conn.Close()
m.conn = nil m.conn = nil
@ -67,7 +60,7 @@ func (m *master) Close() {
m.wg.Wait() m.wg.Wait()
} }
func (m *master) connect() error { func (m *master) resetConn() error {
if len(m.addr) == 0 { if len(m.addr) == 0 {
return fmt.Errorf("no assign master addr") return fmt.Errorf("no assign master addr")
} }
@ -77,13 +70,7 @@ func (m *master) connect() error {
m.conn = nil m.conn = nil
} }
if conn, err := net.Dial("tcp", m.addr); err != nil { m.conn = goledis.NewConn(m.addr)
return err
} else {
m.conn = conn
m.rb = bufio.NewReaderSize(m.conn, 4096)
}
return nil return nil
} }
@ -112,13 +99,18 @@ func (m *master) startReplication(masterAddr string, restart bool) error {
func (m *master) runReplication(restart bool) { func (m *master) runReplication(restart bool) {
defer m.wg.Done() defer m.wg.Done()
if err := m.resetConn(); err != nil {
log.Error("reset conn error %s", err.Error())
return
}
for { for {
select { select {
case <-m.quit: case <-m.quit:
return return
default: default:
if err := m.connect(); err != nil { if _, err := m.conn.Do("ping"); err != nil {
log.Error("connect master %s error %s, try 2s later", m.addr, err.Error()) log.Error("ping master %s error %s, try 2s later", m.addr, err.Error())
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
continue continue
} }
@ -160,27 +152,15 @@ func (m *master) runReplication(restart bool) {
return return
} }
var (
fullSyncCmd = []byte("*1\r\n$8\r\nfullsync\r\n") //fullsync
syncCmdFormat = "*2\r\n$4\r\nsync\r\n$%d\r\n%s\r\n" //sync logid
replconfCmdFormat = "*3\r\n$8\r\nreplconf\r\n$14\r\nlistening-port\r\n$%d\r\n%s\r\n" //replconf listening-port port
)
func (m *master) replConf() error { func (m *master) replConf() error {
_, port, err := net.SplitHostPort(m.app.cfg.Addr) _, port, err := net.SplitHostPort(m.app.cfg.Addr)
if err != nil { if err != nil {
return err return err
} }
cmd := hack.Slice(fmt.Sprintf(replconfCmdFormat, len(port), port)) if s, err := goledis.String(m.conn.Do("replconf", "listening-port", port)); err != nil {
if _, err := m.conn.Write(cmd); err != nil {
return err return err
} } else if strings.ToUpper(s) != "OK" {
if s, err := ReadStatus(m.rb); err != nil {
return err
} else if strings.ToLower(s) != "ok" {
return fmt.Errorf("not ok but %s", s) return fmt.Errorf("not ok but %s", s)
} }
@ -190,7 +170,7 @@ func (m *master) replConf() error {
func (m *master) fullSync() error { func (m *master) fullSync() error {
log.Info("begin full sync") log.Info("begin full sync")
if _, err := m.conn.Write(fullSyncCmd); err != nil { if err := m.conn.Send("fullsync"); err != nil {
return err return err
} }
@ -202,7 +182,7 @@ func (m *master) fullSync() error {
defer os.Remove(dumpPath) defer os.Remove(dumpPath)
err = ReadBulkTo(m.rb, f) err = m.conn.ReceiveBulkTo(f)
f.Close() f.Close()
if err != nil { if err != nil {
log.Error("read dump data error %s", err.Error()) log.Error("read dump data error %s", err.Error())
@ -237,18 +217,13 @@ func (m *master) sync() error {
return err return err
} }
logIDStr := strconv.FormatUint(syncID, 10) if err := m.conn.Send("sync", syncID); err != nil {
cmd := hack.Slice(fmt.Sprintf(syncCmdFormat, len(logIDStr),
logIDStr))
if _, err := m.conn.Write(cmd); err != nil {
return err return err
} }
m.syncBuf.Reset() m.syncBuf.Reset()
if err = ReadBulkTo(m.rb, &m.syncBuf); err != nil { if err = m.conn.ReceiveBulkTo(&m.syncBuf); err != nil {
switch err.Error() { switch err.Error() {
case ledis.ErrLogMissed.Error(): case ledis.ErrLogMissed.Error():
return m.fullSync() return m.fullSync()
@ -384,7 +359,9 @@ func (app *App) publishNewLog(l *rpl.Log) {
app.slock.Lock() app.slock.Lock()
total := (len(app.slaves) + 1) / 2 slaveNum := len(app.slaves)
total := (slaveNum + 1) / 2
if app.cfg.Replication.WaitMaxSlaveAcks > 0 { if app.cfg.Replication.WaitMaxSlaveAcks > 0 {
total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks) total = num.MinInt(total, app.cfg.Replication.WaitMaxSlaveAcks)
} }
@ -409,15 +386,21 @@ func (app *App) publishNewLog(l *rpl.Log) {
startTime := time.Now() startTime := time.Now()
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
go func(total int) { go func() {
for i := 0; i < total; i++ { n := 0
for i := 0; i < slaveNum; i++ {
id := <-app.slaveSyncAck id := <-app.slaveSyncAck
if id < logId { if id < logId {
log.Info("some slave may close with last logid %d < %d", id, logId) log.Info("some slave may close with last logid %d < %d", id, logId)
} else {
n++
if n >= total {
break
}
} }
} }
done <- struct{}{} done <- struct{}{}
}(total) }()
select { select {
case <-done: case <-done:

View File

@ -3,16 +3,10 @@ package server
import ( import (
"bufio" "bufio"
"errors" "errors"
"github.com/siddontang/go/hack"
"io"
"strconv"
) )
var ( var (
errArrayFormat = errors.New("bad array format")
errBulkFormat = errors.New("bad bulk string format")
errLineFormat = errors.New("bad response line format") errLineFormat = errors.New("bad response line format")
errStatusFormat = errors.New("bad status format")
) )
func ReadLine(rb *bufio.Reader) ([]byte, error) { func ReadLine(rb *bufio.Reader) ([]byte, error) {
@ -27,54 +21,3 @@ func ReadLine(rb *bufio.Reader) ([]byte, error) {
} }
return p[:i], nil return p[:i], nil
} }
func ReadBulkTo(rb *bufio.Reader, w io.Writer) error {
l, err := ReadLine(rb)
if err != nil {
return err
} else if len(l) == 0 {
return errBulkFormat
} else if l[0] == '$' {
var n int
//handle resp string
if n, err = strconv.Atoi(hack.String(l[1:])); err != nil {
return err
} else if n == -1 {
return nil
} else {
var nn int64
if nn, err = io.CopyN(w, rb, int64(n)); err != nil {
return err
} else if nn != int64(n) {
return io.ErrShortWrite
}
if l, err = ReadLine(rb); err != nil {
return err
} else if len(l) != 0 {
return errBulkFormat
}
}
} else if l[0] == '-' {
return errors.New(string(l[1:]))
} else {
return errBulkFormat
}
return nil
}
func ReadStatus(rb *bufio.Reader) (string, error) {
l, err := ReadLine(rb)
if err != nil {
return "", err
} else if len(l) == 0 {
return "", errStatusFormat
} else if l[0] == '+' {
return string(l[1:]), nil
} else if l[0] == '-' {
return "", errors.New(string(l[1:]))
} else {
return "", errStatusFormat
}
}