Replace out old style atomics

This commit is contained in:
tidwall 2022-11-03 10:07:17 -07:00
parent bfcd9fc94f
commit 9c471e3f9c
6 changed files with 49 additions and 43 deletions

View File

@ -147,7 +147,7 @@ type Manager struct {
mu sync.RWMutex mu sync.RWMutex
conns map[string]Conn conns map[string]Conn
publisher LocalPublisher publisher LocalPublisher
shutdown int32 // atomic bool shutdown atomic.Bool // atomic bool
wg sync.WaitGroup // run wait group wg sync.WaitGroup // run wait group
} }
@ -164,7 +164,7 @@ func NewManager(publisher LocalPublisher) *Manager {
func (epc *Manager) Shutdown() { func (epc *Manager) Shutdown() {
defer epc.wg.Wait() defer epc.wg.Wait()
atomic.StoreInt32(&epc.shutdown, 1) epc.shutdown.Store(true)
// expire the connections // expire the connections
epc.mu.Lock() epc.mu.Lock()
defer epc.mu.Unlock() defer epc.mu.Unlock()
@ -177,7 +177,7 @@ func (epc *Manager) Shutdown() {
func (epc *Manager) run() { func (epc *Manager) run() {
defer epc.wg.Done() defer epc.wg.Done()
for { for {
if atomic.LoadInt32(&epc.shutdown) != 0 { if epc.shutdown.Load() {
return return
} }
time.Sleep(time.Second) time.Sleep(time.Second)

View File

@ -11,7 +11,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/tidwall/buntdb" "github.com/tidwall/buntdb"
@ -157,7 +156,7 @@ func (s *Server) writeAOF(args []string, d *commandDetails) error {
} }
if s.aof != nil { if s.aof != nil {
atomic.StoreInt32(&s.aofdirty, 1) // prewrite optimization flag s.aofdirty.Store(true) // prewrite optimization flag
n := len(s.aofbuf) n := len(s.aofbuf)
s.aofbuf = redcon.AppendArray(s.aofbuf, len(args)) s.aofbuf = redcon.AppendArray(s.aofbuf, len(args))
for _, arg := range args { for _, arg := range args {

View File

@ -90,7 +90,7 @@ func (s *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
} }
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
objs = int(n) objs = int(n)
var k uint64 var k atomic.Uint64
for i := 0; i < cols; i++ { for i := 0; i < cols; i++ {
key := "mi:" + strconv.FormatInt(int64(i), 10) key := "mi:" + strconv.FormatInt(int64(i), 10)
func(key string) { func(key string) {
@ -137,15 +137,14 @@ func (s *Server) cmdMassInsert(msg *Message) (res resp.Value, err error) {
log.Fatal(err) log.Fatal(err)
return return
} }
atomic.AddUint64(&k, 1) k.Add(1)
if j%1000 == 1000-1 { if j%1000 == 1000-1 {
log.Debugf("massinsert: %s %d/%d", log.Debugf("massinsert: %s %d/%d", key, k.Load(), cols*objs)
key, atomic.LoadUint64(&k), cols*objs)
} }
} }
}(key) }(key)
} }
log.Infof("massinsert: done %d objects", atomic.LoadUint64(&k)) log.Infof("massinsert: done %d objects", k.Load())
return OKMessage(msg, start), nil return OKMessage(msg, start), nil
} }

View File

@ -70,6 +70,9 @@ type commandDetails struct {
// Server is a tile38 controller // Server is a tile38 controller
type Server struct { type Server struct {
// user defined options
opts Options
// static values // static values
unix string unix string
host string host string
@ -104,24 +107,21 @@ type Server struct {
conns map[int]*Client conns map[int]*Client
mu sync.RWMutex mu sync.RWMutex
// aof
aof *os.File // active aof file aof *os.File // active aof file
aofdirty int32 // mark the aofbuf as having data aofdirty atomic.Bool // mark the aofbuf as having data
aofbuf []byte // prewrite buffer aofbuf []byte // prewrite buffer
aofsz int // active size of the aof file aofsz int // active size of the aof file
shrinking bool // aof shrinking flag
shrinklog [][]string // aof shrinking log
// database
qdb *buntdb.DB // hook queue log qdb *buntdb.DB // hook queue log
qidx uint64 // hook queue log last idx qidx uint64 // hook queue log last idx
cols *btree.Map[string, *collection.Collection] // data collections cols *btree.Map[string, *collection.Collection] // data collections
follows map[*bytes.Buffer]bool
fcond *sync.Cond
lstack []*commandDetails
lives map[*liveBuffer]bool
lcond *sync.Cond
fcup bool // follow caught up
fcuponce bool // follow caught up once
shrinking bool // aof shrinking flag
shrinklog [][]string // aof shrinking log
hooks *btree.BTree // hook name -- [string]*Hook hooks *btree.BTree // hook name -- [string]*Hook
hookCross *rtree.RTree // hook spatial tree for "cross" geofences hookCross *rtree.RTree // hook spatial tree for "cross" geofences
hookTree *rtree.RTree // hook spatial tree for all hookTree *rtree.RTree // hook spatial tree for all
@ -130,16 +130,26 @@ type Server struct {
groupObjects *btree.BTree // objects that are connected to hooks groupObjects *btree.BTree // objects that are connected to hooks
hookExpires *btree.BTree // queue of all hooks marked for expiration hookExpires *btree.BTree // queue of all hooks marked for expiration
// followers (external aof readers)
follows map[*bytes.Buffer]bool
fcond *sync.Cond
lstack []*commandDetails
lives map[*liveBuffer]bool
lcond *sync.Cond
fcup bool // follow caught up
fcuponce bool // follow caught up once
aofconnM map[net.Conn]io.Closer aofconnM map[net.Conn]io.Closer
// lua scripts
luascripts *lScriptMap luascripts *lScriptMap
luapool *lStatePool luapool *lStatePool
// pubsub system (SUBSCRIBE, PUBLISH, and SETCHAN)
pubsub *pubsub pubsub *pubsub
// monitor connections (using the MONITOR command)
monconnsMu sync.RWMutex monconnsMu sync.RWMutex
monconns map[net.Conn]bool // monitor connections monconns map[net.Conn]bool
opts Options
} }
// Options for Serve() // Options for Serve()
@ -629,14 +639,14 @@ func (s *Server) netServe() error {
// write to client // write to client
if len(client.out) > 0 { if len(client.out) > 0 {
if atomic.LoadInt32(&s.aofdirty) != 0 { if s.aofdirty.Load() {
func() { func() {
// prewrite // prewrite
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.flushAOF(false) s.flushAOF(false)
}() }()
atomic.StoreInt32(&s.aofdirty, 0) s.aofdirty.Store(false)
} }
conn.Write(client.out) conn.Write(client.out)
client.out = nil client.out = nil

View File

@ -473,15 +473,15 @@ func fence_eecio_test(mc *mockServer) error {
return nil return nil
}() }()
}() }()
var timeok int32 var timeok atomic.Bool
go func() { go func() {
time.Sleep(time.Second * 30) time.Sleep(time.Second * 30)
if atomic.LoadInt32(&timeok) == 0 { if !timeok.Load() {
panic("timeout") panic("timeout")
} }
}() }()
wg.Wait() wg.Wait()
atomic.StoreInt32(&timeok, 1) timeok.Store(true)
if err3 != nil { if err3 != nil {
return err3 return err3
} }

View File

@ -108,8 +108,7 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) {
if opts.Metrics { if opts.Metrics {
s.mport = getNextPort() s.mport = getNextPort()
} }
var ferrt int32 // atomic flag for when ferr has been set var ferr atomic.Pointer[error] // ferr for when the server fails to start
var ferr error // ferr for when the server fails to start
go func() { go func() {
sopts := server.Options{ sopts := server.Options{
Host: "localhost", Host: "localhost",
@ -126,23 +125,22 @@ func mockOpenServer(opts MockServerOptions) (*mockServer, error) {
} }
err := server.Serve(sopts) err := server.Serve(sopts)
if err != nil { if err != nil {
ferr = err ferr.CompareAndSwap(nil, &err)
atomic.StoreInt32(&ferrt, 1)
} }
}() }()
if err := s.waitForStartup(&ferr, &ferrt); err != nil { if err := s.waitForStartup(&ferr); err != nil {
s.Close() s.Close()
return nil, err return nil, err
} }
return s, nil return s, nil
} }
func (s *mockServer) waitForStartup(ferr *error, ferrt *int32) error { func (s *mockServer) waitForStartup(ferr *atomic.Pointer[error]) error {
var lerr error var lerr error
start := time.Now() start := time.Now()
for { for {
if atomic.LoadInt32(ferrt) != 0 { if perr := ferr.Load(); perr != nil {
return *ferr return *perr
} }
if time.Since(start) > time.Second*5 { if time.Since(start) > time.Second*5 {
if lerr != nil { if lerr != nil {