diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 7e375992..1c2b9c90 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -50,6 +50,13 @@ const ( defaultLogVerbosity = logger.Debug ) +// Revid modes +const ( + normal = "Normal" + paused = "Paused" + burst = "Burst" +) + // Other misc consts const ( netSendRetryTime = 5 * time.Second @@ -72,23 +79,22 @@ func main() { cfg := handleFlags() if !*useNetsender { - // run revid for the specified duration - rv, _, err := startRevid(nil, cfg) + rv, err := revid.New(cfg, nil) if err != nil { + cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error()) + } + if err = rv.Start(); err != nil { cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) } time.Sleep(*runDurationPtr) - err = stopRevid(rv) - if err != nil { + if err = rv.Stop(); err != nil { cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error()) } return } - err := run(nil, cfg) - if err != nil { + if err := run(cfg); err != nil { log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error()) - os.Exit(1) } } @@ -102,8 +108,6 @@ func handleFlags() revid.Config { inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") - output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp") - output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") @@ -126,6 +130,9 @@ func handleFlags() revid.Config { configFilePtr = flag.String("ConfigFile", "", "NetSender config file") ) + var outputs flagStrings + flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)") + flag.Parse() log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) @@ -167,40 +174,24 @@ func handleFlags() revid.Config { log.Log(logger.Error, pkg+"bad input codec argument") } - switch *output1Ptr { - case "File": - cfg.Output1 = revid.File - case "Http": - cfg.Output1 = revid.Http - case "Rtmp": - cfg.Output1 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp - case "Udp": - cfg.Output1 = revid.Udp - case "Rtp": - cfg.Output1 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 1 argument") - } - - switch *output2Ptr { - case "File": - cfg.Output2 = revid.File - case "Http": - cfg.Output2 = revid.Http - case "Rtmp": - cfg.Output2 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output2 = revid.FfmpegRtmp - case "Udp": - cfg.Output2 = revid.Udp - case "Rtp": - cfg.Output2 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 2 argument") + for _, o := range outputs { + switch o { + case "File": + cfg.Outputs = append(cfg.Outputs, revid.File) + case "Http": + cfg.Outputs = append(cfg.Outputs, revid.Http) + case "Rtmp": + cfg.Outputs = append(cfg.Outputs, revid.Rtmp) + case "FfmpegRtmp": + cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp) + case "Udp": + cfg.Outputs = append(cfg.Outputs, revid.Udp) + case "Rtp": + cfg.Outputs = append(cfg.Outputs, revid.Rtp) + case "": + default: + log.Log(logger.Error, pkg+"bad output argument", "arg", o) + } } switch *rtmpMethodPtr { @@ -259,66 +250,97 @@ func handleFlags() revid.Config { } // initialize then run the main NetSender client -func run(rv *revid.Revid, cfg revid.Config) error { - // initialize NetSender and use NetSender's logger - //config.Logger = netsender.Logger() +func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") + var vars map[string]string + + // initialize NetSender and use NetSender's logger var ns netsender.Sender err := ns.Init(log, nil, nil, nil) if err != nil { return err } - vars, _ := ns.Vars() + + vars, _ = ns.Vars() vs := ns.VarSum() - paused := false - if vars["mode"] == "Paused" { - paused = true + + rv, err := revid.New(cfg, &ns) + if err != nil { + log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) } - if !paused { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + + // Update revid to get latest config settings from netreceiver. + err = rv.Update(vars) + if err != nil { + return err + } + + // If mode on netreceiver isn't paused then we can start revid. + if ns.Mode() != paused && ns.Mode() != burst { + err = rv.Start() if err != nil { return err } } + if ns.Mode() == burst { + ns.SetMode(paused, &vs) + } + for { - if err := send(&ns, rv); err != nil { - log.Log(logger.Error, pkg+"polling failed", "error", err.Error()) + // TODO(saxon): replace this call with call to ns.Run(). + err = send(&ns, rv) + if err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } - if vs != ns.VarSum() { - // vars changed - vars, err := ns.Vars() - if err != nil { - log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) - time.Sleep(netSendRetryTime) - continue - } - vs = ns.VarSum() - if vars["mode"] == "Paused" { - if !paused { - log.Log(logger.Info, pkg+"pausing revid") - err = stopRevid(rv) - if err != nil { - log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error()) - continue - } - paused = true - } - } else { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused) - if err != nil { - return err - } - if paused { - paused = false - } - } + // If var sum hasn't change we continue + if vs == ns.VarSum() { + goto sleep + } + + vars, err = ns.Vars() + if err != nil { + log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + vs = ns.VarSum() + + err = rv.Update(vars) + if err != nil { + return err + } + + switch ns.Mode() { + case paused: + case normal: + err = rv.Start() + if err != nil { + return err + } + case burst: + log.Log(logger.Info, pkg+"Starting burst...") + err = rv.Start() + if err != nil { + return err + } + time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) + log.Log(logger.Info, pkg+"Stopping burst...") + err = rv.Stop() + if err != nil { + return err + } + ns.SetMode(paused, &vs) + } + sleep: + sleepTime, err := strconv.Atoi(ns.Param("mp")) + if err != nil { + return err } - sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) } } @@ -346,131 +368,27 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -// wrappers for stopping and starting revid -func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) { - rv, err := revid.New(cfg, ns) - if err != nil { - return nil, cfg, err +// flagStrings implements an appending string set flag. +type flagStrings []string + +func (v *flagStrings) String() string { + if *v != nil { + return strings.Join(*v, ",") } - err = rv.Start() - return rv, cfg, err + return "" } -func stopRevid(rv *revid.Revid) error { - err := rv.Stop() - if err != nil { - return err +func (v *flagStrings) Set(s string) error { + if s == "" { + return nil } - - // FIXME(kortschak): Is this waiting on completion of work? - // Use a wait group and Wait method if it is. - time.Sleep(revidStopTime) + for _, e := range *v { + if e == s { + return nil + } + } + *v = append(*v, s) return nil } -func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { - if stop { - err := stopRevid(rv) - if err != nil { - return nil, cfg, err - } - } - - //look through the vars and update revid where needed - for key, value := range vars { - switch key { - case "Output": - switch value { - case "File": - cfg.Output1 = revid.File - case "Http": - cfg.Output1 = revid.Http - case "Rtmp": - cfg.Output1 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp - default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) - continue - } - case "FramesPerClip": - f, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) - break - } - cfg.FramesPerClip = uint(f) - case "RtmpUrl": - cfg.RtmpUrl = value - case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.Bitrate = uint(r) - case "OutputFileName": - cfg.OutputFileName = value - case "InputFileName": - cfg.InputFileName = value - case "Height": - h, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) - break - } - cfg.Height = uint(h) - case "Width": - w, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) - break - } - cfg.Width = uint(w) - case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.FrameRate = uint(r) - case "HttpAddress": - cfg.HttpAddress = value - case "Quantization": - q, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) - break - } - cfg.Quantization = uint(q) - case "IntraRefreshPeriod": - p, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) - break - } - cfg.IntraRefreshPeriod = uint(p) - case "HorizontalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipHorizontal = true - case "false": - cfg.FlipHorizontal = false - default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) - } - case "VerticalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipVertical = true - case "false": - cfg.FlipVertical = false - default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) - } - default: - } - } - - return startRevid(ns, cfg) -} +func (v *flagStrings) Get() interface{} { return *v } diff --git a/experimentation/flac/decode.go b/experimentation/flac/decode.go new file mode 100644 index 00000000..34d42057 --- /dev/null +++ b/experimentation/flac/decode.go @@ -0,0 +1,144 @@ +/* +NAME + decode.go + +DESCRIPTION + decode.go provides functionality for the decoding of FLAC compressed audio + +AUTHOR + Saxon Nelson-Milton + +LICENSE + decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package flac + +import ( + "bytes" + "errors" + "io" + + "github.com/go-audio/audio" + "github.com/go-audio/wav" + "github.com/mewkiz/flac" +) + +const wavFormat = 1 + +// writeSeeker implements a memory based io.WriteSeeker. +type writeSeeker struct { + buf []byte + pos int +} + +// Bytes returns the bytes contained in the writeSeekers buffer. +func (ws *writeSeeker) Bytes() []byte { + return ws.buf +} + +// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number +// of bytes written. If less than len(p) bytes are written, an error is returned. +func (ws *writeSeeker) Write(p []byte) (n int, err error) { + minCap := ws.pos + len(p) + if minCap > cap(ws.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra + copy(buf2, ws.buf) + ws.buf = buf2 + } + if minCap > len(ws.buf) { + ws.buf = ws.buf[:minCap] + } + copy(ws.buf[ws.pos:], p) + ws.pos += len(p) + return len(p), nil +} + +// Seek sets the offset for the next Read or Write to offset, interpreted according +// to whence: SeekStart means relative to the start of the file, SeekCurrent means +// relative to the current offset, and SeekEnd means relative to the end. Seek returns +// the new offset relative to the start of the file and an error, if any. +func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { + newPos, offs := 0, int(offset) + switch whence { + case io.SeekStart: + newPos = offs + case io.SeekCurrent: + newPos = ws.pos + offs + case io.SeekEnd: + newPos = len(ws.buf) + offs + } + if newPos < 0 { + return 0, errors.New("negative result pos") + } + ws.pos = newPos + return int64(newPos), nil +} + +// Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding +// fails, an error is returned. +func Decode(buf []byte) ([]byte, error) { + + // Lex the FLAC into a stream to hold audio and it's properties. + r := bytes.NewReader(buf) + stream, err := flac.Parse(r) + if err != nil { + return nil, errors.New("Could not parse FLAC") + } + + // Create WAV encoder and pass writeSeeker that will store output WAV. + ws := &writeSeeker{} + sr := int(stream.Info.SampleRate) + bps := int(stream.Info.BitsPerSample) + nc := int(stream.Info.NChannels) + enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat) + defer enc.Close() + + // Decode FLAC into frames of samples + intBuf := &audio.IntBuffer{ + Format: &audio.Format{NumChannels: nc, SampleRate: sr}, + SourceBitDepth: bps, + } + return decodeFrames(stream, intBuf, enc, ws) +} + +// decodeFrames parses frames from the stream and encodes them into WAV until +// the end of the stream is reached. The bytes from writeSeeker buffer are then +// returned. If any errors occur during encodeing, nil bytes and the error is returned. +func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { + var data []int + for { + frame, err := s.ParseNext() + + // If we've reached the end of the stream then we can output the writeSeeker's buffer. + if err == io.EOF { + return ws.Bytes(), nil + } else if err != nil { + return nil, err + } + + // Encode WAV audio samples. + data = data[:0] + for i := 0; i < frame.Subframes[0].NSamples; i++ { + for _, subframe := range frame.Subframes { + data = append(data, int(subframe.Samples[i])) + } + } + intBuf.Data = data + if err := e.Write(intBuf); err != nil { + return nil, err + } + } +} diff --git a/experimentation/flac/flac_test.go b/experimentation/flac/flac_test.go new file mode 100644 index 00000000..1f8019e5 --- /dev/null +++ b/experimentation/flac/flac_test.go @@ -0,0 +1,121 @@ +/* +NAME + flac_test.go + +DESCRIPTION + flac_test.go provides utilities to test FLAC audio decoding + +AUTHOR + Saxon Nelson-Milton + +LICENSE + flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ +package flac + +import ( + "io" + "io/ioutil" + "os" + "testing" +) + +const ( + testFile = "../../../test/test-data/av/input/robot.flac" + outFile = "testOut.wav" +) + +// TestWriteSeekerWrite checks that basic writing to the ws works as expected. +func TestWriteSeekerWrite(t *testing.T) { + ws := &writeSeeker{} + + const tstStr1 = "hello" + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Write failed, got: %v, want: %v", got, tstStr1) + } + + const tstStr2 = " world" + const want = "hello world" + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want { + t.Errorf("Second write failed, got: %v, want: %v", got, want) + } +} + +// TestWriteSeekerSeek checks that writing and seeking works as expected, i.e. we +// can write, then seek to a knew place in the buf, and write again, either replacing +// bytes, or appending bytes. +func TestWriteSeekerSeek(t *testing.T) { + ws := &writeSeeker{} + + const tstStr1 = "hello" + want1 := tstStr1 + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want1) + } + + const tstStr2 = " world" + const want2 = tstStr1 + tstStr2 + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want2 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want2) + } + + const tstStr3 = "k!" + const want3 = "hello work!" + ws.Seek(-2, io.SeekEnd) + ws.Write([]byte(tstStr3)) + got = string(ws.buf) + if got != want3 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want3) + } + + const tstStr4 = "gopher" + const want4 = "hello gopher" + ws.Seek(6, io.SeekStart) + ws.Write([]byte(tstStr4)) + got = string(ws.buf) + if got != want4 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want4) + } +} + +// TestDecodeFlac checks that we can load a flac file and decode to wav, writing +// to a wav file. +func TestDecodeFlac(t *testing.T) { + b, err := ioutil.ReadFile(testFile) + if err != nil { + t.Fatalf("Could not read test file, failed with err: %v", err.Error()) + } + out, err := Decode(b) + if err != nil { + t.Errorf("Could not decode, failed with err: %v", err.Error()) + } + f, err := os.Create(outFile) + if err != nil { + t.Fatalf("Could not create output file, failed with err: %v", err.Error()) + } + _, err = f.Write(out) + if err != nil { + t.Fatalf("Could not write to output file, failed with err: %v", err.Error()) + } +} diff --git a/revid/config.go b/revid/config.go index 304d3e64..b0ba2bc4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -40,8 +40,7 @@ type Config struct { Input uint8 InputCodec uint8 - Output1 uint8 - Output2 uint8 + Outputs []uint8 RtmpMethod uint8 Packetization uint8 @@ -69,6 +68,7 @@ type Config struct { RtpAddress string Logger Logger SendRetry bool + BurstPeriod uint } // Enums for config struct @@ -115,6 +115,7 @@ const ( defaultInputCodec = H264 defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultRtpAddr = "localhost:6970" + defaultBurstPeriod = 10 // Seconds ) // Validate checks for any errors in the config fields and defaults settings @@ -172,45 +173,38 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad input codec defined in config") } - switch c.Output1 { - case File: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output1 = Http - break + for i, o := range c.Outputs { + switch o { + case File: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Outputs[i] = Http + // FIXME(kortschak): Does this want the same line as below? + // c.FramesPerClip = httpFramesPerClip + break + } + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", + "framesPerClip", defaultFramesPerClip) + c.FramesPerClip = defaultFramesPerClip + case NothingDefined: + c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", + defaultOutput) + c.Outputs[i] = defaultOutput + fallthrough + case Http, Rtp: + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", + "framesPerClip", httpFramesPerClip) + c.FramesPerClip = httpFramesPerClip + default: + return errors.New("bad output type defined in config") } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", - "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - case NothingDefined: - c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", - defaultOutput) - c.Output1 = defaultOutput - fallthrough - case Http, Rtp: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", - "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip - default: - return errors.New("bad output type defined in config") } - switch c.Output2 { - case File: - case Rtp: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output2 = Http - break - } - case NothingDefined: - case Http: - default: - return errors.New("bad output2 type defined in config") + if c.BurstPeriod == 0 { + c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod) + c.BurstPeriod = defaultBurstPeriod } if c.FramesPerClip < 1 { diff --git a/revid/revid.go b/revid/revid.go index ea1c0adc..02656d03 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -37,6 +37,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "time" "bitbucket.org/ausocean/av/stream" @@ -118,8 +119,12 @@ type Revid struct { // bitrate hold the last send bitrate calculation result. bitrate int - // isRunning is a loaded and cocked foot-gun. + mu sync.Mutex isRunning bool + + wg sync.WaitGroup + + err chan error } // packer takes data segments and packs them into clips @@ -157,8 +162,15 @@ func (p *packer) Write(frame []byte) (int, error) { return n, err } p.packetCount++ + var hasRtmp bool + for _, d := range p.owner.config.Outputs { + if d == Rtmp { + hasRtmp = true + break + } + } now := time.Now() - if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp { + if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { p.owner.buffer.Flush() p.packetCount = 0 p.lastTime = now @@ -169,16 +181,35 @@ func (p *packer) Write(frame []byte) (int, error) { // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { - r := Revid{ns: ns} + r := Revid{ns: ns, err: make(chan error)} r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.packer.owner = &r err := r.reset(c) if err != nil { return nil, err } + go r.handleErrors() return &r, nil } +// TODO(Saxon): put more thought into error severity. +func (r *Revid) handleErrors() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + err = r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) + } + } + } +} + // Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate @@ -203,40 +234,35 @@ func (r *Revid) reset(config Config) error { } } - n := 1 - if r.config.Output2 != 0 && r.config.Output2 != Rtp { - n = 2 - } - r.destination = make([]loadSender, n) - - for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} { - switch outType { + r.destination = r.destination[:0] + for _, typ := range r.config.Outputs { + switch typ { case File: s, err := newFileSender(config.OutputFileName) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Http: - r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) + r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) case Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtp: r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { @@ -293,21 +319,40 @@ func (r *Revid) reset(config Config) error { return nil } -// IsRunning returns whether the receiver is running. +// IsRunning returns true if revid is running. func (r *Revid) IsRunning() bool { - return r.isRunning + r.mu.Lock() + ret := r.isRunning + r.mu.Unlock() + return ret +} + +func (r *Revid) Config() Config { + r.mu.Lock() + cfg := r.config + r.mu.Unlock() + return cfg +} + +// setIsRunning sets r.isRunning using b. +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.isRunning { + if r.IsRunning() { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") + // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.isRunning = true + r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") + r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() @@ -316,28 +361,145 @@ func (r *Revid) Start() error { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() error { - if !r.isRunning { + if !r.IsRunning() { return errors.New(pkg + "stop called but revid is already stopped") } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.isRunning = false + r.setIsRunning(false) r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.wg.Wait() + return nil +} + +func (r *Revid) Update(vars map[string]string) error { + if r.IsRunning() { + if err := r.Stop(); err != nil { + return err + } + } + //look through the vars and update revid where needed + for key, value := range vars { + switch key { + case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! + switch value { + case "File": + r.config.Outputs[0] = File + case "Http": + r.config.Outputs[0] = Http + case "Rtmp": + r.config.Outputs[0] = Rtmp + case "FfmpegRtmp": + r.config.Outputs[0] = FfmpegRtmp + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + continue + } + case "FramesPerClip": + f, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + break + } + r.config.FramesPerClip = uint(f) + case "RtmpUrl": + r.config.RtmpUrl = value + case "Bitrate": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + r.config.Bitrate = uint(v) + case "OutputFileName": + r.config.OutputFileName = value + case "InputFileName": + r.config.InputFileName = value + case "Height": + h, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) + break + } + r.config.Height = uint(h) + case "Width": + w, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) + break + } + r.config.Width = uint(w) + case "FrameRate": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + r.config.FrameRate = uint(v) + case "HttpAddress": + r.config.HttpAddress = value + case "Quantization": + q, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + break + } + r.config.Quantization = uint(q) + case "IntraRefreshPeriod": + p, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + break + } + r.config.IntraRefreshPeriod = uint(p) + case "HorizontalFlip": + switch strings.ToLower(value) { + case "true": + r.config.FlipHorizontal = true + case "false": + r.config.FlipHorizontal = false + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + } + case "VerticalFlip": + switch strings.ToLower(value) { + case "true": + r.config.FlipVertical = true + case "false": + r.config.FlipVertical = false + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + } + case "BurstPeriod": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) + break + } + r.config.BurstPeriod = uint(v) + } + } + return nil } // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { + defer r.wg.Done() lastTime := time.Now() var count int loop: - for r.isRunning { + for r.IsRunning() { // If the ring buffer has something we can read and send off chunk, err := r.buffer.Next(readTimeout) switch err { @@ -381,7 +543,7 @@ loop: err = rs.restart() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false + r.setIsRunning(false) return } @@ -472,11 +634,9 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + r.wg.Add(1) + go r.processFrom(stdout, 0) + return nil } func (r *Revid) startV4L() error { @@ -512,7 +672,6 @@ func (r *Revid) startV4L() error { r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) - delay := time.Second / time.Duration(r.config.FrameRate) stdout, err := r.cmd.StdoutPipe() if err != nil { return err @@ -524,15 +683,13 @@ func (r *Revid) startV4L() error { return err } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + r.wg.Add(1) + go r.processFrom(stdout, time.Duration(0)) + return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { - delay := time.Second / time.Duration(r.config.FrameRate) f, err := os.Open(r.config.InputFileName) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) @@ -542,5 +699,14 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - return r.lexTo(r.encoder, f, delay) + r.wg.Add(1) + go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) + return nil +} + +func (r *Revid) processFrom(read io.Reader, delay time.Duration) { + r.config.Logger.Log(logger.Info, pkg+"reading input data") + r.err <- r.lexTo(r.encoder, read, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.wg.Done() } diff --git a/rtmp/amf/amf.go b/rtmp/amf/amf.go index 837531bb..7ccfd932 100644 --- a/rtmp/amf/amf.go +++ b/rtmp/amf/amf.go @@ -51,7 +51,7 @@ import ( const ( typeNumber = 0x00 typeBoolean = 0x01 - typeString = 0x02 + TypeString = 0x02 TypeObject = 0x03 typeMovieClip = 0x04 TypeNull = 0x05 @@ -93,7 +93,7 @@ type Property struct { var ( ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short. ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder. - ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding. + ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding. ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found. ) @@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) { } // EncodeString encodes a string. +// Strings less than 65536 in length are encoded as TypeString, while longer strings are ecodeded as typeLongString. func EncodeString(buf []byte, val string) ([]byte, error) { const typeSize = 1 if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) { @@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) { } if len(val) < 65536 { - buf[0] = typeString + buf[0] = TypeString buf = buf[1:] binary.BigEndian.PutUint16(buf[:2], uint16(len(val))) buf = buf[2:] @@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) { return EncodeNumber(buf, prop.Number) case typeBoolean: return EncodeBoolean(buf, prop.Number != 0) - case typeString: + case TypeString: return EncodeString(buf, prop.String) case TypeNull: if len(buf) < 2 { @@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { prop.Number = float64(buf[0]) buf = buf[1:] - case typeString: + case TypeString: n := DecodeInt16(buf[:2]) if len(buf) < int(n+2) { return 0, ErrShortBuffer @@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { } // Encode encodes an Object into its AMF representation. -// This is the top-level encoding function and is typically the only function callers will need to use. func Encode(obj *Object, buf []byte) ([]byte, error) { if len(buf) < 5 { return nil, ErrShortBuffer @@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) { // StringProperty is a wrapper for Property that returns a String property's value, if any. func (obj *Object) StringProperty(name string, idx int) (string, error) { - prop, err := obj.Property(name, idx, typeString) + prop, err := obj.Property(name, idx, TypeString) if err != nil { return "", err } diff --git a/rtmp/amf/amf_test.go b/rtmp/amf/amf_test.go index 59548c09..957e1c7e 100644 --- a/rtmp/amf/amf_test.go +++ b/rtmp/amf/amf_test.go @@ -58,7 +58,7 @@ func TestSanity(t *testing.T) { // TestStrings tests string encoding and decoding. func TestStrings(t *testing.T) { // Short string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:3] = size // enc[3:] = data for _, s := range testStrings { @@ -67,8 +67,8 @@ func TestStrings(t *testing.T) { if err != nil { t.Errorf("EncodeString failed") } - if buf[0] != typeString { - t.Errorf("Expected typeString, got %v", buf[0]) + if buf[0] != TypeString { + t.Errorf("Expected TypeString, got %v", buf[0]) } ds := DecodeString(buf[1:]) if s != ds { @@ -76,7 +76,7 @@ func TestStrings(t *testing.T) { } } // Long string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:5] = size // enc[5:] = data s := string(make([]byte, 65536)) @@ -148,7 +148,7 @@ func TestProperties(t *testing.T) { // Encode/decode string properties. enc = buf[:] for i := range testStrings { - enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc) + enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc) if err != nil { t.Errorf("EncodeProperty of string failed") } @@ -235,7 +235,7 @@ func TestObject(t *testing.T) { // Construct a more complicated object that includes a nested object. var obj2 Object for i := range testStrings { - obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]}) + obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]}) obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])}) } obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1}) diff --git a/rtmp/conn.go b/rtmp/conn.go index 2554d092..7e1b3b15 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -76,7 +76,7 @@ type link struct { protocol int32 timeout uint port uint16 - conn *net.TCPConn + conn net.Conn } // method represents an RTMP method. @@ -121,20 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.app == "" { - return nil, errInvalidURL - } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app c.link.protocol |= featureWrite diff --git a/rtmp/packet.go b/rtmp/packet.go index 3cf18f14..b28a1cba 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -81,7 +81,7 @@ const ( // 3: basic header (chunk type and stream ID) (1 byte) var headerSizes = [...]int{12, 8, 4, 1} -// packet defines an RTMP packet. +// packet represents an RTMP packet. type packet struct { headerType uint8 packetType uint8 @@ -90,7 +90,6 @@ type packet struct { timestamp uint32 streamID uint32 bodySize uint32 - bytesRead uint32 buf []byte body []byte } @@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { pkt.bodySize = amf.DecodeInt24(header[3:6]) - pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] @@ -201,25 +199,18 @@ func (pkt *packet) readFrom(c *Conn) error { hSize += 4 } - if pkt.bodySize > 0 && pkt.body == nil { - pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) + pkt.resize(pkt.bodySize, pkt.headerType) + + if pkt.bodySize > c.inChunkSize { + c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) } - toRead := pkt.bodySize - pkt.bytesRead - chunkSize := c.inChunkSize - - if toRead < chunkSize { - chunkSize = toRead - } - - _, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = c.read(pkt.body[:pkt.bodySize]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } - pkt.bytesRead += uint32(chunkSize) - // Keep the packet as a reference for other packets on this channel. if c.channelsIn[pkt.channel] == nil { c.channelsIn[pkt.channel] = &packet{} @@ -237,15 +228,16 @@ func (pkt *packet) readFrom(c *Conn) error { c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) c.channelsIn[pkt.channel].body = nil - c.channelsIn[pkt.channel].bytesRead = 0 c.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } -// resize adjusts the packet's storage to accommodate a body of the given size and header type. +// resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { - pkt.buf = make([]byte, fullHeaderSize+size) + if cap(pkt.buf) < fullHeaderSize+int(size) { + pkt.buf = make([]byte, fullHeaderSize+size) + } pkt.body = pkt.buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht @@ -407,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { return nil } } else { - // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + // Send previously deferred packet if combining it with the next one would exceed the chunk size. if len(c.deferred)+size+hSize > chunkSize { c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) _, err := c.write(c.deferred) @@ -419,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) + c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) for size+hSize != 0 { if chunkSize > size { chunkSize = size diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index eae4277e..4fea7d82 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -41,7 +41,6 @@ import ( ) // parseURL parses an RTMP URL (ok, technically it is lexing). -// func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { @@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } elems := strings.SplitN(u.Path[1:], "/", 3) app = elems[0] + if app == "" { + return protocol, host, port, app, playpath, errInvalidURL + } playpath = elems[1] if len(elems) == 3 && len(elems[2]) != 0 { playpath = path.Join(elems[1:]...) @@ -97,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } + switch { + case port != 0: + case (protocol & featureSSL) != 0: + return protocol, host, port, app, playpath, errUnimplemented // port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 + } + return protocol, host, port, app, playpath, nil } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index bfee3e37..6f666785 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -174,6 +174,13 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"connected") + + defer func() { + if err != nil { + c.link.conn.Close() + } + }() + err = handshake(c) if err != nil { c.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) @@ -185,12 +192,14 @@ func connect(c *Conn) error { c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return err } + c.log(DebugLevel, pkg+"negotiating") + var buf [256]byte for !c.isPlaying { - pkt := packet{} + pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { - break + return err } switch pkt.packetType { @@ -199,14 +208,10 @@ func connect(c *Conn) error { default: err = handlePacket(c, &pkt) if err != nil { - break + return err } } } - - if !c.isPlaying { - return err - } return nil } @@ -276,26 +281,18 @@ func sendConnectPacket(c *Conn) error { return err } - enc[0] = amf.TypeObject - enc = enc[1:] - enc, err = amf.EncodeNamedString(enc, avApp, c.link.app) - if err != nil { - return err + // required link info + info := amf.Object{Properties: []amf.Property{ + amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app}, + amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate}, + amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}}, } - enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) - if err != nil { - return err - } - enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url) - if err != nil { - return err - } - enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd) + enc, err = amf.Encode(&info, enc) if err != nil { return err } - // add auth string, if any + // optional link auth info if c.link.auth != "" { enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0) if err != nil { diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 618f2b95..be4908a6 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) { } defer f.Close() + rs := &rtmpSender{conn: c} // Pass RTMP session, true for audio, true for video, and 25 FPS - flvEncoder, err := flv.NewEncoder(c, true, true, 25) + flvEncoder, err := flv.NewEncoder(rs, true, true, 25) if err != nil { t.Fatalf("failed to create encoder: %v", err) } diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index decc8558..02761b91 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -122,7 +122,7 @@ var ( ) const ( - psiSndCnt = 7 + psiInterval = 1 * time.Second ) // timeLocation holds time and location data @@ -199,9 +199,9 @@ type Encoder struct { tsSpace [PacketSize]byte pesSpace [pes.MaxPesSize]byte - psiCount int - continuity map[int]byte + + psiLastTime time.Time } // NewEncoder returns an Encoder with the specified frame rate. @@ -233,6 +233,15 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { + now := time.Now() + if now.Sub(e.psiLastTime) > psiInterval { + err := e.writePSI() + if err != nil { + return err + } + e.psiLastTime = now + } + // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -262,13 +271,6 @@ func (e *Encoder) Encode(nalu []byte) error { pkt.PCR = e.pcr() pusi = false } - if e.psiCount <= 0 { - err := e.writePSI() - if err != nil { - return err - } - } - e.psiCount-- _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) if err != nil { return err @@ -318,7 +320,6 @@ func (e *Encoder) writePSI() error { if err != nil { return err } - e.psiCount = psiSndCnt return nil } diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index e48f17f0..0bef80d2 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -128,19 +128,19 @@ type Packet struct { } // FindPMT will take a clip of mpegts and try to find a PMT table - if one -// is found, then it is returned, otherwise nil and an error is returned. -func FindPMT(d []byte) (p []byte, err error) { +// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned. +func FindPMT(d []byte) (p []byte, i int, err error) { if len(d) < PacketSize { - return nil, errors.New("Mmpegts data not of valid length") + return nil, -1, errors.New("Mmpegts data not of valid length") } - for i := 0; i < len(d); i += PacketSize { + for i = 0; i < len(d); i += PacketSize { pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) if pid == pmtPid { p = d[i+4 : i+PacketSize] return } } - return nil, errors.New("Could not find pmt table in mpegts data") + return nil, -1, errors.New("Could not find pmt table in mpegts data") } // FillPayload takes a channel and fills the packets Payload field until the