mirror of https://github.com/tidwall/tile38.git
164 lines
3.6 KiB
Go
164 lines
3.6 KiB
Go
package resp
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// SyncPolicy describes how often fsync is issued on a file.
type SyncPolicy int

const (
	// Never means fsync is never called; the operating system is in charge
	// of the data. This is the fastest and least safe policy.
	Never SyncPolicy = iota
	// EverySecond means fsync is called every second or so. This is fast
	// enough, and at most about one second of data can be lost in a disaster.
	EverySecond
	// Always means fsync is called after every write. This is the safest
	// and by far the slowest policy.
	Always
)

// String returns a human-readable name for the policy, or "unknown" when the
// value is not one of the declared policies.
func (policy SyncPolicy) String() string {
	switch policy {
	case Never:
		return "never"
	case EverySecond:
		return "every second"
	case Always:
		return "always"
	default:
		return "unknown"
	}
}
|
|
|
|
// errClosed is the error returned by AOF methods that are called after the
// file has already been closed.
var errClosed = errors.New("closed")
|
|
|
|
// AOF represents an open file descriptor.
|
|
type AOF struct {
|
|
mu sync.Mutex
|
|
f *os.File
|
|
closed bool
|
|
rd *Reader
|
|
policy SyncPolicy
|
|
atEnd bool
|
|
}
|
|
|
|
// OpenAOF will open and return an AOF file. If the file does not exist a new one will be created.
|
|
func OpenAOF(path string) (*AOF, error) {
|
|
var err error
|
|
aof := &AOF{}
|
|
aof.f, err = os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0600)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
aof.policy = EverySecond
|
|
go func() {
|
|
for {
|
|
time.Sleep(time.Second)
|
|
aof.mu.Lock()
|
|
if aof.closed {
|
|
aof.mu.Unlock()
|
|
return
|
|
}
|
|
if aof.policy == EverySecond {
|
|
aof.f.Sync()
|
|
}
|
|
aof.mu.Unlock()
|
|
}
|
|
}()
|
|
return aof, nil
|
|
}
|
|
|
|
// SetSyncPolicy set the sync policy of the file.
|
|
// The policy 'EverySecond' means that fsync will be called every second or so. This is fast enough, and at most you can lose one second of data if there is a disaster.
|
|
// The policy 'Never' means fsync is never called, the Operating System will be in charge of your data. This is the fastest and less safe method.
|
|
// The policy 'Always' means fsync is called after every write. This is super duper safe and very incredibly slow.
|
|
// EverySecond is the default.
|
|
func (aof *AOF) SetSyncPolicy(policy SyncPolicy) {
|
|
aof.mu.Lock()
|
|
defer aof.mu.Unlock()
|
|
if aof.policy == policy {
|
|
return
|
|
}
|
|
switch policy {
|
|
default:
|
|
return
|
|
case Never, EverySecond, Always:
|
|
}
|
|
aof.policy = policy
|
|
}
|
|
|
|
// Close will close the file.
|
|
func (aof *AOF) Close() error {
|
|
aof.mu.Lock()
|
|
defer aof.mu.Unlock()
|
|
if aof.closed {
|
|
return errClosed
|
|
}
|
|
aof.f.Close()
|
|
aof.closed = true
|
|
return nil
|
|
}
|
|
|
|
func (aof *AOF) readValues(iterator func(v Value)) error {
|
|
aof.atEnd = false
|
|
if _, err := aof.f.Seek(0, 0); err != nil {
|
|
return err
|
|
}
|
|
rd := NewReader(aof.f)
|
|
for {
|
|
v, _, err := rd.ReadValue()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return err
|
|
}
|
|
if iterator != nil {
|
|
iterator(v)
|
|
}
|
|
}
|
|
if _, err := aof.f.Seek(0, 2); err != nil {
|
|
return err
|
|
}
|
|
aof.atEnd = true
|
|
return nil
|
|
}
|
|
|
|
// Append writes a value to the end of the file.
|
|
func (aof *AOF) Append(v Value) error {
|
|
b, err := v.MarshalRESP()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
aof.mu.Lock()
|
|
defer aof.mu.Unlock()
|
|
if aof.closed {
|
|
return errClosed
|
|
}
|
|
if !aof.atEnd {
|
|
if err := aof.readValues(nil); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
_, err = aof.f.Write(b)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if aof.policy == Always {
|
|
aof.f.Sync()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Scan iterates though all values in the file.
|
|
// This operation could take a long time if there lots of values, and the operation cannot be canceled part way through.
|
|
func (aof *AOF) Scan(iterator func(v Value)) error {
|
|
aof.mu.Lock()
|
|
defer aof.mu.Unlock()
|
|
if aof.closed {
|
|
return errClosed
|
|
}
|
|
return aof.readValues(iterator)
|
|
}
|