// ledisdb/server/app.go (274 lines, 4.6 KiB, Go)

package server
2014-05-02 13:08:20 +04:00
import (
"fmt"
2014-05-02 13:08:20 +04:00
"net"
2014-07-17 11:19:46 +04:00
"net/http"
"os"
2014-06-11 12:48:11 +04:00
"path"
"strconv"
2014-05-02 13:08:20 +04:00
"strings"
"sync"
2015-05-04 17:42:28 +03:00
2017-06-12 03:05:42 +03:00
"crypto/tls"
2015-05-04 17:42:28 +03:00
"github.com/siddontang/goredis"
"github.com/siddontang/ledisdb/config"
"github.com/siddontang/ledisdb/ledis"
2014-05-02 13:08:20 +04:00
)
type App struct {
cfg *config.Config
2014-05-02 13:08:20 +04:00
2014-07-17 11:19:46 +04:00
listener net.Listener
httpListener net.Listener
2014-05-02 13:08:20 +04:00
2014-05-20 04:41:24 +04:00
ldb *ledis.Ledis
2014-05-08 06:54:33 +04:00
closed bool
quit chan struct{}
2014-06-08 12:43:59 +04:00
2014-06-11 12:48:11 +04:00
access *accessLog
2014-06-08 12:43:59 +04:00
//for slave replication
2014-06-09 13:23:32 +04:00
m *master
2014-08-25 10:18:23 +04:00
info *info
2014-09-02 13:55:12 +04:00
script *script
// handle slaves
2014-10-21 13:35:03 +04:00
slock sync.Mutex
slaves map[string]*client
slaveSyncAck chan uint64
2014-10-11 13:44:31 +04:00
snap *snapshotStore
connWait sync.WaitGroup
rcm sync.Mutex
rcs map[*respClient]struct{}
2014-12-01 12:50:48 +03:00
migrateM sync.Mutex
2015-03-11 06:54:02 +03:00
migrateClients map[string]*goredis.Client
migrateKeyLockers map[string]*migrateKeyLocker
2014-05-02 13:08:20 +04:00
}
2014-07-17 11:19:46 +04:00
// netType picks the network name to listen on for the address s.
// An address containing a '/' is treated as a filesystem path and yields
// "unix"; anything else (including the empty string) yields "tcp".
func netType(s string) string {
	if strings.Contains(s, "/") {
		return "unix"
	}

	return "tcp"
}
2017-06-12 03:05:42 +03:00
// tlsConfig builds a server-side *tls.Config from c.
// The certificate/key pair is loaded from the files named by c.Certificate
// and c.Key; a load failure is returned unchanged and no config is produced.
func tlsConfig(c *config.TLS) (*tls.Config, error) {
	pair, loadErr := tls.LoadX509KeyPair(c.Certificate, c.Key)
	if loadErr != nil {
		return nil, loadErr
	}

	serverCfg := &tls.Config{Certificates: []tls.Certificate{pair}}
	return serverCfg, nil
}
// listen opens a listener on laddr using the given network type.
// With a nil tlsCfg the listener is a plain net listener; otherwise the
// listener is created through tls.Listen with that configuration.
func listen(netType, laddr string, tlsCfg *tls.Config) (net.Listener, error) {
	if tlsCfg == nil {
		return net.Listen(netType, laddr)
	}

	return tls.Listen(netType, laddr, tlsCfg)
}
func NewApp(cfg *config.Config) (*App, error) {
2014-06-06 07:25:13 +04:00
if len(cfg.DataDir) == 0 {
println("use default datadir %s", config.DefaultDataDir)
cfg.DataDir = config.DefaultDataDir
2014-06-06 07:25:13 +04:00
}
2014-05-02 13:08:20 +04:00
app := new(App)
app.quit = make(chan struct{})
2014-05-08 06:54:33 +04:00
app.closed = false
2014-05-02 13:08:20 +04:00
app.cfg = cfg
2014-10-21 13:35:03 +04:00
app.slaves = make(map[string]*client)
app.slaveSyncAck = make(chan uint64)
app.rcs = make(map[*respClient]struct{})
2015-03-11 06:54:02 +03:00
app.migrateClients = make(map[string]*goredis.Client)
app.newMigrateKeyLockers()
2014-12-01 12:50:48 +03:00
2014-05-02 13:08:20 +04:00
var err error
2014-08-25 10:18:23 +04:00
if app.info, err = newInfo(app); err != nil {
return nil, err
}
2017-06-12 03:05:42 +03:00
var tlsCfg *tls.Config
if cfg.TLS.Enabled {
tlsCfg, err = tlsConfig(&cfg.TLS)
if err != nil {
return nil, err
}
}
if cfg.Addr != "" {
addrNetType := netType(cfg.Addr)
2014-05-02 13:08:20 +04:00
2017-06-12 03:05:42 +03:00
if app.listener, err = listen(addrNetType, cfg.Addr, tlsCfg); err != nil {
return nil, err
}
if addrNetType == "unix" && len(cfg.AddrUnixSocketPerm) > 0 {
var perm int64
if perm, err = strconv.ParseInt(cfg.AddrUnixSocketPerm, 8, 32); err != nil {
return nil, err
}
if err = os.Chmod(cfg.Addr, os.FileMode(perm)); err != nil {
return nil, err
}
}
} else {
app.listener, err = net.Listen("tcp", "127.0.0.1:0")
if err != nil {
if app.listener, err = net.Listen("tcp6", "[::1]:0"); err != nil {
return nil, fmt.Errorf("app: failed to listen on a port: %v", err)
}
}
}
2014-07-17 11:19:46 +04:00
if len(cfg.HttpAddr) > 0 {
2017-06-12 03:05:42 +03:00
if app.httpListener, err = listen(netType(cfg.HttpAddr), cfg.HttpAddr, tlsCfg); err != nil {
2014-07-17 11:19:46 +04:00
return nil, err
}
2014-05-02 13:08:20 +04:00
}
2014-06-11 12:48:11 +04:00
if len(cfg.AccessLog) > 0 {
if path.Dir(cfg.AccessLog) == "." {
app.access, err = newAcessLog(path.Join(cfg.DataDir, cfg.AccessLog))
} else {
app.access, err = newAcessLog(cfg.AccessLog)
}
if err != nil {
return nil, err
}
}
2014-10-11 13:44:31 +04:00
if app.snap, err = newSnapshotStore(cfg); err != nil {
return nil, err
}
2014-09-25 12:03:29 +04:00
if len(app.cfg.SlaveOf) > 0 {
//slave must readonly
2014-10-10 05:49:16 +04:00
app.cfg.Readonly = true
2014-09-25 12:03:29 +04:00
}
2014-10-10 05:49:16 +04:00
if app.ldb, err = ledis.Open(cfg); err != nil {
2014-05-02 13:08:20 +04:00
return nil, err
}
2014-06-09 13:23:32 +04:00
app.m = newMaster(app)
2014-09-02 13:55:12 +04:00
app.openScript()
app.ldb.AddNewLogEventHandler(app.publishNewLog)
2014-05-02 13:08:20 +04:00
return app, nil
}
func (app *App) Close() {
2014-05-08 06:54:33 +04:00
if app.closed {
return
}
app.closed = true
close(app.quit)
2014-05-02 13:08:20 +04:00
app.listener.Close()
2014-12-01 12:50:48 +03:00
//close all migrate connections
app.migrateM.Lock()
for k, c := range app.migrateClients {
2014-12-01 12:50:48 +03:00
c.Close()
delete(app.migrateClients, k)
2014-12-01 12:50:48 +03:00
}
app.migrateM.Unlock()
2014-12-01 12:50:48 +03:00
2014-07-17 11:19:46 +04:00
if app.httpListener != nil {
app.httpListener.Close()
}
app.closeAllRespClients()
//wait all connection closed
app.connWait.Wait()
2014-09-02 13:55:12 +04:00
app.closeScript()
app.m.Lock()
2014-06-09 13:23:32 +04:00
app.m.Close()
app.m.Unlock()
2014-06-09 13:23:32 +04:00
2014-10-11 13:44:31 +04:00
app.snap.Close()
2014-06-11 12:48:11 +04:00
if app.access != nil {
app.access.Close()
}
2014-05-20 04:41:24 +04:00
app.ldb.Close()
2014-05-02 13:08:20 +04:00
}
func (app *App) Run() {
2014-06-08 12:43:59 +04:00
if len(app.cfg.SlaveOf) > 0 {
2014-10-10 05:49:16 +04:00
app.slaveof(app.cfg.SlaveOf, false, app.cfg.Readonly)
}
2014-07-17 11:19:46 +04:00
go app.httpServe()
2014-11-01 18:28:28 +03:00
for {
select {
case <-app.quit:
return
default:
conn, err := app.listener.Accept()
if err != nil {
continue
}
newClientRESP(conn, app)
2014-05-02 13:08:20 +04:00
}
}
}
2014-06-11 10:52:34 +04:00
2014-07-17 11:19:46 +04:00
func (app *App) httpServe() {
if app.httpListener == nil {
return
}
mux := http.NewServeMux()
2014-08-01 07:42:16 +04:00
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
newClientHTTP(app, w, r)
})
2014-07-17 11:19:46 +04:00
svr := http.Server{Handler: mux}
svr.Serve(app.httpListener)
}
2014-06-11 12:48:11 +04:00
func (app *App) Ledis() *ledis.Ledis {
2014-06-11 10:52:34 +04:00
return app.ldb
}
// Address returns the string form of the address the client listener is
// bound to.
func (app *App) Address() string {
	bound := app.listener.Addr()
	return bound.String()
}