isolated expires list mutex

This commit is contained in:
Josh Baker 2017-09-30 11:06:10 -07:00
parent a99613fe23
commit 1780badf1b
8 changed files with 77 additions and 63 deletions

View File

@ -31,7 +31,7 @@ func (err errAOFHook) Error() string {
var errInvalidAOF = errors.New("invalid aof file")
func (c *Controller) loadAOF() error {
fi, err := c.f.Stat()
fi, err := c.aof.Stat()
if err != nil {
return err
}
@ -54,7 +54,7 @@ func (c *Controller) loadAOF() error {
count, float64(d)/float64(time.Second), ps, byteSpeed)
}()
var msg server.Message
rd := bufio.NewReader(c.f)
rd := bufio.NewReader(c.aof)
for {
var nn int
ch, err := rd.ReadByte()
@ -194,7 +194,7 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error {
if err != nil {
return err
}
n, err := c.f.Write(data)
n, err := c.aof.Write(data)
if err != nil {
return err
}
@ -342,7 +342,7 @@ func (c *Controller) cmdAOF(msg *server.Message) (res string, err error) {
if err != nil || pos < 0 {
return "", errInvalidArgument(spos)
}
f, err := os.Open(c.f.Name())
f, err := os.Open(c.aof.Name())
if err != nil {
return "", err
}
@ -375,7 +375,7 @@ func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *server.AnyReaderWrite
}
c.mu.RLock()
f, err := os.Open(c.f.Name())
f, err := os.Open(c.aof.Name())
c.mu.RUnlock()
if err != nil {
return err

View File

@ -263,7 +263,7 @@ func (c *Controller) aofshrink() {
// anything below this point is unrecoverable. just log and exit process
// back up the live aof, just in case of fatal error
if err := c.f.Close(); err != nil {
if err := c.aof.Close(); err != nil {
log.Fatalf("shink live aof close fatal operation: %v", err)
}
if err := os.Rename(path.Join(c.dir, "appendonly.aof"), path.Join(c.dir, "appendonly.bak")); err != nil {
@ -272,12 +272,12 @@ func (c *Controller) aofshrink() {
if err := os.Rename(path.Join(c.dir, "shrink"), path.Join(c.dir, "appendonly.aof")); err != nil {
log.Fatalf("shink rename fatal operation: %v", err)
}
c.f, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600)
c.aof, err = os.OpenFile(path.Join(c.dir, "appendonly.aof"), os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
log.Fatalf("shink openfile fatal operation: %v", err)
}
var n int64
n, err = c.f.Seek(0, 2)
n, err = c.aof.Seek(0, 2)
if err != nil {
log.Fatalf("shink seek end fatal operation: %v", err)
}

View File

@ -19,7 +19,7 @@ func (c *Controller) checksum(pos, size int64) (sum string, err error) {
return "", io.EOF
}
var f *os.File
f, err = os.Open(c.f.Name())
f, err = os.Open(c.aof.Name())
if err != nil {
return
}
@ -186,10 +186,10 @@ func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err e
}
}
fullpos := pos
fname := c.f.Name()
fname := c.aof.Name()
if pos == 0 {
c.f.Close()
c.f, err = os.Create(fname)
c.aof.Close()
c.aof, err = os.Create(fname)
if err != nil {
log.Fatalf("could not recreate aof, possible data loss. %s", err.Error())
return 0, err
@ -199,7 +199,7 @@ func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err e
// we want to truncate at a command location
// search for nearest command
pos, err = getEndOfLastValuePositionInFile(c.f.Name(), fullpos)
pos, err = getEndOfLastValuePositionInFile(c.aof.Name(), fullpos)
if err != nil {
return 0, err
}
@ -211,12 +211,12 @@ func (c *Controller) followCheckSome(addr string, followc int) (pos int64, err e
}
log.Warnf("truncating aof to %d", pos)
// any errror below are fatal.
c.f.Close()
c.aof.Close()
if err := os.Truncate(fname, pos); err != nil {
log.Fatalf("could not truncate aof, possible data loss. %s", err.Error())
return 0, err
}
c.f, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600)
c.aof, err = os.OpenFile(fname, os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
log.Fatalf("could not create aof, possible data loss. %s", err.Error())
return 0, err

View File

@ -82,7 +82,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
}
var list []*clientConn
c.connsmu.RLock()
for _, cc := range c.conns2 {
for _, cc := range c.conns {
list = append(list, cc)
}
c.connsmu.RUnlock()
@ -115,7 +115,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
}
name := ""
c.connsmu.RLock()
if cc, ok := c.conns2[conn]; ok {
if cc, ok := c.conns[conn]; ok {
name = cc.name.get()
}
c.connsmu.RUnlock()
@ -141,7 +141,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
}
}
c.connsmu.RLock()
if cc, ok := c.conns2[conn]; ok {
if cc, ok := c.conns[conn]; ok {
cc.name.set(name)
}
c.connsmu.RUnlock()
@ -187,7 +187,7 @@ func (c *Controller) cmdClient(msg *server.Message, conn *server.Conn) (string,
}
var cclose *clientConn
c.connsmu.RLock()
for _, cc := range c.conns2 {
for _, cc := range c.conns {
if useID && fmt.Sprintf("%d", cc.id) == id {
cclose = cc
break

View File

@ -64,6 +64,7 @@ type Controller struct {
host string
port int
http bool
dir string
started time.Time
config *Config
epc *endpoint.EndpointManager
@ -81,15 +82,19 @@ type Controller struct {
outOfMemory abool
connsmu sync.RWMutex
conns2 map[*server.Conn]*clientConn
conns map[*server.Conn]*clientConn
exlistmu sync.RWMutex
exlist []exitem
mu sync.RWMutex
aof *os.File // active aof file
aofsz int // active size of the aof file
qdb *buntdb.DB // hook queue log
qidx uint64 // hook queue log last idx
cols *btree.BTree // data collections
expires map[string]map[string]time.Time // synced with cols
mu sync.RWMutex
f *os.File
qdb *buntdb.DB // hook queue log
qidx uint64 // hook queue log last idx
cols *btree.BTree
aofsz int
dir string
follows map[*bytes.Buffer]bool
fcond *sync.Cond
lstack []*commandDetailsT
@ -102,8 +107,6 @@ type Controller struct {
hooks map[string]*Hook // hook name
hookcols map[string]map[string]*Hook // col key
aofconnM map[net.Conn]bool
expires map[string]map[string]time.Time
exlist []exitem
}
// ListenAndServe starts a new tile38 server
@ -126,7 +129,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
aofconnM: make(map[net.Conn]bool),
expires: make(map[string]map[string]time.Time),
started: time.Now(),
conns2: make(map[*server.Conn]*clientConn),
conns: make(map[*server.Conn]*clientConn),
epc: endpoint.NewEndpointManager(),
http: http,
}
@ -161,6 +164,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
if err != nil {
return err
}
c.qdb = qdb
c.qidx = qidx
if err := c.migrateAOF(); err != nil {
@ -170,7 +174,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
if err != nil {
return err
}
c.f = f
c.aof = f
if err := c.loadAOF(); err != nil {
return err
}
@ -192,7 +196,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
}()
handler := func(conn *server.Conn, msg *server.Message, rd *server.AnyReaderWriter, w io.Writer, websocket bool) error {
c.connsmu.RLock()
if cc, ok := c.conns2[conn]; ok {
if cc, ok := c.conns[conn]; ok {
cc.last.set(time.Now())
}
c.connsmu.RUnlock()
@ -236,7 +240,7 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
cc.conn = conn
c.connsmu.Lock()
c.conns2[conn] = cc
c.conns[conn] = cc
c.connsmu.Unlock()
c.statsTotalConns.add(1)
@ -244,9 +248,10 @@ func ListenAndServeEx(host string, port int, dir string, ln *net.Listener, http
closed := func(conn *server.Conn) {
c.connsmu.Lock()
delete(c.conns2, conn)
delete(c.conns, conn)
c.connsmu.Unlock()
}
return server.ListenAndServe(host, port, protected, handler, opened, closed, ln, http)
}

View File

@ -465,7 +465,10 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai
return
}
c.cols = btree.New(16, 0)
c.clearAllExpires()
c.exlistmu.Lock()
c.exlist = nil
c.exlistmu.Unlock()
c.expires = make(map[string]map[string]time.Time)
c.hooks = make(map[string]*Hook)
c.hookcols = make(map[string]map[string]*Hook)
d.command = "flushdb"

View File

@ -32,9 +32,16 @@ func (a *exitem) Less(v btree.Item, ctx interface{}) bool {
return a.id < b.id
}
// clearAllExpires removes all items that are marked at expires.
func (c *Controller) clearAllExpires() {
c.expires = make(map[string]map[string]time.Time)
// fillExpiresList occurs once at startup
func (c *Controller) fillExpiresList() {
c.exlistmu.Lock()
c.exlist = c.exlist[:0]
for key, m := range c.expires {
for id, at := range m {
c.exlist = append(c.exlist, exitem{key, id, at})
}
}
c.exlistmu.Unlock()
}
// clearIDExpires clears a single item from the expires list.
@ -67,9 +74,9 @@ func (c *Controller) expireAt(key, id string, at time.Time) {
c.expires[key] = m
}
m[id] = at
if c.exlist != nil {
c.exlist = append(c.exlist, exitem{key, id, at})
}
c.exlistmu.Lock()
c.exlist = append(c.exlist, exitem{key, id, at})
c.exlistmu.Unlock()
}
// getExpires returns the when an item expires.
@ -94,33 +101,35 @@ func (c *Controller) hasExpired(key, id string) bool {
return time.Now().After(at)
}
func (c *Controller) fillExpiresList() {
c.exlist = make([]exitem, 0)
for key, m := range c.expires {
for id, at := range m {
c.exlist = append(c.exlist, exitem{key, id, at})
}
}
}
// backgroundExpiring watches for when items that have expired must be purged
// from the database. It's executes 10 times a seconds.
func (c *Controller) backgroundExpiring() {
rand.Seed(time.Now().UnixNano())
var purgelist []exitem
for {
c.mu.Lock()
if c.stopBackgroundExpiring.on() {
c.mu.Unlock()
return
}
now := time.Now()
var purged int
purgelist = purgelist[:0]
c.exlistmu.Lock()
for i := 0; i < 20 && len(c.exlist) > 0; i++ {
ix := rand.Int() % len(c.exlist)
if now.After(c.exlist[ix].at) {
if c.hasExpired(c.exlist[ix].key, c.exlist[ix].id) {
// purge from exlist
purgelist = append(purgelist, c.exlist[ix])
c.exlist[ix] = c.exlist[len(c.exlist)-1]
c.exlist = c.exlist[:len(c.exlist)-1]
}
}
c.exlistmu.Unlock()
if len(purgelist) > 0 {
c.mu.Lock()
for _, item := range purgelist {
if c.hasExpired(item.key, item.id) {
// purge from database
msg := &server.Message{}
msg.Values = resp.MultiBulkValue("del", c.exlist[ix].key, c.exlist[ix].id).Array()
msg.Values = resp.MultiBulkValue("del", item.key, item.id).Array()
msg.Command = "del"
_, d, err := c.cmdDel(msg)
if err != nil {
@ -133,15 +142,12 @@ func (c *Controller) backgroundExpiring() {
log.Fatal(err)
continue
}
purged++
}
c.exlist[ix] = c.exlist[len(c.exlist)-1]
c.exlist = c.exlist[:len(c.exlist)-1]
}
}
c.mu.Unlock()
if purged > 5 {
continue
c.mu.Unlock()
if len(purgelist) > 5 {
continue
}
}
time.Sleep(time.Second / 10)
}

View File

@ -146,7 +146,7 @@ func (c *Controller) writeInfoServer(w *bytes.Buffer) {
}
func (c *Controller) writeInfoClients(w *bytes.Buffer) {
c.connsmu.RLock()
fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns2)) // Number of client connections (excluding connections from slaves)
fmt.Fprintf(w, "connected_clients:%d\r\n", len(c.conns)) // Number of client connections (excluding connections from slaves)
c.connsmu.RUnlock()
}
func (c *Controller) writeInfoMemory(w *bytes.Buffer) {