forked from mirror/ledisdb
add base framework
This commit is contained in:
parent
d17eab01f8
commit
5cf0d47012
|
@ -0,0 +1,57 @@
|
|||
package ssdb
|
||||
|
||||
import (
|
||||
"github.com/siddontang/golib/leveldb"
|
||||
"net"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type App struct {
|
||||
cfg *Config
|
||||
|
||||
listener net.Listener
|
||||
|
||||
db *leveldb.DB
|
||||
}
|
||||
|
||||
func NewApp(cfg *Config) (*App, error) {
|
||||
app := new(App)
|
||||
|
||||
app.cfg = cfg
|
||||
|
||||
var err error
|
||||
|
||||
if strings.Contains(cfg.Addr, "/") {
|
||||
app.listener, err = net.Listen("unix", cfg.Addr)
|
||||
} else {
|
||||
app.listener, err = net.Listen("tcp", cfg.Addr)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
app.db, err = leveldb.OpenWithConfig(&cfg.DB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return app, nil
|
||||
}
|
||||
|
||||
func (app *App) Close() {
|
||||
app.listener.Close()
|
||||
|
||||
app.db.Close()
|
||||
}
|
||||
|
||||
func (app *App) Run() {
|
||||
for {
|
||||
conn, err := app.listener.Accept()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
newClient(conn, app)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,9 @@
|
|||
package ssdb
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestApp(t *testing.T) {
|
||||
|
||||
}
|
|
@ -0,0 +1,217 @@
|
|||
package ssdb
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"errors"
|
||||
"github.com/siddontang/golib/hack"
|
||||
"github.com/siddontang/golib/log"
|
||||
"io"
|
||||
"net"
|
||||
"runtime"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
var errReadRequest = errors.New("read request error, invalid format")
|
||||
|
||||
type client struct {
|
||||
app *App
|
||||
c net.Conn
|
||||
|
||||
rb *bufio.Reader
|
||||
wb *bufio.Writer
|
||||
}
|
||||
|
||||
func newClient(c net.Conn, app *App) {
|
||||
co := new(client)
|
||||
co.app = app
|
||||
co.c = c
|
||||
|
||||
co.rb = bufio.NewReaderSize(c, 256)
|
||||
co.wb = bufio.NewWriterSize(c, 256)
|
||||
|
||||
go co.run()
|
||||
}
|
||||
|
||||
func (c *client) run() {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
buf := make([]byte, 4096)
|
||||
n := runtime.Stack(buf, false)
|
||||
buf = buf[0:n]
|
||||
|
||||
log.Fatal("client run panic %s:%v", buf, e)
|
||||
}
|
||||
|
||||
c.c.Close()
|
||||
}()
|
||||
|
||||
for {
|
||||
req, err := c.readRequest()
|
||||
if err != nil {
|
||||
log.Info("read request error %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
c.handleRequest(req)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) readLine() ([]byte, error) {
|
||||
var line []byte
|
||||
for {
|
||||
l, more, err := c.rb.ReadLine()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if line == nil && !more {
|
||||
return l, nil
|
||||
}
|
||||
line = append(line, l...)
|
||||
if !more {
|
||||
break
|
||||
}
|
||||
}
|
||||
return line, nil
|
||||
}
|
||||
|
||||
//A client sends to the Redis server a RESP Array consisting of just Bulk Strings.
|
||||
func (c *client) readRequest() ([][]byte, error) {
|
||||
l, err := c.readLine()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if len(l) == 0 || l[0] != '*' {
|
||||
return nil, errReadRequest
|
||||
}
|
||||
|
||||
var n int
|
||||
if n, err = strconv.Atoi(hack.String(l[1:])); err != nil {
|
||||
return nil, err
|
||||
} else if n <= 0 {
|
||||
return nil, errReadRequest
|
||||
}
|
||||
|
||||
req := make([][]byte, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
if l, err = c.readLine(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(l) == 0 {
|
||||
return nil, errReadRequest
|
||||
} else if l[0] == '$' {
|
||||
//handle resp string
|
||||
if n, err = strconv.Atoi(hack.String(l[1:])); err != nil {
|
||||
return nil, err
|
||||
} else if n == -1 {
|
||||
req = append(req, nil)
|
||||
} else {
|
||||
buf := make([]byte, n+2)
|
||||
if _, err = io.ReadFull(c.rb, buf); err != nil {
|
||||
return nil, err
|
||||
} else if buf[len(buf)-2] != '\r' || buf[len(buf)-1] != '\n' {
|
||||
return nil, errReadRequest
|
||||
} else {
|
||||
req = append(req, buf[0:len(buf)-2])
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
return nil, errReadRequest
|
||||
}
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (c *client) handleRequest(req [][]byte) {
|
||||
var err error
|
||||
var r interface{}
|
||||
f, ok := regCmds[hack.String(req[0])]
|
||||
if !ok {
|
||||
err = ErrNotFound
|
||||
} else {
|
||||
r, err = f(c, req[1:])
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
c.writeError(err)
|
||||
} else {
|
||||
switch v := r.(type) {
|
||||
case string:
|
||||
c.writeStatus(v)
|
||||
case []byte:
|
||||
c.writeBulk(v)
|
||||
case int64:
|
||||
c.writeInteger(v)
|
||||
case []interface{}:
|
||||
c.writeArray(v)
|
||||
case nil:
|
||||
c.writeBulk(nil)
|
||||
default:
|
||||
panic("invalid type")
|
||||
}
|
||||
}
|
||||
|
||||
c.wb.Flush()
|
||||
}
|
||||
|
||||
func (c *client) writeError(err error) {
|
||||
c.wb.Write(hack.Slice("-ERR"))
|
||||
if err != nil {
|
||||
c.wb.WriteByte(' ')
|
||||
c.wb.Write(hack.Slice(err.Error()))
|
||||
}
|
||||
c.wb.Write(Delims)
|
||||
}
|
||||
|
||||
func (c *client) writeStatus(status string) {
|
||||
c.wb.WriteByte('+')
|
||||
c.wb.Write(hack.Slice(status))
|
||||
c.wb.Write(Delims)
|
||||
}
|
||||
|
||||
func (c *client) writeInteger(n int64) {
|
||||
c.wb.WriteByte(':')
|
||||
c.wb.Write(hack.Slice(strconv.FormatInt(n, 10)))
|
||||
c.wb.Write(Delims)
|
||||
}
|
||||
|
||||
func (c *client) writeBulk(b []byte) {
|
||||
c.wb.WriteByte('$')
|
||||
if b == nil {
|
||||
c.wb.Write(NullBulk)
|
||||
} else {
|
||||
c.wb.Write(hack.Slice(strconv.Itoa(len(b))))
|
||||
c.wb.Write(Delims)
|
||||
c.wb.Write(b)
|
||||
}
|
||||
|
||||
c.wb.Write(Delims)
|
||||
}
|
||||
|
||||
func (c *client) writeArray(ay []interface{}) {
|
||||
c.wb.WriteByte('*')
|
||||
if ay == nil {
|
||||
c.wb.Write(NullArray)
|
||||
c.wb.Write(Delims)
|
||||
} else {
|
||||
c.wb.Write(hack.Slice(strconv.Itoa(len(ay))))
|
||||
c.wb.Write(Delims)
|
||||
|
||||
for i := 0; i < len(ay); i++ {
|
||||
switch v := ay[i].(type) {
|
||||
case []interface{}:
|
||||
c.writeArray(v)
|
||||
case []byte:
|
||||
c.writeBulk(v)
|
||||
case nil:
|
||||
c.writeBulk(nil)
|
||||
case int64:
|
||||
c.writeInteger(v)
|
||||
default:
|
||||
panic("invalid array type")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
package ssdb
|
||||
|
||||
import ()
|
||||
|
||||
type CommandFunc func(c *client, args [][]byte) (interface{}, error)
|
||||
|
||||
var regCmds = map[string]CommandFunc{}
|
|
@ -0,0 +1,33 @@
|
|||
package ssdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/siddontang/golib/leveldb"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Addr string `json:"addr"`
|
||||
|
||||
DB leveldb.Config `json:"leveldb"`
|
||||
}
|
||||
|
||||
func NewConfig(data json.RawMessage) (*Config, error) {
|
||||
c := new(Config)
|
||||
|
||||
err := json.Unmarshal(data, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func NewConfigWithFile(fileName string) (*Config, error) {
|
||||
data, err := ioutil.ReadFile(fileName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewConfig(data)
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package ssdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrEmptyCommand = errors.New("empty command")
|
||||
ErrNotFound = errors.New("command not found")
|
||||
)
|
||||
|
||||
var (
|
||||
Delims = []byte("\r\n")
|
||||
|
||||
NullBulk = []byte("-1")
|
||||
NullArray = []byte("-1")
|
||||
)
|
Loading…
Reference in New Issue