From 5cf0d4701271a6f7aa8bf6af7d18114a17f23aae Mon Sep 17 00:00:00 2001 From: siddontang Date: Fri, 2 May 2014 17:08:20 +0800 Subject: [PATCH] add base framework --- ssdb/app.go | 57 +++++++++++++ ssdb/app_test.go | 9 ++ ssdb/client.go | 217 +++++++++++++++++++++++++++++++++++++++++++++++ ssdb/command.go | 7 ++ ssdb/config.go | 33 +++++++ ssdb/const.go | 17 ++++ 6 files changed, 340 insertions(+) create mode 100644 ssdb/app.go create mode 100644 ssdb/app_test.go create mode 100644 ssdb/client.go create mode 100644 ssdb/command.go create mode 100644 ssdb/config.go create mode 100644 ssdb/const.go diff --git a/ssdb/app.go b/ssdb/app.go new file mode 100644 index 0000000..d9dd0ed --- /dev/null +++ b/ssdb/app.go @@ -0,0 +1,57 @@ +package ssdb + +import ( + "github.com/siddontang/golib/leveldb" + "net" + "strings" +) + +type App struct { + cfg *Config + + listener net.Listener + + db *leveldb.DB +} + +func NewApp(cfg *Config) (*App, error) { + app := new(App) + + app.cfg = cfg + + var err error + + if strings.Contains(cfg.Addr, "/") { + app.listener, err = net.Listen("unix", cfg.Addr) + } else { + app.listener, err = net.Listen("tcp", cfg.Addr) + } + + if err != nil { + return nil, err + } + + app.db, err = leveldb.OpenWithConfig(&cfg.DB) + if err != nil { + return nil, err + } + + return app, nil +} + +func (app *App) Close() { + app.listener.Close() + + app.db.Close() +} + +func (app *App) Run() { + for { + conn, err := app.listener.Accept() + if err != nil { + continue + } + + newClient(conn, app) + } +} diff --git a/ssdb/app_test.go b/ssdb/app_test.go new file mode 100644 index 0000000..28f2e2a --- /dev/null +++ b/ssdb/app_test.go @@ -0,0 +1,9 @@ +package ssdb + +import ( + "testing" +) + +func TestApp(t *testing.T) { + +} diff --git a/ssdb/client.go b/ssdb/client.go new file mode 100644 index 0000000..a5fa96b --- /dev/null +++ b/ssdb/client.go @@ -0,0 +1,217 @@ +package ssdb + +import ( + "bufio" + "errors" + "github.com/siddontang/golib/hack" + "github.com/siddontang/golib/log" + "io" + "net" + "runtime" + "strconv" +) + +var errReadRequest = errors.New("read request error, invalid format") + +type client struct { + app *App + c net.Conn + + rb *bufio.Reader + wb *bufio.Writer +} + +func newClient(c net.Conn, app *App) { + co := new(client) + co.app = app + co.c = c + + co.rb = bufio.NewReaderSize(c, 256) + co.wb = bufio.NewWriterSize(c, 256) + + go co.run() +} + +func (c *client) run() { + defer func() { + if e := recover(); e != nil { + buf := make([]byte, 4096) + n := runtime.Stack(buf, false) + buf = buf[0:n] + + log.Fatal("client run panic %s:%v", buf, e) + } + + c.c.Close() + }() + + for { + req, err := c.readRequest() + if err != nil { + log.Info("read request error %v", err) + return + } + + c.handleRequest(req) + } +} + +func (c *client) readLine() ([]byte, error) { + var line []byte + for { + l, more, err := c.rb.ReadLine() + if err != nil { + return nil, err + } + + if line == nil && !more { + return l, nil + } + line = append(line, l...) + if !more { + break + } + } + return line, nil +} + +//A client sends to the Redis server a RESP Array consisting of just Bulk Strings. +func (c *client) readRequest() ([][]byte, error) { + l, err := c.readLine() + if err != nil { + return nil, err + } else if len(l) == 0 || l[0] != '*' { + return nil, errReadRequest + } + + var n int + if n, err = strconv.Atoi(hack.String(l[1:])); err != nil { + return nil, err + } else if n <= 0 { + return nil, errReadRequest + } + + req := make([][]byte, 0, n) + for i := 0; i < n; i++ { + if l, err = c.readLine(); err != nil { + return nil, err + } + + if len(l) == 0 { + return nil, errReadRequest + } else if l[0] == '$' { + //handle resp string + if n, err = strconv.Atoi(hack.String(l[1:])); err != nil { + return nil, err + } else if n == -1 { + req = append(req, nil) + } else { + buf := make([]byte, n+2) + if _, err = io.ReadFull(c.rb, buf); err != nil { + return nil, err + } else if buf[len(buf)-2] != '\r' || buf[len(buf)-1] != '\n' { + return nil, errReadRequest + } else { + req = append(req, buf[0:len(buf)-2]) + } + } + + } else { + return nil, errReadRequest + } + } + + return req, nil +} + +func (c *client) handleRequest(req [][]byte) { + var err error + var r interface{} + f, ok := regCmds[hack.String(req[0])] + if !ok { + err = ErrNotFound + } else { + r, err = f(c, req[1:]) + } + + if err != nil { + c.writeError(err) + } else { + switch v := r.(type) { + case string: + c.writeStatus(v) + case []byte: + c.writeBulk(v) + case int64: + c.writeInteger(v) + case []interface{}: + c.writeArray(v) + case nil: + c.writeBulk(nil) + default: + panic("invalid type") + } + } + + c.wb.Flush() +} + +func (c *client) writeError(err error) { + c.wb.Write(hack.Slice("-ERR")) + if err != nil { + c.wb.WriteByte(' ') + c.wb.Write(hack.Slice(err.Error())) + } + c.wb.Write(Delims) +} + +func (c *client) writeStatus(status string) { + c.wb.WriteByte('+') + c.wb.Write(hack.Slice(status)) + c.wb.Write(Delims) +} + +func (c *client) writeInteger(n int64) { + c.wb.WriteByte(':') + c.wb.Write(hack.Slice(strconv.FormatInt(n, 10))) + c.wb.Write(Delims) +} + +func (c *client) writeBulk(b []byte) { + c.wb.WriteByte('$') + if b == nil { + c.wb.Write(NullBulk) + } else { + c.wb.Write(hack.Slice(strconv.Itoa(len(b)))) + c.wb.Write(Delims) + c.wb.Write(b) + } + + c.wb.Write(Delims) +} + +func (c *client) writeArray(ay []interface{}) { + c.wb.WriteByte('*') + if ay == nil { + c.wb.Write(NullArray) + c.wb.Write(Delims) + } else { + c.wb.Write(hack.Slice(strconv.Itoa(len(ay)))) + c.wb.Write(Delims) + + for i := 0; i < len(ay); i++ { + switch v := ay[i].(type) { + case []interface{}: + c.writeArray(v) + case []byte: + c.writeBulk(v) + case nil: + c.writeBulk(nil) + case int64: + c.writeInteger(v) + default: + panic("invalid array type") + } + } + } +} diff --git a/ssdb/command.go b/ssdb/command.go new file mode 100644 index 0000000..621fc3a --- /dev/null +++ b/ssdb/command.go @@ -0,0 +1,7 @@ +package ssdb + +import () + +type CommandFunc func(c *client, args [][]byte) (interface{}, error) + +var regCmds = map[string]CommandFunc{} diff --git a/ssdb/config.go b/ssdb/config.go new file mode 100644 index 0000000..307e2f2 --- /dev/null +++ b/ssdb/config.go @@ -0,0 +1,33 @@ +package ssdb + +import ( + "encoding/json" + "github.com/siddontang/golib/leveldb" + "io/ioutil" +) + +type Config struct { + Addr string `json:"addr"` + + DB leveldb.Config `json:"leveldb"` +} + +func NewConfig(data json.RawMessage) (*Config, error) { + c := new(Config) + + err := json.Unmarshal(data, c) + if err != nil { + return nil, err + } + + return c, nil +} + +func NewConfigWithFile(fileName string) (*Config, error) { + data, err := ioutil.ReadFile(fileName) + if err != nil { + return nil, err + } + + return NewConfig(data) +} diff --git a/ssdb/const.go b/ssdb/const.go new file mode 100644 index 0000000..f7e8e34 --- /dev/null +++ b/ssdb/const.go @@ -0,0 +1,17 @@ +package ssdb + +import ( + "errors" +) + +var ( + ErrEmptyCommand = errors.New("empty command") + ErrNotFound = errors.New("command not found") +) + +var ( + Delims = []byte("\r\n") + + NullBulk = []byte("-1") + NullArray = []byte("-1") +)