diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index d9c737e1..ac02572c 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -52,6 +52,13 @@ const ( defaultLogVerbosity = logger.Debug ) +// Revid modes +const ( + normal = "Normal" + paused = "Paused" + burst = "Burst" +) + // Other misc consts const ( netSendRetryTime = 5 * time.Second @@ -81,23 +88,22 @@ func main() { cfg := handleFlags() if !*useNetsender { - // run revid for the specified duration - rv, _, err := startRevid(nil, cfg) + rv, err := revid.New(cfg, nil) if err != nil { + cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error()) + } + if err = rv.Start(); err != nil { cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) } time.Sleep(*runDurationPtr) - err = stopRevid(rv) - if err != nil { + if err = rv.Stop(); err != nil { cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error()) } return } - err := run(nil, cfg) - if err != nil { + if err := run(cfg); err != nil { log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error()) - os.Exit(1) } } @@ -253,66 +259,97 @@ func handleFlags() revid.Config { } // initialize then run the main NetSender client -func run(rv *revid.Revid, cfg revid.Config) error { - // initialize NetSender and use NetSender's logger - //config.Logger = netsender.Logger() +func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") + var vars map[string]string + + // initialize NetSender and use NetSender's logger var ns netsender.Sender err := ns.Init(log, nil, nil, nil) if err != nil { return err } - vars, _ := ns.Vars() + + vars, _ = ns.Vars() vs := ns.VarSum() - paused := false - if vars["mode"] == "Paused" { - paused = true + + rv, err := revid.New(cfg, &ns) + if err != nil { + log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) } - if !paused { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + + // Update revid to get latest config settings from netreceiver. + err = rv.Update(vars) + if err != nil { + return err + } + + // If mode on netreceiver isn't paused then we can start revid. + if ns.Mode() != paused && ns.Mode() != burst { + err = rv.Start() if err != nil { return err } } + if ns.Mode() == burst { + ns.SetMode(paused, &vs) + } + for { - if err := send(&ns, rv); err != nil { - log.Log(logger.Error, pkg+"polling failed", "error", err.Error()) + // TODO(saxon): replace this call with call to ns.Run(). + err = send(&ns, rv) + if err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } - if vs != ns.VarSum() { - // vars changed - vars, err := ns.Vars() - if err != nil { - log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) - time.Sleep(netSendRetryTime) - continue - } - vs = ns.VarSum() - if vars["mode"] == "Paused" { - if !paused { - log.Log(logger.Info, pkg+"pausing revid") - err = stopRevid(rv) - if err != nil { - log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error()) - continue - } - paused = true - } - } else { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused) - if err != nil { - return err - } - if paused { - paused = false - } - } + // If var sum hasn't change we continue + if vs == ns.VarSum() { + goto sleep + } + + vars, err = ns.Vars() + if err != nil { + log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + vs = ns.VarSum() + + err = rv.Update(vars) + if err != nil { + return err + } + + switch ns.Mode() { + case paused: + case normal: + err = rv.Start() + if err != nil { + return err + } + case burst: + log.Log(logger.Info, pkg+"Starting burst...") + err = rv.Start() + if err != nil { + return err + } + time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) + log.Log(logger.Info, pkg+"Stopping burst...") + err = rv.Stop() + if err != nil { + return err + } + ns.SetMode(paused, &vs) + } + sleep: + sleepTime, err := strconv.Atoi(ns.Param("mp")) + if err != nil { + return err } - sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) } } @@ -340,139 +377,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -// wrappers for stopping and starting revid -func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) { - rv, err := revid.New(cfg, ns) - if err != nil { - return nil, cfg, err - } - err = rv.Start() - return rv, cfg, err -} - -func stopRevid(rv *revid.Revid) error { - err := rv.Stop() - if err != nil { - return err - } - - // FIXME(kortschak): Is this waiting on completion of work? - // Use a wait group and Wait method if it is. - time.Sleep(revidStopTime) - return nil -} - -func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { - if stop { - err := stopRevid(rv) - if err != nil { - return nil, cfg, err - } - } - - //look through the vars and update revid where needed - for key, value := range vars { - switch key { - case "Output": - // FIXME(kortschak): There can be only one! - // How do we specify outputs after the first? - // - // Maybe we shouldn't be doing this! - switch value { - case "File": - cfg.Outputs[0] = revid.File - case "Http": - cfg.Outputs[0] = revid.Http - case "Rtmp": - cfg.Outputs[0] = revid.Rtmp - case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp - default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) - continue - } - case "FramesPerClip": - f, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) - break - } - cfg.FramesPerClip = uint(f) - case "RtmpUrl": - cfg.RtmpUrl = value - case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.Bitrate = uint(r) - case "OutputFileName": - cfg.OutputFileName = value - case "InputFileName": - cfg.InputFileName = value - case "Height": - h, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) - break - } - cfg.Height = uint(h) - case "Width": - w, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) - break - } - cfg.Width = uint(w) - case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.FrameRate = uint(r) - case "HttpAddress": - cfg.HttpAddress = value - case "Quantization": - q, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) - break - } - cfg.Quantization = uint(q) - case "IntraRefreshPeriod": - p, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) - break - } - cfg.IntraRefreshPeriod = uint(p) - case "HorizontalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipHorizontal = true - case "false": - cfg.FlipHorizontal = false - default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) - } - case "VerticalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipVertical = true - case "false": - cfg.FlipVertical = false - default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) - } - default: - } - } - - return startRevid(ns, cfg) -} - // flagStrings implements an appending string set flag. type flagStrings []string diff --git a/experimentation/flac/decode.go b/experimentation/flac/decode.go new file mode 100644 index 00000000..34d42057 --- /dev/null +++ b/experimentation/flac/decode.go @@ -0,0 +1,144 @@ +/* +NAME + decode.go + +DESCRIPTION + decode.go provides functionality for the decoding of FLAC compressed audio + +AUTHOR + Saxon Nelson-Milton + +LICENSE + decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package flac + +import ( + "bytes" + "errors" + "io" + + "github.com/go-audio/audio" + "github.com/go-audio/wav" + "github.com/mewkiz/flac" +) + +const wavFormat = 1 + +// writeSeeker implements a memory based io.WriteSeeker. +type writeSeeker struct { + buf []byte + pos int +} + +// Bytes returns the bytes contained in the writeSeekers buffer. +func (ws *writeSeeker) Bytes() []byte { + return ws.buf +} + +// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number +// of bytes written. If less than len(p) bytes are written, an error is returned. +func (ws *writeSeeker) Write(p []byte) (n int, err error) { + minCap := ws.pos + len(p) + if minCap > cap(ws.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra + copy(buf2, ws.buf) + ws.buf = buf2 + } + if minCap > len(ws.buf) { + ws.buf = ws.buf[:minCap] + } + copy(ws.buf[ws.pos:], p) + ws.pos += len(p) + return len(p), nil +} + +// Seek sets the offset for the next Read or Write to offset, interpreted according +// to whence: SeekStart means relative to the start of the file, SeekCurrent means +// relative to the current offset, and SeekEnd means relative to the end. Seek returns +// the new offset relative to the start of the file and an error, if any. +func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { + newPos, offs := 0, int(offset) + switch whence { + case io.SeekStart: + newPos = offs + case io.SeekCurrent: + newPos = ws.pos + offs + case io.SeekEnd: + newPos = len(ws.buf) + offs + } + if newPos < 0 { + return 0, errors.New("negative result pos") + } + ws.pos = newPos + return int64(newPos), nil +} + +// Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding +// fails, an error is returned. +func Decode(buf []byte) ([]byte, error) { + + // Lex the FLAC into a stream to hold audio and it's properties. + r := bytes.NewReader(buf) + stream, err := flac.Parse(r) + if err != nil { + return nil, errors.New("Could not parse FLAC") + } + + // Create WAV encoder and pass writeSeeker that will store output WAV. + ws := &writeSeeker{} + sr := int(stream.Info.SampleRate) + bps := int(stream.Info.BitsPerSample) + nc := int(stream.Info.NChannels) + enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat) + defer enc.Close() + + // Decode FLAC into frames of samples + intBuf := &audio.IntBuffer{ + Format: &audio.Format{NumChannels: nc, SampleRate: sr}, + SourceBitDepth: bps, + } + return decodeFrames(stream, intBuf, enc, ws) +} + +// decodeFrames parses frames from the stream and encodes them into WAV until +// the end of the stream is reached. The bytes from writeSeeker buffer are then +// returned. If any errors occur during encodeing, nil bytes and the error is returned. +func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { + var data []int + for { + frame, err := s.ParseNext() + + // If we've reached the end of the stream then we can output the writeSeeker's buffer. + if err == io.EOF { + return ws.Bytes(), nil + } else if err != nil { + return nil, err + } + + // Encode WAV audio samples. + data = data[:0] + for i := 0; i < frame.Subframes[0].NSamples; i++ { + for _, subframe := range frame.Subframes { + data = append(data, int(subframe.Samples[i])) + } + } + intBuf.Data = data + if err := e.Write(intBuf); err != nil { + return nil, err + } + } +} diff --git a/experimentation/flac/flac_test.go b/experimentation/flac/flac_test.go new file mode 100644 index 00000000..1f8019e5 --- /dev/null +++ b/experimentation/flac/flac_test.go @@ -0,0 +1,121 @@ +/* +NAME + flac_test.go + +DESCRIPTION + flac_test.go provides utilities to test FLAC audio decoding + +AUTHOR + Saxon Nelson-Milton + +LICENSE + flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package flac + +import ( + "io" + "io/ioutil" + "os" + "testing" +) + +const ( + testFile = "../../../test/test-data/av/input/robot.flac" + outFile = "testOut.wav" +) + +// TestWriteSeekerWrite checks that basic writing to the ws works as expected. +func TestWriteSeekerWrite(t *testing.T) { + ws := &writeSeeker{} + + const tstStr1 = "hello" + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Write failed, got: %v, want: %v", got, tstStr1) + } + + const tstStr2 = " world" + const want = "hello world" + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want { + t.Errorf("Second write failed, got: %v, want: %v", got, want) + } +} + +// TestWriteSeekerSeek checks that writing and seeking works as expected, i.e. we +// can write, then seek to a knew place in the buf, and write again, either replacing +// bytes, or appending bytes. +func TestWriteSeekerSeek(t *testing.T) { + ws := &writeSeeker{} + + const tstStr1 = "hello" + want1 := tstStr1 + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want1) + } + + const tstStr2 = " world" + const want2 = tstStr1 + tstStr2 + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want2 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want2) + } + + const tstStr3 = "k!" + const want3 = "hello work!" + ws.Seek(-2, io.SeekEnd) + ws.Write([]byte(tstStr3)) + got = string(ws.buf) + if got != want3 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want3) + } + + const tstStr4 = "gopher" + const want4 = "hello gopher" + ws.Seek(6, io.SeekStart) + ws.Write([]byte(tstStr4)) + got = string(ws.buf) + if got != want4 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want4) + } +} + +// TestDecodeFlac checks that we can load a flac file and decode to wav, writing +// to a wav file. +func TestDecodeFlac(t *testing.T) { + b, err := ioutil.ReadFile(testFile) + if err != nil { + t.Fatalf("Could not read test file, failed with err: %v", err.Error()) + } + out, err := Decode(b) + if err != nil { + t.Errorf("Could not decode, failed with err: %v", err.Error()) + } + f, err := os.Create(outFile) + if err != nil { + t.Fatalf("Could not create output file, failed with err: %v", err.Error()) + } + _, err = f.Write(out) + if err != nil { + t.Fatalf("Could not write to output file, failed with err: %v", err.Error()) + } +} diff --git a/experimentation/ts-repair/main.go b/experimentation/ts-repair/main.go new file mode 100644 index 00000000..bed81f19 --- /dev/null +++ b/experimentation/ts-repair/main.go @@ -0,0 +1,265 @@ +/* +NAME + ts-repair/main.go + +DESCRIPTION + This program attempts to repair mpegts discontinuities using one of two methods + as selected by the mode flag. Setting the mode flag to 0 will result in repair + by shifting all CCs such that they are continuous. Setting the mode flag to 1 + will result in repair through setting the discontinuity indicator to true at + packets where a discontinuity exists. + + Specify the input file with the in flag, and the output file with out flag. + +AUTHOR + Saxon A. Nelson-Milton + +LICENSE + mpegts.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). +*/ + +package main + +import ( + "errors" + "flag" + "fmt" + "io" + "os" + + "bitbucket.org/ausocean/av/stream/mts" + "github.com/Comcast/gots/packet" +) + +const ( + PatPid = 0 + PmtPid = 4096 + VideoPid = 256 + HeadSize = 4 + DefaultAdaptationSize = 2 + AdaptationIdx = 4 + AdaptationControlIdx = 3 + AdaptationBodyIdx = AdaptationIdx + 1 + AdaptationControlMask = 0x30 + DefaultAdaptationBodySize = 1 + DiscontinuityIndicatorMask = 0x80 + DiscontinuityIndicatorIdx = AdaptationIdx + 1 +) + +// Various errors that we can encounter. +const ( + errBadInPath = "No file path provided, or file does not exist" + errCantCreateOut = "Can't create output file" + errCantGetPid = "Can't get pid from packet" + errReadFail = "Read failed" + errWriteFail = "Write to file failed" + errBadMode = "Bad fix mode" + errAdaptationPresent = "Adaptation field is already present in packet" + errNoAdaptationField = "No adaptation field in this packet" +) + +// Consts describing flag usage. +const ( + inUsage = "The path to the file to be repaired" + outUsage = "Output file path" + modeUsage = "Fix mode: 0 = cc-shift, 1 = di-update" +) + +// Repair modes. +const ( + ccShift = iota + diUpdate +) + +var ccMap = map[int]byte{ + PatPid: 16, + PmtPid: 16, + VideoPid: 16, +} + +// packetNo will keep track of the ts packet number for reference. +var packetNo int + +// Option defines a func that performs an action on p in order to change a ts option. +type Option func(p *Packet) + +// Packet is a byte array of size PacketSize i.e. 188 bytes. We define this +// to allow us to write receiver funcs for the [PacketSize]byte type. +type Packet [mts.PacketSize]byte + +// CC returns the CC of p. +func (p *Packet) CC() byte { + return (*p)[3] & 0x0f +} + +// setCC sets the CC of p. +func (p *Packet) setCC(cc byte) { + (*p)[3] |= cc & 0xf +} + +// setDI sets the discontinuity counter of p. +func (p *Packet) setDI(di bool) { + if di { + p[5] |= 0x80 + } else { + p[5] &= 0x7f + } +} + +// addAdaptationField adds an adaptation field to p, and applys the passed options to this field. +// TODO: this will probably break if we already have adaptation field. +func (p *Packet) addAdaptationField(options ...Option) error { + if p.hasAdaptation() { + return errors.New(errAdaptationPresent) + } + // Create space for adaptation field. + copy(p[HeadSize+DefaultAdaptationSize:], p[HeadSize:len(p)-DefaultAdaptationSize]) + + // TODO: seperate into own function + // Update adaptation field control. + p[AdaptationControlIdx] &= 0xff ^ AdaptationControlMask + p[AdaptationControlIdx] |= AdaptationControlMask + // Default the adaptationfield. + p.resetAdaptation() + + // Apply and options that have bee passed. + for _, option := range options { + option(p) + } + return nil +} + +// resetAdaptation sets fields in ps adaptation field to 0 if the adaptation field +// exists, otherwise an error is returned. +func (p *Packet) resetAdaptation() error { + if !p.hasAdaptation() { + return errors.New(errNoAdaptationField) + } + p[AdaptationIdx] = DefaultAdaptationBodySize + p[AdaptationBodyIdx] = 0x00 + return nil +} + +// hasAdaptation returns true if p has an adaptation field and false otherwise. +func (p *Packet) hasAdaptation() bool { + afc := p[AdaptationControlIdx] & AdaptationControlMask + if afc == 0x20 || afc == 0x30 { + return true + } else { + return false + } +} + +// DiscontinuityIndicator returns and Option that will set p's discontinuity +// indicator according to f. +func DiscontinuityIndicator(f bool) Option { + return func(p *Packet) { + set := byte(DiscontinuityIndicatorMask) + if !f { + set = 0x00 + } + p[DiscontinuityIndicatorIdx] &= 0xff ^ DiscontinuityIndicatorMask + p[DiscontinuityIndicatorIdx] |= DiscontinuityIndicatorMask & set + } +} + +func main() { + // Deal with input flags + inPtr := flag.String("in", "", inUsage) + outPtr := flag.String("out", "out.ts", outUsage) + modePtr := flag.Int("mode", diUpdate, modeUsage) + flag.Parse() + + // Try and open the given input file, otherwise panic - we can't do anything + inFile, err := os.Open(*inPtr) + defer inFile.Close() + if err != nil { + panic(errBadInPath) + } + + // Try and create output file, otherwise panic - we can't do anything + outFile, err := os.Create(*outPtr) + defer outFile.Close() + if err != nil { + panic(errCantCreateOut) + } + + // Read each packet from the input file reader + var p Packet + for { + // If we get an end of file then return, otherwise we panic - can't do anything else + if _, err := inFile.Read(p[:mts.PacketSize]); err == io.EOF { + return + } else if err != nil { + panic(errReadFail + ": " + err.Error()) + } + packetNo++ + + // Get the pid from the packet + pid, err := packet.Pid((*packet.Packet)(&p)) + if err != nil { + panic(errCantGetPid) + } + + // Get the cc from the packet and also the expected cc (if exists) + cc := p.CC() + expect, exists := expectedCC(int(pid)) + if !exists { + updateCCMap(int(pid), cc) + } else { + switch *modePtr { + // ccShift mode shifts all CC regardless of presence of Discontinuities or not + case ccShift: + p.setCC(expect) + // diUpdate mode finds discontinuities and sets the discontinuity indicator to true. + // If we have a pat or pmt then we need to add an adaptation field and then set the DI. + case diUpdate: + if cc != expect { + fmt.Printf("***** Discontinuity found (packetNo: %v pid: %v, cc: %v, expect: %v)\n", packetNo, pid, cc, expect) + if p.hasAdaptation() { + p.setDI(true) + } else { + p.addAdaptationField(DiscontinuityIndicator(true)) + } + updateCCMap(int(pid), p.CC()) + } + default: + panic(errBadMode) + } + } + + // Write this packet to the output file. + if _, err := outFile.Write(p[:]); err != nil { + panic(errWriteFail + ": " + err.Error()) + } + } +} + +// expectedCC returns the expected cc for the given pid. If the cc hasn't been +// used yet, then 16 and false is returned. +func expectedCC(pid int) (byte, bool) { + cc := ccMap[pid] + if cc == 16 { + return 16, false + } + ccMap[pid] = (cc + 1) & 0xf + return cc, true +} + +// updateCCMap updates the cc for the passed pid. +func updateCCMap(pid int, cc byte) { + ccMap[pid] = (cc + 1) & 0xf +} diff --git a/revid/cmd/h264-file-to-flv-rtmp/main.go b/revid/cmd/h264-file-to-flv-rtmp/main.go deleted file mode 100644 index 4f7c9d7c..00000000 --- a/revid/cmd/h264-file-to-flv-rtmp/main.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -NAME - main.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package main - -import ( - "flag" - "log" - "time" - - "bitbucket.org/ausocean/av/revid" - "bitbucket.org/ausocean/iot/pi/smartlogger" - "bitbucket.org/ausocean/utils/logger" -) - -const ( - inputFile = "../../../../test/test-data/av/input/betterInput.h264" - frameRate = "25" - runDuration = 120 * time.Second - logPath = "/var/log" -) - -// Test h264 inputfile to flv format into rtmp using librtmp c wrapper -func main() { - // Get the rtmp url from a cmd flag - rtmpUrlPtr := flag.String("rtmpUrl", "", "The rtmp url you would like to stream to.") - flag.Parse() - if *rtmpUrlPtr == "" { - log.Println("No RTMP url passed!") - return - } - - config := revid.Config{ - Input: revid.File, - InputFileName: inputFile, - InputCodec: revid.H264, - Outputs: []byte{revid.Rtmp}, - RtmpMethod: revid.LibRtmp, - RtmpUrl: *rtmpUrlPtr, - Packetization: revid.Flv, - Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), - } - revidInst, err := revid.New(config, nil) - if err != nil { - config.Logger.Log(logger.Error, "Should not have got an error!: ", err.Error()) - return - } - revidInst.Start() - time.Sleep(runDuration) - revidInst.Stop() -} diff --git a/revid/cmd/h264-file-to-mpgets-file/main.go b/revid/cmd/h264-file-to-mpgets-file/main.go deleted file mode 100644 index 768f560b..00000000 --- a/revid/cmd/h264-file-to-mpgets-file/main.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -NAME - main.go - -DESCRIPTION - See Readme.md - -AUTHOR - Saxon Nelson-Milton - -LICENSE - main.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) - - It is free software: you can redistribute it and/or modify them - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - It is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - for more details. - - You should have received a copy of the GNU General Public License - along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses). -*/ - -package main - -import ( - "time" - - "bitbucket.org/ausocean/av/revid" - "bitbucket.org/ausocean/iot/pi/smartlogger" - "bitbucket.org/ausocean/utils/logger" -) - -const ( - inputFile = "../../../../test/test-data/av/input/betterInput.h264" - outputFile = "output.ts" - frameRate = "25" - runDuration = 120 * time.Second - logPath = "/var/log" -) - -// Test h264 inputfile to flv format into rtmp using librtmp c wrapper -func main() { - - config := revid.Config{ - Input: revid.File, - InputFileName: inputFile, - InputCodec: revid.H264, - Outputs: []byte{revid.File}, - OutputFileName: outputFile, - Packetization: revid.Mpegts, - Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), - } - revidInst, err := revid.New(config, nil) - if err != nil { - config.Logger.Log(logger.Error, "Should not have got an error!:", err.Error()) - return - } - revidInst.Start() - time.Sleep(runDuration) - revidInst.Stop() -} diff --git a/revid/config.go b/revid/config.go index dc9c5a8d..b0ba2bc4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -68,6 +68,7 @@ type Config struct { RtpAddress string Logger Logger SendRetry bool + BurstPeriod uint } // Enums for config struct @@ -114,6 +115,7 @@ const ( defaultInputCodec = H264 defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultRtpAddr = "localhost:6970" + defaultBurstPeriod = 10 // Seconds ) // Validate checks for any errors in the config fields and defaults settings @@ -200,6 +202,11 @@ func (c *Config) Validate(r *Revid) error { } } + if c.BurstPeriod == 0 { + c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod) + c.BurstPeriod = defaultBurstPeriod + } + if c.FramesPerClip < 1 { c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) diff --git a/revid/revid.go b/revid/revid.go index 6833ec2b..02656d03 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -119,10 +119,11 @@ type Revid struct { // bitrate hold the last send bitrate calculation result. bitrate int - // isRunning is a loaded and cocked foot-gun. mu sync.Mutex isRunning bool + wg sync.WaitGroup + err chan error } @@ -326,7 +327,14 @@ func (r *Revid) IsRunning() bool { return ret } -// setIsRunning sets revid.isRunning using b. +func (r *Revid) Config() Config { + r.mu.Lock() + cfg := r.config + r.mu.Unlock() + return cfg +} + +// setIsRunning sets r.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() r.isRunning = b @@ -340,9 +348,11 @@ func (r *Revid) Start() error { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") + // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") + r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() @@ -363,12 +373,129 @@ func (r *Revid) Stop() error { if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.wg.Wait() + return nil +} + +func (r *Revid) Update(vars map[string]string) error { + if r.IsRunning() { + if err := r.Stop(); err != nil { + return err + } + } + //look through the vars and update revid where needed + for key, value := range vars { + switch key { + case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! + switch value { + case "File": + r.config.Outputs[0] = File + case "Http": + r.config.Outputs[0] = Http + case "Rtmp": + r.config.Outputs[0] = Rtmp + case "FfmpegRtmp": + r.config.Outputs[0] = FfmpegRtmp + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + continue + } + case "FramesPerClip": + f, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + break + } + r.config.FramesPerClip = uint(f) + case "RtmpUrl": + r.config.RtmpUrl = value + case "Bitrate": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + r.config.Bitrate = uint(v) + case "OutputFileName": + r.config.OutputFileName = value + case "InputFileName": + r.config.InputFileName = value + case "Height": + h, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) + break + } + r.config.Height = uint(h) + case "Width": + w, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) + break + } + r.config.Width = uint(w) + case "FrameRate": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + r.config.FrameRate = uint(v) + case "HttpAddress": + r.config.HttpAddress = value + case "Quantization": + q, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + break + } + r.config.Quantization = uint(q) + case "IntraRefreshPeriod": + p, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + break + } + r.config.IntraRefreshPeriod = uint(p) + case "HorizontalFlip": + switch strings.ToLower(value) { + case "true": + r.config.FlipHorizontal = true + case "false": + r.config.FlipHorizontal = false + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + } + case "VerticalFlip": + switch strings.ToLower(value) { + case "true": + r.config.FlipVertical = true + case "false": + r.config.FlipVertical = false + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + } + case "BurstPeriod": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) + break + } + r.config.BurstPeriod = uint(v) + } + } + return nil } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { + defer r.wg.Done() lastTime := time.Now() var count int loop: @@ -507,6 +634,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } + r.wg.Add(1) go r.processFrom(stdout, 0) return nil } @@ -555,6 +683,7 @@ func (r *Revid) startV4L() error { return err } + r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil } @@ -570,6 +699,7 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. + r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } @@ -578,4 +708,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.wg.Done() } diff --git a/rtmp/conn.go b/rtmp/conn.go index 3864df98..7e1b3b15 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -76,7 +76,7 @@ type link struct { protocol int32 timeout uint port uint16 - conn *net.TCPConn + conn net.Conn } // method represents an RTMP method. diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 6ff552b8..a309eaf9 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -85,7 +85,7 @@ var ( ) const ( - psiSndCnt = 7 + psiInterval = 1 * time.Second ) // Meta allows addition of metadata to encoded mts from outside of this pkg. @@ -99,6 +99,14 @@ var ( pmtTable = standardPmt.Bytes() ) +const ( + sdtPid = 17 + patPid = 0 + pmtPid = 4096 + videoPid = 256 + streamID = 0xe0 // First video stream ID. +) + // Time related constants. const ( // ptsOffset is the offset added to the clock to determine @@ -119,9 +127,9 @@ type Encoder struct { tsSpace [PacketSize]byte pesSpace [pes.MaxPesSize]byte - psiCount int - continuity map[int]byte + + psiLastTime time.Time } // NewEncoder returns an Encoder with the specified frame rate. @@ -133,13 +141,18 @@ func NewEncoder(dst io.Writer, fps float64) *Encoder { ptsOffset: ptsOffset, continuity: map[int]byte{ - PatPid: 0, - PmtPid: 0, - VideoPid: 0, + patPid: 0, + pmtPid: 0, + videoPid: 0, }, } } +const ( + hasPayload = 0x1 + hasAdaptationField = 0x2 +) + const ( hasDTS = 0x1 hasPTS = 0x2 @@ -148,15 +161,18 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { - if e.psiCount <= 0 { + now := time.Now() + if now.Sub(e.psiLastTime) > psiInterval { err := e.writePSI() if err != nil { return err } + e.psiLastTime = now } + // Prepare PES data. pesPkt := pes.Packet{ - StreamID: StreamID, + StreamID: streamID, PDI: hasPTS, PTS: e.pts(), Data: nalu, @@ -168,10 +184,10 @@ func (e *Encoder) Encode(nalu []byte) error { for len(buf) != 0 { pkt := Packet{ PUSI: pusi, - PID: VideoPid, + PID: videoPid, RAI: pusi, - CC: e.ccFor(VideoPid), - AFC: HasAdaptationField | HasPayload, + CC: e.ccFor(videoPid), + AFC: hasAdaptationField | hasPayload, PCRF: pusi, } n := pkt.FillPayload(buf) @@ -184,7 +200,6 @@ func (e *Encoder) Encode(nalu []byte) error { pusi = false } _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) - e.psiCount-- if err != nil { return err } @@ -227,7 +242,6 @@ func (e *Encoder) writePSI() error { if err != nil { return err } - e.psiCount = psiSndCnt return nil } diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index e18213b5..08791af7 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -165,7 +165,7 @@ func FindPMT(d []byte) (p []byte, i int, err error) { } for i = 0; i < len(d); i += PacketSize { pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) - if pid == PmtPid { + if pid == pmtPid { p = d[i+4 : i+PacketSize] return }