mirror of https://github.com/tidwall/tile38.git
Sync hook msg ttl with server time
This commit is contained in:
parent
cd08d7fa7d
commit
b99cd397d6
|
@ -219,7 +219,7 @@ func (c *Controller) queueHooks(d *commandDetailsT) error {
|
|||
for _, msg := range hmsgs {
|
||||
c.qidx++ // increment the log id
|
||||
key := hookLogPrefix + uint64ToString(c.qidx)
|
||||
_, _, err := tx.Set(key, string(msg), hookLogSetDefaults())
|
||||
_, _, err := tx.Set(key, string(msg), hookLogSetDefaults)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -17,16 +17,9 @@ import (
|
|||
"github.com/tidwall/tile38/controller/server"
|
||||
)
|
||||
|
||||
const hookLogTTL = time.Second * 30
|
||||
|
||||
func hookLogSetDefaults() *buntdb.SetOptions {
|
||||
if hookLogTTL > 0 {
|
||||
return &buntdb.SetOptions{
|
||||
Expires: true, // automatically delete after 30 seconds
|
||||
TTL: hookLogTTL,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
var hookLogSetDefaults = &buntdb.SetOptions{
|
||||
Expires: true, // automatically delete after 30 seconds
|
||||
TTL: time.Second * 30,
|
||||
}
|
||||
|
||||
type hooksByName []*Hook
|
||||
|
@ -424,8 +417,8 @@ func (h *Hook) Close() {
|
|||
// notify the manager that there may be something new in the queue.
|
||||
func (h *Hook) Signal() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.cond.Broadcast()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
// the manager is a forever loop that calls proc whenever there's a signal.
|
||||
|
@ -456,8 +449,8 @@ func (h *Hook) manager() {
|
|||
func (h *Hook) proc() (ok bool) {
|
||||
var keys, vals []string
|
||||
var ttls []time.Duration
|
||||
start := time.Now()
|
||||
err := h.db.Update(func(tx *buntdb.Tx) error {
|
||||
|
||||
// get keys and vals
|
||||
err := tx.AscendGreaterOrEqual("hooks", h.query, func(key, val string) bool {
|
||||
if strings.HasPrefix(key, hookLogPrefix) {
|
||||
|
@ -472,15 +465,13 @@ func (h *Hook) proc() (ok bool) {
|
|||
|
||||
// delete the keys
|
||||
for _, key := range keys {
|
||||
if hookLogTTL > 0 {
|
||||
ttl, err := tx.TTL(key)
|
||||
if err != nil {
|
||||
if err != buntdb.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
ttl, err := tx.TTL(key)
|
||||
if err != nil {
|
||||
if err != buntdb.ErrNotFound {
|
||||
return err
|
||||
}
|
||||
ttls = append(ttls, ttl)
|
||||
}
|
||||
ttls = append(ttls, ttl)
|
||||
_, err = tx.Delete(key)
|
||||
if err != nil {
|
||||
if err != buntdb.ErrNotFound {
|
||||
|
@ -514,22 +505,20 @@ func (h *Hook) proc() (ok bool) {
|
|||
// failed to send. try to reinsert the remaining. if this fails we lose log entries.
|
||||
keys = keys[i:]
|
||||
vals = vals[i:]
|
||||
if hookLogTTL > 0 {
|
||||
ttls = ttls[i:]
|
||||
}
|
||||
ttls = ttls[i:]
|
||||
h.db.Update(func(tx *buntdb.Tx) error {
|
||||
for i, key := range keys {
|
||||
val := vals[i]
|
||||
var opts *buntdb.SetOptions
|
||||
if hookLogTTL > 0 {
|
||||
opts = &buntdb.SetOptions{
|
||||
ttl := ttls[i] - time.Since(start)
|
||||
if ttl > 0 {
|
||||
opts := &buntdb.SetOptions{
|
||||
Expires: true,
|
||||
TTL: ttls[i],
|
||||
TTL: ttl,
|
||||
}
|
||||
_, _, err := tx.Set(key, val, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
_, _, err := tx.Set(key, val, opts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue