rpl file store check read table keepalived

This commit is contained in:
siddontang 2014-11-13 13:41:07 +08:00
parent 4e802b06e3
commit 10a367f978
1 changed files with 29 additions and 0 deletions

View File

@ -58,11 +58,15 @@ type FileStore struct {
rs tableReaders rs tableReaders
w *tableWriter w *tableWriter
quit chan struct{}
} }
func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) { func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error) {
s := new(FileStore) s := new(FileStore)
s.quit = make(chan struct{})
var err error var err error
if err = os.MkdirAll(base, 0755); err != nil { if err = os.MkdirAll(base, 0755); err != nil {
@ -88,6 +92,8 @@ func NewFileStore(base string, maxSize int64, syncType int) (*FileStore, error)
s.w = newTableWriter(s.base, index, s.maxFileSize) s.w = newTableWriter(s.base, index, s.maxFileSize)
s.w.SetSyncType(syncType) s.w.SetSyncType(syncType)
go s.checkTableReaders()
return s, nil return s, nil
} }
@ -244,6 +250,8 @@ func (s *FileStore) Clear() error {
} }
func (s *FileStore) Close() error { func (s *FileStore) Close() error {
close(s.quit)
s.wm.Lock() s.wm.Lock()
s.rm.Lock() s.rm.Lock()
@ -267,6 +275,27 @@ func (s *FileStore) Close() error {
return nil return nil
} }
func (s *FileStore) checkTableReaders() {
t := time.NewTicker(60 * time.Second)
defer t.Stop()
for {
select {
case <-t.C:
s.rm.Lock()
for _, r := range s.rs {
if !r.Keepalived() {
r.Close()
}
}
s.rm.Unlock()
case <-s.quit:
return
}
}
}
func (s *FileStore) load() error { func (s *FileStore) load() error {
fs, err := ioutil.ReadDir(s.base) fs, err := ioutil.ReadDir(s.base)
if err != nil { if err != nil {