From 9c471e3f9c9bfe907379ee2ad502b2f75fd0c5eb Mon Sep 17 00:00:00 2001 From: tidwall Date: Thu, 3 Nov 2022 10:07:17 -0700 Subject: [PATCH] Replace out old style atomics --- internal/endpoint/endpoint.go | 6 ++-- internal/server/aof.go | 3 +- internal/server/dev.go | 9 +++--- internal/server/server.go | 54 +++++++++++++++++++++-------------- tests/fence_test.go | 6 ++-- tests/mock_test.go | 14 ++++----- 6 files changed, 49 insertions(+), 43 deletions(-) diff --git a/internal/endpoint/endpoint.go b/internal/endpoint/endpoint.go index f13feeb7..0d61c03c 100644 --- a/internal/endpoint/endpoint.go +++ b/internal/endpoint/endpoint.go @@ -147,7 +147,7 @@ type Manager struct { mu sync.RWMutex conns map[string]Conn publisher LocalPublisher - shutdown int32 // atomic bool + shutdown atomic.Bool // atomic bool wg sync.WaitGroup // run wait group } @@ -164,7 +164,7 @@ func NewManager(publisher LocalPublisher) *Manager { func (epc *Manager) Shutdown() { defer epc.wg.Wait() - atomic.StoreInt32(&epc.shutdown, 1) + epc.shutdown.Store(true) // expire the connections epc.mu.Lock() defer epc.mu.Unlock() @@ -177,7 +177,7 @@ func (epc *Manager) Shutdown() { func (epc *Manager) run() { defer epc.wg.Done() for { - if atomic.LoadInt32(&epc.shutdown) != 0 { + if epc.shutdown.Load() { return } time.Sleep(time.Second) diff --git a/internal/server/aof.go b/internal/server/aof.go index 250080b1..a43eb491 100644 --- a/internal/server/aof.go +++ b/internal/server/aof.go @@ -11,7 +11,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/tidwall/buntdb" @@ -157,7 +156,7 @@ func (s *Server) writeAOF(args []string, d *commandDetails) error { } if s.aof != nil { - atomic.StoreInt32(&s.aofdirty, 1) // prewrite optimization flag + s.aofdirty.Store(true) // prewrite optimization flag n := len(s.aofbuf) s.aofbuf = redcon.AppendArray(s.aofbuf, len(args)) for _, arg := range args { diff --git a/internal/server/dev.go b/internal/server/dev.go index 1f6e47f9..15410fa1 100644 --- a/internal/server/dev.go +++ b/internal/server/dev.go @@ -90,7 +90,7 @@ func (s *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { } rand.Seed(time.Now().UnixNano()) objs = int(n) - var k uint64 + var k atomic.Uint64 for i := 0; i < cols; i++ { key := "mi:" + strconv.FormatInt(int64(i), 10) func(key string) { @@ -137,15 +137,14 @@ func (s *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) { log.Fatal(err) return } - atomic.AddUint64(&k, 1) + k.Add(1) if j%1000 == 1000-1 { - log.Debugf("massinsert: %s %d/%d", - key, atomic.LoadUint64(&k), cols*objs) + log.Debugf("massinsert: %s %d/%d", key, k.Load(), cols*objs) } } }(key) } - log.Infof("massinsert: done %d objects", atomic.LoadUint64(&k)) + log.Infof("massinsert: done %d objects", k.Load()) return OKMessage(msg, start), nil } diff --git a/internal/server/server.go b/internal/server/server.go index dc9b2952..b5c5d3f3 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -70,6 +70,9 @@ type commandDetails struct { // Server is a tile38 controller type Server struct { + // user defined options + opts Options + // static values unix string host string @@ -103,25 +106,22 @@ type Server struct { connsmu sync.RWMutex conns map[int]*Client - mu sync.RWMutex - aof *os.File // active aof file - aofdirty int32 // mark the aofbuf as having data - aofbuf []byte // prewrite buffer - aofsz int // active size of the aof file - qdb *buntdb.DB // hook queue log - qidx uint64 // hook queue log last idx + mu sync.RWMutex + + // aof + aof *os.File // active aof file + aofdirty atomic.Bool // mark the aofbuf as having data + aofbuf []byte // prewrite buffer + aofsz int // active size of the aof file + shrinking bool // aof shrinking flag + shrinklog [][]string // aof shrinking log + + // database + qdb *buntdb.DB // hook queue log + qidx uint64 // hook queue log last idx cols *btree.Map[string, *collection.Collection] // data collections - follows map[*bytes.Buffer]bool - fcond *sync.Cond - lstack []*commandDetails - lives map[*liveBuffer]bool - lcond *sync.Cond - fcup bool // follow caught up - fcuponce bool // follow caught up once - shrinking bool // aof shrinking flag - shrinklog [][]string // aof shrinking log hooks *btree.BTree // hook name -- [string]*Hook hookCross *rtree.RTree // hook spatial tree for "cross" geofences hookTree *rtree.RTree // hook spatial tree for all @@ -130,16 +130,26 @@ type Server struct { groupObjects *btree.BTree // objects that are connected to hooks hookExpires *btree.BTree // queue of all hooks marked for expiration - aofconnM map[net.Conn]io.Closer + // followers (external aof readers) + follows map[*bytes.Buffer]bool + fcond *sync.Cond + lstack []*commandDetails + lives map[*liveBuffer]bool + lcond *sync.Cond + fcup bool // follow caught up + fcuponce bool // follow caught up once + aofconnM map[net.Conn]io.Closer + + // lua scripts luascripts *lScriptMap luapool *lStatePool + // pubsub system (SUBSCRIBE, PUBLISH, and SETCHAN) pubsub *pubsub + // monitor connections (using the MONITOR command) monconnsMu sync.RWMutex - monconns map[net.Conn]bool // monitor connections - - opts Options + monconns map[net.Conn]bool } // Options for Serve() @@ -629,14 +639,14 @@ func (s *Server) netServe() error { // write to client if len(client.out) > 0 { - if atomic.LoadInt32(&s.aofdirty) != 0 { + if s.aofdirty.Load() { func() { // prewrite s.mu.Lock() defer s.mu.Unlock() s.flushAOF(false) }() - atomic.StoreInt32(&s.aofdirty, 0) + s.aofdirty.Store(false) } conn.Write(client.out) client.out = nil diff --git a/tests/fence_test.go b/tests/fence_test.go index 1a2b7dec..0ffc6826 100644 --- a/tests/fence_test.go +++ b/tests/fence_test.go @@ -473,15 +473,15 @@ func fence_eecio_test(mc *mockServer) error { return nil }() }() - var timeok int32 + var timeok atomic.Bool go func() { time.Sleep(time.Second * 30) - if atomic.LoadInt32(&timeok) == 0 { + if !timeok.Load() { panic("timeout") } }() wg.Wait() - atomic.StoreInt32(&timeok, 1) + timeok.Store(true) if err3 != nil { return err3 } diff --git a/tests/mock_test.go b/tests/mock_test.go index 3957dc1b..e93e9901 100644 --- a/tests/mock_test.go +++ b/tests/mock_test.go @@ -108,8 +108,7 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { if opts.Metrics { s.mport = getNextPort() } - var ferrt int32 // atomic flag for when ferr has been set - var ferr error // ferr for when the server fails to start + var ferr atomic.Pointer[error] // ferr for when the server fails to start go func() { sopts := server.Options{ Host: "localhost", @@ -126,23 +125,22 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) { } err := server.Serve(sopts) if err != nil { - ferr = err - atomic.StoreInt32(&ferrt, 1) + ferr.CompareAndSwap(nil, &err) } }() - if err := s.waitForStartup(&ferr, &ferrt); err != nil { + if err := s.waitForStartup(&ferr); err != nil { s.Close() return nil, err } return s, nil } -func (s *mockServer) waitForStartup(ferr *error, ferrt *int32) error { +func (s *mockServer) waitForStartup(ferr *atomic.Pointer[error]) error { var lerr error start := time.Now() for { - if atomic.LoadInt32(ferrt) != 0 { - return *ferr + if perr := ferr.Load(); perr != nil { + return *perr } if time.Since(start) > time.Second*5 { if lerr != nil {