diff --git a/revid/config/config.go b/revid/config/config.go index 18e1dae8..25d8c9f6 100644 --- a/revid/config/config.go +++ b/revid/config/config.go @@ -187,8 +187,9 @@ type Config struct { // logging.Info, logging.Warning logging.Error, logging.Fatal. LogLevel int8 - Loop bool // If true will restart reading of input after an io.EOF. - MinFPS uint // The reduced framerate of the video when there is no motion. + Loop bool // If true will restart reading of input after an io.EOF. + MaxFileSize uint // Maximum size in bytes that a file will be written when File output is to be used. A value of 0 means unlimited. + MinFPS uint // The reduced framerate of the video when there is no motion. // MinFrames defines the frequency of key NAL units SPS, PPS and IDR in // number of NAL units. This will also determine the frequency of PSI if the diff --git a/revid/config/variables.go b/revid/config/variables.go index 0727c02c..f77dfce3 100644 --- a/revid/config/variables.go +++ b/revid/config/variables.go @@ -65,6 +65,7 @@ const ( KeyISO = "ISO" KeyLogging = "logging" KeyLoop = "Loop" + KeyMaxFileSize = "MaxFileSize" KeyMinFPS = "MinFPS" KeyMinFrames = "MinFrames" KeyMode = "mode" @@ -399,6 +400,11 @@ var Variables = []struct { Type: typeBool, Update: func(c *Config, v string) { c.Loop = parseBool(KeyLoop, v, c) }, }, + { + Name: KeyMaxFileSize, + Type: typeUint, + Update: func(c *Config, v string) { c.MaxFileSize = parseUint(KeyMaxFileSize, v, c) }, + }, { Name: KeyMinFPS, Type: typeUint, diff --git a/revid/pipeline.go b/revid/pipeline.go index f959674e..d35c20b6 100644 --- a/revid/pipeline.go +++ b/revid/pipeline.go @@ -200,7 +200,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. mtsSenders = append(mtsSenders, w) case config.OutputFile: r.cfg.Logger.Debug("using File output") - w, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, false) + w, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, false, r.cfg.MaxFileSize) if err != nil { return err } @@ -208,7 +208,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. case config.OutputFiles: r.cfg.Logger.Debug("using Files output") pb := pool.NewBuffer(int(r.cfg.PoolStartElementSize), int(nElements), writeTimeout) - fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true) + fs, err := newFileSender(r.cfg.Logger, r.cfg.OutputPath, true, r.cfg.MaxFileSize) if err != nil { return err } diff --git a/revid/senders.go b/revid/senders.go index 5f5e9e89..e454cef7 100644 --- a/revid/senders.go +++ b/revid/senders.go @@ -35,6 +35,7 @@ import ( "net" "os" "sync" + "syscall" "time" "github.com/Comcast/gots/packet" @@ -189,39 +190,77 @@ func extractMeta(r string, log logging.Logger) error { // fileSender implements loadSender for a local file destination. type fileSender struct { - file *os.File - data []byte - multiFile bool - path string - init bool - log logging.Logger + file *os.File + data []byte + multiFile bool + maxFileSize uint // maxFileSize is in bytes. A size of 0 means there is no size limit. + path string + log logging.Logger } // newFileSender returns a new fileSender. Setting multi true will write a new // file for each write to this sender. -func newFileSender(l logging.Logger, path string, multiFile bool) (*fileSender, error) { +func newFileSender(l logging.Logger, path string, multiFile bool, maxFileSize uint) (*fileSender, error) { return &fileSender{ - path: path, - log: l, - multiFile: multiFile, - init: true, + path: path, + log: l, + multiFile: multiFile, + maxFileSize: maxFileSize, }, nil } // Write implements io.Writer. func (s *fileSender) Write(d []byte) (int, error) { - if s.init || s.multiFile { - fileName := s.path + time.Now().String() - s.log.Debug("creating new output file", "init", s.init, "multiFile", s.multiFile, "fileName", fileName) + s.log.Debug("checking disk space") + var stat syscall.Statfs_t + if err := syscall.Statfs("/", &stat); err != nil { + return 0, fmt.Errorf("could not read system disk space, abandoning write: %w", err) + } + availableSpace := stat.Bavail * uint64(stat.Bsize) + totalSpace := stat.Blocks * uint64(stat.Bsize) + s.log.Debug("available, total disk space in bytes", "availableSpace", availableSpace, "totalSpace", totalSpace) + var spaceBuffer uint64 = 50000000 // 50MB. + if availableSpace < spaceBuffer { + return 0, fmt.Errorf("reached limit of disk space with a buffer of %v bytes, abandoning write", spaceBuffer) + } + + // If the write will exceed the max file size, close the file so that a new one can be created. + if s.maxFileSize != 0 && s.file != nil { + fileInfo, err := s.file.Stat() + if err != nil { + return 0, fmt.Errorf("could not read files stats: %w", err) + } + size := uint(fileInfo.Size()) + s.log.Debug("checked file size", "bytes", size) + if size+uint(len(d)) > s.maxFileSize { + s.log.Debug("new write would exceed max file size, closing existing file", "maxFileSize", s.maxFileSize) + s.file.Close() + s.file = nil + } + } + + if s.file == nil { + fileName := s.path + time.Now().Format("2006-01-02_15-04-05") + s.log.Debug("creating new output file", "multiFile", s.multiFile, "fileName", fileName) f, err := os.Create(fileName) if err != nil { return 0, fmt.Errorf("could not create file to write media to: %w", err) } s.file = f - s.init = false } - s.log.Debug("writing output file", "len(d)", len(d)) - return s.file.Write(d) + + s.log.Debug("writing to output file", "bytes", len(d)) + n, err := s.file.Write(d) + if err != nil { + return n, err + } + + if s.multiFile { + s.file.Close() + s.file = nil + } + + return n, nil } func (s *fileSender) Close() error { return s.file.Close() }