diff --git a/controller/aof.go b/controller/aof.go index 8b4a5802..27c0f53e 100644 --- a/controller/aof.go +++ b/controller/aof.go @@ -13,7 +13,6 @@ import ( "sync" "time" - "github.com/google/btree" "github.com/tidwall/resp" "github.com/tidwall/tile38/client" "github.com/tidwall/tile38/controller/log" @@ -123,13 +122,26 @@ func (c *Controller) loadAOF() error { Values: values, } if _, _, err := c.command(msg, nil); err != nil { - return err + if commandErrIsFatal(err) { + return err + } } c.aofsz += n count++ } } +func commandErrIsFatal(err error) bool { + // FSET (and other writable commands) may return errors that we need + // to ignore during the loading process. These errors may occur (though unlikely) + // due to the aof rewrite operation. + switch err { + case errKeyNotFound, errIDNotFound: + return false + } + return true +} + func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { if d != nil { if !d.updated { @@ -139,6 +151,9 @@ func (c *Controller) writeAOF(value resp.Value, d *commandDetailsT) error { if hm, ok := c.hookcols[d.key]; ok { for _, hook := range hm { if err := c.DoHook(hook, d); err != nil { + if d.revert != nil { + d.revert() + } return errAOFHook{err} } } @@ -324,459 +339,3 @@ func (c *Controller) liveAOF(pos int64, conn net.Conn, rd *bufio.Reader) error { cond.L.Unlock() } } - -type treeKeyBoolT struct { - key string -} - -func (k *treeKeyBoolT) Less(item btree.Item) bool { - return k.key < item.(*treeKeyBoolT).key -} - -// aofshink shinks the aof file in the background -// When completed the only command that should exist in a shrunken aof is SET. -// We will read the commands backwards from last known position of the live aof -// and use an ondisk key value store for state. For now we use BoltDB, in the future -// we should use a store that is better performant. -// The following commands may exist in the aof. -// 'SET' -// - Has this key been marked 'ignore'? -// - Yes, then ignore -// - No, Has this id been marked 'soft-ignore' or 'hard-ignore'? -// - Yes, then ignore -// - No -// - Add command to key bucket -// - Mark id as 'soft-ignore' -// 'FSET' -// - Has this key been marked 'ignore'? -// - Yes, then ignore -// - No, Has this id been marked 'hard-ignore'? -// - Yes, then ignore -// - No -// - Add command to key bucket -// 'DEL' -// - Has this key been marked 'ignore'? -// - Yes, then ignore -// - No, Mark id as 'ignore'? -// 'DROP' -// - Has this key been marked 'ignore'? -// - Yes, then ignore -// - No, Mark key as 'ignore'? -// 'SETHOOK' -// - Direct copy from memory. -// 'DELHOOK' -// - Direct copy from memory. -// 'FLUSHDB' -// - Stop shrinking, nothing left to do - -func (c *Controller) aofshrink() { - // c.mu.Lock() - // c.f.Sync() - // if c.shrinking { - // c.mu.Unlock() - // return - // } - // c.shrinking = true - // endpos := int64(c.aofsz) - // start := time.Now() - // log.Infof("aof shrink started at pos %d", endpos) - - // var hooks []string - // for _, hook := range c.hooks { - // var orgs []string - // for _, endpoint := range hook.Endpoints { - // orgs = append(orgs, endpoint.Original) - // } - - // hooks = append(hooks, "SETHOOK "+hook.Name+" "+strings.Join(orgs, ",")+" "+hook.Command) - // } - - // c.mu.Unlock() - // var err error - // defer func() { - // c.mu.Lock() - // c.shrinking = false - // c.mu.Unlock() - // os.RemoveAll(c.dir + "/shrink.db") - // os.RemoveAll(c.dir + "/shrink") - // if err != nil { - // log.Error("aof shrink failed: " + err.Error()) - // } else { - // log.Info("aof shrink completed: " + time.Now().Sub(start).String()) - // } - // }() - // var db *bolt.DB - // db, err = bolt.Open(c.dir+"/shrink.db", 0600, nil) - // if err != nil { - // return - // } - // defer db.Close() - // var nf *os.File - // nf, err = os.Create(c.dir + "/shrink") - // if err != nil { - // return - // } - // defer nf.Close() - // defer func() { - // c.mu.Lock() - // defer c.mu.Unlock() - // if err == nil { - // c.f.Sync() - // _, err = nf.Seek(0, 2) - // if err == nil { - // var f *os.File - // f, err = os.Open(c.dir + "/aof") - // if err != nil { - // return - // } - // defer f.Close() - // _, err = f.Seek(endpos, 0) - // if err == nil { - // _, err = io.Copy(nf, f) - // if err == nil { - // f.Close() - // nf.Close() - // // At this stage we need to kill all aof followers. To do so we will - // // write a KILLAOF command to the stream. KILLAOF isn't really a command. - // // This will cause the followers will close their connection and then - // // automatically reconnect. The reconnection will force a sync of the aof. - // err = c.writeAOF(resp.MultiBulkValue("KILLAOF"), nil) - // if err == nil { - // c.f.Close() - // err = os.Rename(c.dir+"/shrink", c.dir+"/aof") - // if err != nil { - // log.Fatal("shink rename fatal operation") - // } - // c.f, err = os.OpenFile(c.dir+"/aof", os.O_CREATE|os.O_RDWR, 0600) - // if err != nil { - // log.Fatal("shink openfile fatal operation") - // } - // var n int64 - // n, err = c.f.Seek(0, 2) - // if err != nil { - // log.Fatal("shink seek end fatal operation") - // } - // c.aofsz = int(n) - // } - // } - // } - // } - // } - // }() - // var f *os.File - // f, err = os.Open(c.dir + "/aof") - // if err != nil { - // return - // } - // defer f.Close() - - // var buf []byte - // var pos int64 - // pos, err = f.Seek(endpos, 0) - // if err != nil { - // return - // } - // var readPreviousCommand func() ([]byte, error) - // readPreviousCommand = func() ([]byte, error) { - // if len(buf) >= 5 { - // if buf[len(buf)-1] != 0 { - // return nil, errCorruptedAOF - // } - // sz2 := int(binary.LittleEndian.Uint32(buf[len(buf)-5:])) - // if len(buf) >= sz2+9 { - // sz1 := int(binary.LittleEndian.Uint32(buf[len(buf)-(sz2+9):])) - // if sz1 != sz2 { - // return nil, errCorruptedAOF - // } - // command := buf[len(buf)-(sz2+5) : len(buf)-5] - // buf = buf[:len(buf)-(sz2+9)] - // return command, nil - // } - // } - // if pos == 0 { - // if len(buf) > 0 { - // return nil, io.ErrUnexpectedEOF - // } else { - // return nil, io.EOF - // } - // } - // sz := int64(backwardsBufferSize) - // offset := pos - sz - // if offset < 0 { - // sz = pos - // offset = 0 - // } - // pos, err = f.Seek(offset, 0) - // if err != nil { - // return nil, err - // } - // nbuf := make([]byte, int(sz)) - // _, err = io.ReadFull(f, nbuf) - // if err != nil { - // return nil, err - // } - // if len(buf) > 0 { - // nbuf = append(nbuf, buf...) - // } - // buf = nbuf - // return readPreviousCommand() - // } - // var tx *bolt.Tx - // tx, err = db.Begin(true) - // if err != nil { - // return - // } - // defer func() { - // tx.Rollback() - // }() - // var keyIgnoreM = map[string]bool{} - // var keyBucketM = btree.New(16) - // var cmd, key, id, field string - // var line string - // var command []byte - // var val []byte - // var b *bolt.Bucket - // reading: - // for i := 0; ; i++ { - // if i%500 == 0 { - // if err = tx.Commit(); err != nil { - // return - // } - // tx, err = db.Begin(true) - // if err != nil { - // return - // } - // } - // command, err = readPreviousCommand() - // if err != nil { - // if err == io.EOF { - // err = nil - // break - // } - // return - // } - // // quick path - // if len(command) == 0 { - // continue // ignore blank commands - // } - // line, cmd = token(string(command)) - // cmd = strings.ToLower(cmd) - // switch cmd { - // case "flushdb": - // break reading // all done - // case "drop": - // if line, key = token(line); key == "" { - // err = errors.New("DROP is missing key") - // return - // } - // if !keyIgnoreM[key] { - // keyIgnoreM[key] = true - // } - // case "del": - // if line, key = token(line); key == "" { - // err = errors.New("DEL is missing key") - // return - // } - // if keyIgnoreM[key] { - // continue // ignore - // } - // if line, id = token(line); id == "" { - // err = errors.New("DEL is missing id") - // return - // } - // if keyBucketM.Get(&treeKeyBoolT{key}) == nil { - // if _, err = tx.CreateBucket([]byte(key + ".ids")); err != nil { - // return - // } - // if _, err = tx.CreateBucket([]byte(key + ".ignore_ids")); err != nil { - // return - // } - // keyBucketM.ReplaceOrInsert(&treeKeyBoolT{key}) - // } - // b = tx.Bucket([]byte(key + ".ignore_ids")) - // err = b.Put([]byte(id), []byte("2")) // 2 for hard ignore - // if err != nil { - // return - // } - - // case "set": - // if line, key = token(line); key == "" { - // err = errors.New("SET is missing key") - // return - // } - // if keyIgnoreM[key] { - // continue // ignore - // } - // if line, id = token(line); id == "" { - // err = errors.New("SET is missing id") - // return - // } - // if keyBucketM.Get(&treeKeyBoolT{key}) == nil { - // if _, err = tx.CreateBucket([]byte(key + ".ids")); err != nil { - // return - // } - // if _, err = tx.CreateBucket([]byte(key + ".ignore_ids")); err != nil { - // return - // } - // keyBucketM.ReplaceOrInsert(&treeKeyBoolT{key}) - // } - // b = tx.Bucket([]byte(key + ".ignore_ids")) - // val = b.Get([]byte(id)) - // if val == nil { - // if err = b.Put([]byte(id), []byte("1")); err != nil { - // return - // } - // b = tx.Bucket([]byte(key + ".ids")) - // if err = b.Put([]byte(id), command); err != nil { - // return - // } - // } else { - // switch string(val) { - // default: - // err = errors.New("invalid ignore") - // case "1", "2": - // continue // ignore - // } - // } - // case "fset": - // if line, key = token(line); key == "" { - // err = errors.New("FSET is missing key") - // return - // } - // if keyIgnoreM[key] { - // continue // ignore - // } - // if line, id = token(line); id == "" { - // err = errors.New("FSET is missing id") - // return - // } - // if line, field = token(line); field == "" { - // err = errors.New("FSET is missing field") - // return - // } - // if keyBucketM.Get(&treeKeyBoolT{key}) == nil { - // if _, err = tx.CreateBucket([]byte(key + ".ids")); err != nil { - // return - // } - // if _, err = tx.CreateBucket([]byte(key + ".ignore_ids")); err != nil { - // return - // } - // keyBucketM.ReplaceOrInsert(&treeKeyBoolT{key}) - // } - // b = tx.Bucket([]byte(key + ".ignore_ids")) - // val = b.Get([]byte(id)) - // if val == nil { - // b = tx.Bucket([]byte(key + ":" + id + ":0")) - // if b == nil { - // if b, err = tx.CreateBucket([]byte(key + ":" + id + ":0")); err != nil { - // return - // } - // } - // if b.Get([]byte(field)) == nil { - // if err = b.Put([]byte(field), command); err != nil { - // return - // } - // } - // } else { - // switch string(val) { - // default: - // err = errors.New("invalid ignore") - // case "1": - // b = tx.Bucket([]byte(key + ":" + id + ":1")) - // if b == nil { - // if b, err = tx.CreateBucket([]byte(key + ":" + id + ":1")); err != nil { - // return - // } - // } - // if b.Get([]byte(field)) == nil { - // if err = b.Put([]byte(field), command); err != nil { - // return - // } - // } - // case "2": - // continue // ignore - // } - // } - // } - // } - // if err = tx.Commit(); err != nil { - // return - // } - // tx, err = db.Begin(false) - // if err != nil { - // return - // } - // keyBucketM.Ascend(func(item btree.Item) bool { - // key := item.(*treeKeyBoolT).key - // b := tx.Bucket([]byte(key + ".ids")) - // if b != nil { - // err = b.ForEach(func(id, command []byte) error { - // // parse the SET command - // _, fields, values, etype, eargs, err := c.parseSetArgs(string(command[4:])) - // if err != nil { - // return err - // } - // // store the fields in a map - // var fieldm = map[string]float64{} - // for i, field := range fields { - // fieldm[field] = values[i] - // } - // // append old FSET values. these are FSETs that existed prior to the last SET. - // f1 := tx.Bucket([]byte(key + ":" + string(id) + ":1")) - // if f1 != nil { - // err = f1.ForEach(func(field, command []byte) error { - // d, err := c.parseFSetArgs(string(command[5:])) - // if err != nil { - // return err - // } - // if _, ok := fieldm[d.field]; !ok { - // fieldm[d.field] = d.value - // } - // return nil - // }) - // if err != nil { - // return err - // } - // } - // // append new FSET values. these are FSETs that were added after the last SET. - // f0 := tx.Bucket([]byte(key + ":" + string(id) + ":0")) - // if f0 != nil { - // f0.ForEach(func(field, command []byte) error { - // d, err := c.parseFSetArgs(string(command[5:])) - // if err != nil { - // return err - // } - // fieldm[d.field] = d.value - // return nil - // }) - // } - // // rebuild the SET command - // ncommand := "set " + key + " " + string(id) - // for field, value := range fieldm { - // if value != 0 { - // ncommand += " field " + field + " " + strconv.FormatFloat(value, 'f', -1, 64) - // } - // } - // ncommand += " " + strings.ToUpper(etype) + " " + eargs - // _, err = writeCommand(nf, []byte(ncommand)) - // if err != nil { - // return err - // } - // return nil - // }) - // if err != nil { - // return false - // } - // } - // return true - // }) - // if err == nil { - // // add all of the hooks - // for _, line := range hooks { - // _, err = writeCommand(nf, []byte(line)) - // if err != nil { - // return - // } - // } - // } -} diff --git a/controller/collection/collection.go b/controller/collection/collection.go index 4abf5d96..46bdb563 100644 --- a/controller/collection/collection.go +++ b/controller/collection/collection.go @@ -1,8 +1,6 @@ package collection import ( - "math" - "github.com/google/btree" "github.com/tidwall/tile38/geojson" "github.com/tidwall/tile38/index" @@ -94,8 +92,17 @@ func (c *Collection) ReplaceOrInsert(id string, obj geojson.Object, fields []str nitem.Fields = oldFields c.weight += len(nitem.Fields) * 8 } - for i, field := range fields { - c.setField(nitem, field, values[i]) + if fields == nil && len(values) > 0 { + // directly set the field values, update weight + c.weight -= len(nitem.Fields) * 8 + nitem.Fields = values + c.weight += len(nitem.Fields) * 8 + + } else { + // map field name to value + for i, field := range fields { + c.setField(nitem, field, values[i]) + } } return oldObject, oldFields, nitem.Fields } @@ -170,13 +177,10 @@ func (c *Collection) setField(item *itemT, field string, value float64) (updated } c.weight -= len(item.Fields) * 8 for idx >= len(item.Fields) { - item.Fields = append(item.Fields, math.NaN()) + item.Fields = append(item.Fields, 0) } c.weight += len(item.Fields) * 8 ovalue := item.Fields[idx] - if math.IsNaN(ovalue) { - ovalue = 0 - } item.Fields[idx] = value return ovalue != value } diff --git a/controller/controller.go b/controller/controller.go index 105225df..fd71c559 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -37,6 +37,7 @@ type commandDetailsT struct { oldObj geojson.Object oldFields []float64 updated bool + revert func() } func (col *collectionT) Less(item btree.Item) bool { @@ -49,8 +50,7 @@ type Controller struct { host string port int f *os.File - cols *btree.BTree // use both tree and map. provide ordering. - colsm map[string]*collection.Collection // use both tree and map. provide performance. + cols *btree.BTree aofsz int dir string config Config @@ -74,7 +74,6 @@ func ListenAndServe(host string, port int, dir string) error { port: port, dir: dir, cols: btree.New(16), - colsm: make(map[string]*collection.Collection), follows: make(map[*bytes.Buffer]bool), fcond: sync.NewCond(&sync.Mutex{}), lives: make(map[*liveBuffer]bool), @@ -136,19 +135,24 @@ func ListenAndServe(host string, port int, dir string) error { func (c *Controller) setCol(key string, col *collection.Collection) { c.cols.ReplaceOrInsert(&collectionT{Key: key, Collection: col}) - c.colsm[key] = col } func (c *Controller) getCol(key string) *collection.Collection { - col, ok := c.colsm[key] - if !ok { + item := c.cols.Get(&collectionT{Key: key}) + if item == nil { return nil } - return col + return item.(*collectionT).Collection +} + +func (c *Controller) scanGreaterOrEqual(key string, iterator func(key string, col *collection.Collection) bool) { + c.cols.AscendGreaterOrEqual(&collectionT{Key: key}, func(item btree.Item) bool { + col := item.(*collectionT) + return iterator(col.Key, col.Collection) + }) } func (c *Controller) deleteCol(key string) *collection.Collection { - delete(c.colsm, key) i := c.cols.Delete(&collectionT{Key: key}) if i == nil { return nil @@ -353,13 +357,12 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co res, d, err = c.cmdDelHook(msg) case "hooks": res, err = c.cmdHooks(msg) - // case "massinsert": - // if !core.DevMode { - // err = fmt.Errorf("unknown command '%s'", cmd) - // return - // } - // err = c.cmdMassInsert(nline) - // resp = okResp() + case "massinsert": + if !core.DevMode { + err = fmt.Errorf("unknown command '%s'", msg.Values[0]) + return + } + res, err = c.cmdMassInsert(msg) // case "follow": // err = c.cmdFollow(nline) // resp = okResp() @@ -388,13 +391,11 @@ func (c *Controller) command(msg *server.Message, w io.Writer) (res string, d co // case "aofmd5": // resp, err = c.cmdAOFMD5(nline) case "gc": - start := time.Now() go runtime.GC() - res = server.OKMessage(msg, start) - // case "aofshrink": - // go c.aofshrink() - // resp = okResp() - + res = server.OKMessage(msg, time.Now()) + case "aofshrink": + go c.aofshrink() + res = server.OKMessage(msg, time.Now()) case "config get": res, err = c.cmdConfigGet(msg) case "config set": diff --git a/controller/crud.go b/controller/crud.go index aabdc7c7..e9693193 100644 --- a/controller/crud.go +++ b/controller/crud.go @@ -2,7 +2,6 @@ package controller import ( "bytes" - "math" "sort" "strconv" "strings" @@ -40,7 +39,7 @@ func orderFields(fmap map[string]int, fields []float64) []fvt { if idx < len(fields) { fv.field = field fv.value = fields[idx] - if !math.IsNaN(fv.value) && fv.value != 0 { + if fv.value != 0 { fvs = append(fvs, fv) } } @@ -212,6 +211,14 @@ func (c *Controller) cmdDel(msg *server.Message) (res string, d commandDetailsT, if ok { if col.Count() == 0 { c.deleteCol(d.key) + d.revert = func() { + c.setCol(d.key, col) + col.ReplaceOrInsert(d.id, d.obj, nil, d.fields) + } + } else { + d.revert = func() { + col.ReplaceOrInsert(d.id, d.obj, nil, d.fields) + } } found = true } @@ -246,6 +253,9 @@ func (c *Controller) cmdDrop(msg *server.Message) (res string, d commandDetailsT col := c.getCol(d.key) if col != nil { c.deleteCol(d.key) + d.revert = func() { + c.setCol(d.key, col) + } d.updated = true } else { d.key = "" // ignore the details @@ -273,7 +283,6 @@ func (c *Controller) cmdFlushDB(msg *server.Message) (res string, d commandDetai return } c.cols = btree.New(16) - c.colsm = make(map[string]*collection.Collection) c.hooks = make(map[string]*Hook) c.hookcols = make(map[string]map[string]*Hook) d.command = "flushdb" @@ -486,12 +495,23 @@ func (c *Controller) cmdSet(msg *server.Message) (res string, d commandDetailsT, if err != nil { return } + addedcol := false col := c.getCol(d.key) if col == nil { col = collection.New() c.setCol(d.key, col) + addedcol = true } d.oldObj, d.oldFields, d.fields = col.ReplaceOrInsert(d.id, d.obj, fields, values) + d.revert = func() { + if addedcol { + c.deleteCol(d.key) + } else if d.oldObj != nil { + col.ReplaceOrInsert(d.id, d.oldObj, nil, d.oldFields) + } else { + col.Remove(d.id) + } + } d.command = "set" d.updated = true // perhaps we should do a diff on the previous object? switch msg.OutputType { diff --git a/controller/dev.go b/controller/dev.go index b0c088c5..8cd5be04 100644 --- a/controller/dev.go +++ b/controller/dev.go @@ -1,72 +1,97 @@ package controller -func (c *Controller) cmdMassInsert(line string) error { - // // massinsert simply forwards a bunch of cmdSets - // var snumCols, snumPoints string - // var cols, objs int - // if line, snumCols = token(line); snumCols == "" { - // return errors.New("invalid number of arguments") - // } - // if line, snumPoints = token(line); snumPoints == "" { - // return errors.New("invalid number of arguments") - // } - // if line != "" { - // return errors.New("invalid number of arguments") - // } - // n, err := strconv.ParseUint(snumCols, 10, 64) - // if err != nil { - // return errors.New("invalid argument '" + snumCols + "'") - // } - // cols = int(n) - // n, err = strconv.ParseUint(snumPoints, 10, 64) - // if err != nil { - // return errors.New("invalid argument '" + snumPoints + "'") - // } - // docmd := func(cmd string) error { - // c.mu.Lock() - // defer c.mu.Unlock() - // _, d, err := c.command(cmd, nil) - // if err != nil { - // return err - // } - // if err := c.writeAOF(cmd, &d); err != nil { - // return err - // } - // return nil - // } - // rand.Seed(time.Now().UnixNano()) - // objs = int(n) - // var wg sync.WaitGroup - // var k uint64 - // wg.Add(cols) - // for i := 0; i < cols; i++ { - // key := "mi:" + strconv.FormatInt(int64(i), 10) - // go func(key string) { - // defer func() { - // wg.Done() - // }() - // for j := 0; j < objs; j++ { - // id := strconv.FormatInt(int64(j), 10) - // lat, lon := rand.Float64()*180-90, rand.Float64()*360-180 - // var line string - // if true { - // fields := fmt.Sprintf("FIELD field %f", rand.Float64()*10) - // line = fmt.Sprintf(`set %s %s %s POINT %f %f`, key, id, fields, lat, lon) - // } else { - // line = fmt.Sprintf(`set %s %s POINT %f %f`, key, id, lat, lon) - // } - // if err := docmd(line); err != nil { - // log.Fatal(err) - // return - // } - // atomic.AddUint64(&k, 1) - // if j%10000 == 10000-1 { - // log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs) - // } - // } - // }(key) - // } - // wg.Wait() - // log.Infof("massinsert: done %d objects", atomic.LoadUint64(&k)) - return nil +import ( + "errors" + "math/rand" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/tidwall/resp" + "github.com/tidwall/tile38/controller/log" + "github.com/tidwall/tile38/controller/server" +) + +const useRandField = true + +func (c *Controller) cmdMassInsert(msg *server.Message) (res string, err error) { + start := time.Now() + vs := msg.Values[1:] + + // massinsert simply forwards a bunch of cmdSets + var snumCols, snumPoints string + var cols, objs int + var ok bool + if vs, snumCols, ok = tokenval(vs); !ok || snumCols == "" { + return "", errInvalidNumberOfArguments + } + if vs, snumPoints, ok = tokenval(vs); !ok || snumPoints == "" { + return "", errInvalidNumberOfArguments + } + if len(vs) != 0 { + return "", errors.New("invalid number of arguments") + } + n, err := strconv.ParseUint(snumCols, 10, 64) + if err != nil { + return "", errInvalidArgument(snumCols) + } + cols = int(n) + n, err = strconv.ParseUint(snumPoints, 10, 64) + if err != nil { + return "", errInvalidArgument(snumPoints) + } + docmd := func(values []resp.Value) error { + c.mu.Lock() + defer c.mu.Unlock() + nmsg := &server.Message{} + *nmsg = *msg + nmsg.Values = values + nmsg.Command = strings.ToLower(values[0].String()) + + _, d, err := c.command(nmsg, nil) + if err != nil { + return err + } + if err := c.writeAOF(resp.ArrayValue(nmsg.Values), &d); err != nil { + return err + } + return nil + } + rand.Seed(time.Now().UnixNano()) + objs = int(n) + var wg sync.WaitGroup + var k uint64 + wg.Add(cols) + for i := 0; i < cols; i++ { + key := "mi:" + strconv.FormatInt(int64(i), 10) + go func(key string) { + defer func() { + wg.Done() + }() + + for j := 0; j < objs; j++ { + id := strconv.FormatInt(int64(j), 10) + lat, lon := rand.Float64()*180-90, rand.Float64()*360-180 + values := make([]resp.Value, 0, 16) + values = append(values, resp.StringValue("set"), resp.StringValue(key), resp.StringValue(id)) + if useRandField { + values = append(values, resp.StringValue("FIELD"), resp.StringValue("field"), resp.FloatValue(rand.Float64()*10)) + } + values = append(values, resp.StringValue("POINT"), resp.FloatValue(lat), resp.FloatValue(lon)) + if err := docmd(values); err != nil { + log.Fatal(err) + return + } + atomic.AddUint64(&k, 1) + if j%10000 == 10000-1 { + log.Infof("massinsert: %s %d/%d", key, atomic.LoadUint64(&k), cols*objs) + } + } + }(key) + } + wg.Wait() + log.Infof("massinsert: done %d objects", atomic.LoadUint64(&k)) + return server.OKMessage(msg, start), nil } diff --git a/controller/fence.go b/controller/fence.go index 080836bd..8a361c0f 100644 --- a/controller/fence.go +++ b/controller/fence.go @@ -4,6 +4,7 @@ import ( "strings" "time" + "github.com/tidwall/tile38/controller/server" "github.com/tidwall/tile38/geojson" ) @@ -26,6 +27,7 @@ func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenc match = false detect := "outside" if fence != nil { + match1 := fenceMatchObject(fence, details.oldObj) match2 := fenceMatchObject(fence, details.obj) if match1 && match2 { @@ -83,6 +85,7 @@ func (c *Controller) FenceMatch(hookName string, sw *scanWriter, fence *liveFenc } sw.fmap = fmap sw.fullFields = true + sw.msg.OutputType = server.JSON sw.writeObject(details.id, details.obj, details.fields) if sw.wr.Len() == 0 { return nil diff --git a/controller/hooks.go b/controller/hooks.go index b7d259ee..7214fe70 100644 --- a/controller/hooks.go +++ b/controller/hooks.go @@ -47,23 +47,33 @@ type Hook struct { } func (c *Controller) DoHook(hook *Hook, details *commandDetailsT) error { + var lerrs []error msgs := c.FenceMatch(hook.Name, hook.ScanWriter, hook.Fence, details, false) for _, msg := range msgs { for _, endpoint := range hook.Endpoints { switch endpoint.Protocol { case HTTP: if err := c.sendHTTPMessage(endpoint, msg); err != nil { - return err + lerrs = append(lerrs, err) + continue } return nil //sent case Disque: if err := c.sendDisqueMessage(endpoint, msg); err != nil { - return err + lerrs = append(lerrs, err) + continue } return nil // sent } } } + var errmsgs []string + for _, err := range lerrs { + errmsgs = append(errmsgs, err.Error()) + } + if len(errmsgs) > 0 { + return errors.New("not sent: " + strings.Join(errmsgs, ",")) + } return errors.New("not sent") } diff --git a/controller/scanner.go b/controller/scanner.go index 7bf79b6a..abd8e6c7 100644 --- a/controller/scanner.go +++ b/controller/scanner.go @@ -3,7 +3,6 @@ package controller import ( "bytes" "errors" - "math" "strconv" "github.com/tidwall/resp" @@ -178,9 +177,6 @@ func (sw *scanWriter) fieldMatch(fields []float64, o geojson.Object) ([]float64, if ok { if len(fields) > idx { value = fields[idx] - if math.IsNaN(value) { - value = 0 - } } } if !where.match(value) { @@ -192,9 +188,6 @@ func (sw *scanWriter) fieldMatch(fields []float64, o geojson.Object) ([]float64, var value float64 if len(fields) > idx { value = fields[idx] - if math.IsNaN(value) { - value = 0 - } } sw.fvals[idx] = value } @@ -261,7 +254,7 @@ func (sw *scanWriter) writeObject(id string, o geojson.Object, fields []float64) var i int for field, idx := range sw.fmap { if len(fields) > idx { - if !math.IsNaN(fields[idx]) { + if fields[idx] != 0 { if i > 0 { jsfields += `,` } diff --git a/controller/stats.go b/controller/stats.go index 89714e13..d0e80ca9 100644 --- a/controller/stats.go +++ b/controller/stats.go @@ -79,6 +79,7 @@ func (c *Controller) cmdServer(msg *server.Message) (res string, err error) { } m["aof_size"] = c.aofsz m["num_collections"] = c.cols.Len() + m["num_hooks"] = len(c.hooks) sz := 0 c.cols.Ascend(func(item btree.Item) bool { col := item.(*collectionT).Collection