From 92ba9c89a275868e0bc295cd1dcca1a6a526cb70 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 19 Jan 2019 19:39:43 +1030 Subject: [PATCH 01/58] cmd/revid-cli,revid: use a slice for output destination flags --- cmd/revid-cli/main.go | 94 +++++++++++++--------- revid/cmd/h264-file-to-flv-rtmp/main.go | 2 +- revid/cmd/h264-file-to-mpgets-file/main.go | 2 +- revid/config.go | 67 +++++++-------- revid/revid.go | 18 +++-- 5 files changed, 93 insertions(+), 90 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 7e375992..5b826b91 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -102,8 +102,6 @@ func handleFlags() revid.Config { inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") - output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp") - output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") @@ -126,6 +124,9 @@ func handleFlags() revid.Config { configFilePtr = flag.String("ConfigFile", "", "NetSender config file") ) + var outputs flagStrings + flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)") + flag.Parse() log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) @@ -167,40 +168,24 @@ func handleFlags() revid.Config { log.Log(logger.Error, pkg+"bad input codec argument") } - switch *output1Ptr { - case "File": - cfg.Output1 = revid.File - case "Http": - cfg.Output1 = revid.Http - case "Rtmp": - cfg.Output1 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp - case "Udp": - cfg.Output1 = revid.Udp - case "Rtp": - cfg.Output1 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 1 argument") - } - - switch *output2Ptr { - case "File": - cfg.Output2 = revid.File - case "Http": - cfg.Output2 = revid.Http - case "Rtmp": - cfg.Output2 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output2 = revid.FfmpegRtmp - case "Udp": - cfg.Output2 = revid.Udp - case "Rtp": - cfg.Output2 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 2 argument") + for _, o := range outputs { + switch o { + case "File": + cfg.Outputs = append(cfg.Outputs, revid.File) + case "Http": + cfg.Outputs = append(cfg.Outputs, revid.Http) + case "Rtmp": + cfg.Outputs = append(cfg.Outputs, revid.Rtmp) + case "FfmpegRtmp": + cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp) + case "Udp": + cfg.Outputs = append(cfg.Outputs, revid.Udp) + case "Rtp": + cfg.Outputs = append(cfg.Outputs, revid.Rtp) + case "": + default: + log.Log(logger.Error, pkg+"bad output argument", "arg", o) + } } switch *rtmpMethodPtr { @@ -380,15 +365,19 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m for key, value := range vars { switch key { case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Output1 = revid.File + cfg.Outputs[0] = revid.File case "Http": - cfg.Output1 = revid.Http + cfg.Outputs[0] = revid.Http case "Rtmp": - cfg.Output1 = revid.Rtmp + cfg.Outputs[0] = revid.Rtmp case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp + cfg.Outputs[0] = revid.FfmpegRtmp default: log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue @@ -474,3 +463,28 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m return startRevid(ns, cfg) } + +// flagStrings implements an appending string set flag. +type flagStrings []string + +func (v *flagStrings) String() string { + if *v != nil { + return strings.Join(*v, ",") + } + return "" +} + +func (v *flagStrings) Set(s string) error { + if s == "" { + return nil + } + for _, e := range *v { + if e == s { + return nil + } + } + *v = append(*v, s) + return nil +} + +func (v *flagStrings) Get() interface{} { return *v } diff --git a/revid/cmd/h264-file-to-flv-rtmp/main.go b/revid/cmd/h264-file-to-flv-rtmp/main.go index 5f79df4a..4f7c9d7c 100644 --- a/revid/cmd/h264-file-to-flv-rtmp/main.go +++ b/revid/cmd/h264-file-to-flv-rtmp/main.go @@ -58,7 +58,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output1: revid.Rtmp, + Outputs: []byte{revid.Rtmp}, RtmpMethod: revid.LibRtmp, RtmpUrl: *rtmpUrlPtr, Packetization: revid.Flv, diff --git a/revid/cmd/h264-file-to-mpgets-file/main.go b/revid/cmd/h264-file-to-mpgets-file/main.go index 03a39fde..768f560b 100644 --- a/revid/cmd/h264-file-to-mpgets-file/main.go +++ b/revid/cmd/h264-file-to-mpgets-file/main.go @@ -50,7 +50,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output1: revid.File, + Outputs: []byte{revid.File}, OutputFileName: outputFile, Packetization: revid.Mpegts, Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), diff --git a/revid/config.go b/revid/config.go index 304d3e64..dc9c5a8d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -40,8 +40,7 @@ type Config struct { Input uint8 InputCodec uint8 - Output1 uint8 - Output2 uint8 + Outputs []uint8 RtmpMethod uint8 Packetization uint8 @@ -172,45 +171,33 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad input codec defined in config") } - switch c.Output1 { - case File: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output1 = Http - break + for i, o := range c.Outputs { + switch o { + case File: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Outputs[i] = Http + // FIXME(kortschak): Does this want the same line as below? + // c.FramesPerClip = httpFramesPerClip + break + } + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", + "framesPerClip", defaultFramesPerClip) + c.FramesPerClip = defaultFramesPerClip + case NothingDefined: + c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", + defaultOutput) + c.Outputs[i] = defaultOutput + fallthrough + case Http, Rtp: + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", + "framesPerClip", httpFramesPerClip) + c.FramesPerClip = httpFramesPerClip + default: + return errors.New("bad output type defined in config") } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", - "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - case NothingDefined: - c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", - defaultOutput) - c.Output1 = defaultOutput - fallthrough - case Http, Rtp: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", - "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip - default: - return errors.New("bad output type defined in config") - } - - switch c.Output2 { - case File: - case Rtp: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output2 = Http - break - } - case NothingDefined: - case Http: - default: - return errors.New("bad output2 type defined in config") } if c.FramesPerClip < 1 { diff --git a/revid/revid.go b/revid/revid.go index ea1c0adc..a15fab3a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -157,8 +157,15 @@ func (p *packer) Write(frame []byte) (int, error) { return n, err } p.packetCount++ + var hasRtmp bool + for _, d := range p.owner.config.Outputs { + if d == Rtmp { + hasRtmp = true + break + } + } now := time.Now() - if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp { + if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { p.owner.buffer.Flush() p.packetCount = 0 p.lastTime = now @@ -203,13 +210,8 @@ func (r *Revid) reset(config Config) error { } } - n := 1 - if r.config.Output2 != 0 && r.config.Output2 != Rtp { - n = 2 - } - r.destination = make([]loadSender, n) - - for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} { + r.destination = make([]loadSender, len(r.config.Outputs)) + for outNo, outType := range r.config.Outputs { switch outType { case File: s, err := newFileSender(config.OutputFileName) From adfb87dcf972c9933a06b70c50ee73af8f3886d1 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:30:24 +1030 Subject: [PATCH 02/58] TypeString now exported (was typeString). --- rtmp/amf/amf.go | 14 +++++++------- rtmp/amf/amf_test.go | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rtmp/amf/amf.go b/rtmp/amf/amf.go index 837531bb..7ccfd932 100644 --- a/rtmp/amf/amf.go +++ b/rtmp/amf/amf.go @@ -51,7 +51,7 @@ import ( const ( typeNumber = 0x00 typeBoolean = 0x01 - typeString = 0x02 + TypeString = 0x02 TypeObject = 0x03 typeMovieClip = 0x04 TypeNull = 0x05 @@ -93,7 +93,7 @@ type Property struct { var ( ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short. ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder. - ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding. + ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding. ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found. ) @@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) { } // EncodeString encodes a string. +// Strings less than 65536 in length are encoded as TypeString, while longer strings are ecodeded as typeLongString. func EncodeString(buf []byte, val string) ([]byte, error) { const typeSize = 1 if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) { @@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) { } if len(val) < 65536 { - buf[0] = typeString + buf[0] = TypeString buf = buf[1:] binary.BigEndian.PutUint16(buf[:2], uint16(len(val))) buf = buf[2:] @@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) { return EncodeNumber(buf, prop.Number) case typeBoolean: return EncodeBoolean(buf, prop.Number != 0) - case typeString: + case TypeString: return EncodeString(buf, prop.String) case TypeNull: if len(buf) < 2 { @@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { prop.Number = float64(buf[0]) buf = buf[1:] - case typeString: + case TypeString: n := DecodeInt16(buf[:2]) if len(buf) < int(n+2) { return 0, ErrShortBuffer @@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { } // Encode encodes an Object into its AMF representation. -// This is the top-level encoding function and is typically the only function callers will need to use. func Encode(obj *Object, buf []byte) ([]byte, error) { if len(buf) < 5 { return nil, ErrShortBuffer @@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) { // StringProperty is a wrapper for Property that returns a String property's value, if any. func (obj *Object) StringProperty(name string, idx int) (string, error) { - prop, err := obj.Property(name, idx, typeString) + prop, err := obj.Property(name, idx, TypeString) if err != nil { return "", err } diff --git a/rtmp/amf/amf_test.go b/rtmp/amf/amf_test.go index 59548c09..957e1c7e 100644 --- a/rtmp/amf/amf_test.go +++ b/rtmp/amf/amf_test.go @@ -58,7 +58,7 @@ func TestSanity(t *testing.T) { // TestStrings tests string encoding and decoding. func TestStrings(t *testing.T) { // Short string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:3] = size // enc[3:] = data for _, s := range testStrings { @@ -67,8 +67,8 @@ func TestStrings(t *testing.T) { if err != nil { t.Errorf("EncodeString failed") } - if buf[0] != typeString { - t.Errorf("Expected typeString, got %v", buf[0]) + if buf[0] != TypeString { + t.Errorf("Expected TypeString, got %v", buf[0]) } ds := DecodeString(buf[1:]) if s != ds { @@ -76,7 +76,7 @@ func TestStrings(t *testing.T) { } } // Long string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:5] = size // enc[5:] = data s := string(make([]byte, 65536)) @@ -148,7 +148,7 @@ func TestProperties(t *testing.T) { // Encode/decode string properties. enc = buf[:] for i := range testStrings { - enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc) + enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc) if err != nil { t.Errorf("EncodeProperty of string failed") } @@ -235,7 +235,7 @@ func TestObject(t *testing.T) { // Construct a more complicated object that includes a nested object. var obj2 Object for i := range testStrings { - obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]}) + obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]}) obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])}) } obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1}) From 42be87d98a26684b2a40e51fadded6e3bd705587 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:33:43 +1030 Subject: [PATCH 03/58] sendConnectPacket() now encodes required link info in one go using amf.Encode(). --- rtmp/rtmp.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index bfee3e37..1175e8ce 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -276,26 +276,18 @@ func sendConnectPacket(c *Conn) error { return err } - enc[0] = amf.TypeObject - enc = enc[1:] - enc, err = amf.EncodeNamedString(enc, avApp, c.link.app) - if err != nil { - return err + // required link info + info := amf.Object{Properties: []amf.Property{ + amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app}, + amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate}, + amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}}, } - enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) - if err != nil { - return err - } - enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url) - if err != nil { - return err - } - enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd) + enc, err = amf.Encode(&info, enc) if err != nil { return err } - // add auth string, if any + // optional link auth info if c.link.auth != "" { enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0) if err != nil { From 828cc3780a9005282a5bbaa7a309747870104994 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:56:55 +1030 Subject: [PATCH 04/58] Removed superfluous packet.bytesRead. --- rtmp/packet.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 3cf18f14..642e6194 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -81,7 +81,7 @@ const ( // 3: basic header (chunk type and stream ID) (1 byte) var headerSizes = [...]int{12, 8, 4, 1} -// packet defines an RTMP packet. +// packet represents an RTMP packet. type packet struct { headerType uint8 packetType uint8 @@ -90,7 +90,6 @@ type packet struct { timestamp uint32 streamID uint32 bodySize uint32 - bytesRead uint32 buf []byte body []byte } @@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { pkt.bodySize = amf.DecodeInt24(header[3:6]) - pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] @@ -205,21 +203,17 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } - toRead := pkt.bodySize - pkt.bytesRead - chunkSize := c.inChunkSize - - if toRead < chunkSize { - chunkSize = toRead + sz := c.inChunkSize + if pkt.bodySize < sz { + sz = pkt.bodySize } - _, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = c.read(pkt.body[:sz]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } - pkt.bytesRead += uint32(chunkSize) - // Keep the packet as a reference for other packets on this channel. if c.channelsIn[pkt.channel] == nil { c.channelsIn[pkt.channel] = &packet{} @@ -237,7 +231,6 @@ func (pkt *packet) readFrom(c *Conn) error { c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) c.channelsIn[pkt.channel].body = nil - c.channelsIn[pkt.channel].bytesRead = 0 c.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } From ddd1e4ab1789fe5a66577f3741e80bfbd393155b Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:09:49 +1030 Subject: [PATCH 05/58] readFrom() now logs a warning if it is reading a large packet (which I suspect never happens). --- rtmp/packet.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 642e6194..0f3ae241 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -203,12 +203,11 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } - sz := c.inChunkSize - if pkt.bodySize < sz { - sz = pkt.bodySize + if pkt.bodySize > c.inChunkSize { + c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) } - _, err = c.read(pkt.body[:sz]) + _, err = c.read(pkt.body[:pkt.bodySize]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err From 969e2f4fa940719e14924faac0ec358066d7fb46 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:41:32 +1030 Subject: [PATCH 06/58] packet.resize() now only makes a new buf when necessary. --- rtmp/packet.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 0f3ae241..a92f7022 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -199,9 +199,7 @@ func (pkt *packet) readFrom(c *Conn) error { hSize += 4 } - if pkt.bodySize > 0 && pkt.body == nil { - pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) - } + pkt.resize(pkt.bodySize, pkt.headerType) if pkt.bodySize > c.inChunkSize { c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) @@ -234,10 +232,12 @@ func (pkt *packet) readFrom(c *Conn) error { return nil } -// resize adjusts the packet's storage to accommodate a body of the given size and header type. +// resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { - pkt.buf = make([]byte, fullHeaderSize+size) + if cap(pkt.buf) < fullHeaderSize+int(size) { + pkt.buf = make([]byte, fullHeaderSize+size) + } pkt.body = pkt.buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht From fbad21bc76af71deca65e924229c6ef643130064 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:52:58 +1030 Subject: [PATCH 07/58] connect() now constructs its packet with a buf to avoid needless packet resizing later. --- rtmp/rtmp.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 1175e8ce..a6897eef 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -186,8 +186,10 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"negotiating") + + var buf [256]byte for !c.isPlaying { - pkt := packet{} + pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { break From 04ad1fec257ea38e9ad03eeeb0c17f6d16631080 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:20:56 +1030 Subject: [PATCH 08/58] Log packet size before la and ra. --- rtmp/packet.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index a92f7022..b28a1cba 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -399,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { return nil } } else { - // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + // Send previously deferred packet if combining it with the next one would exceed the chunk size. if len(c.deferred)+size+hSize > chunkSize { c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) _, err := c.write(c.deferred) @@ -411,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) + c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) for size+hSize != 0 { if chunkSize > size { chunkSize = size From a362d1d2abb6b02298b572ae793d7e10ab016a52 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:50:51 +1030 Subject: [PATCH 09/58] connect() now closes c.link.conn upon an error. --- rtmp/rtmp.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index a6897eef..3352558a 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -174,6 +174,13 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"connected") + + defer func() { + if err != nil { + c.link.conn.Close() + } + }() + err = handshake(c) if err != nil { c.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) From efe40a6778e563f9e01447b8ebe015c362f48927 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:59:28 +1030 Subject: [PATCH 10/58] Move test for empty RTMP 'app' into parseURL(). --- rtmp/conn.go | 3 --- rtmp/parseurl.go | 4 +++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 2554d092..1db2cb6e 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -121,9 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.app == "" { - return nil, errInvalidURL - } if c.link.port == 0 { switch { case (c.link.protocol & featureSSL) != 0: diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index eae4277e..ad3409b6 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -41,7 +41,6 @@ import ( ) // parseURL parses an RTMP URL (ok, technically it is lexing). -// func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { @@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } elems := strings.SplitN(u.Path[1:], "/", 3) app = elems[0] + if app == "" { + return protocol, host, port, app, playpath, errInvalidURL + } playpath = elems[1] if len(elems) == 3 && len(elems[2]) != 0 { playpath = path.Join(elems[1:]...) From 89d9bf3eef7b91ae9d98f1e0bbafd5b0aa125926 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 10:28:05 +1030 Subject: [PATCH 11/58] Return errors from the loop in connect() rather than breaking. --- rtmp/rtmp.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 3352558a..6f666785 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -192,14 +192,14 @@ func connect(c *Conn) error { c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return err } - c.log(DebugLevel, pkg+"negotiating") + c.log(DebugLevel, pkg+"negotiating") var buf [256]byte for !c.isPlaying { pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { - break + return err } switch pkt.packetType { @@ -208,14 +208,10 @@ func connect(c *Conn) error { default: err = handlePacket(c, &pkt) if err != nil { - break + return err } } } - - if !c.isPlaying { - return err - } return nil } From 0105c76dc5e029d47f0d858805bfcbd12569bb2a Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 13:33:44 +1030 Subject: [PATCH 12/58] Move port defaulting logic to parseURL(). --- rtmp/conn.go | 11 ----------- rtmp/parseurl.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 1db2cb6e..3864df98 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -121,17 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app c.link.protocol |= featureWrite diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index ad3409b6..c2880368 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -99,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } + if port == 0 { + switch { + case (protocol & featureSSL) != 0: + port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 + } + } return protocol, host, port, app, playpath, nil } From 3a70dc6ddc447a8549a1c086aa658aa08fdfafc6 Mon Sep 17 00:00:00 2001 From: scruzin Date: Mon, 21 Jan 2019 10:38:27 +1030 Subject: [PATCH 13/58] parseURL() now returns errUnimplemented for SSL. --- rtmp/parseurl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index c2880368..cc197fb9 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -103,6 +103,7 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp switch { case (protocol & featureSSL) != 0: port = 433 + return errUnimplemented case (protocol & featureHTTP) != 0: port = 80 default: From 2697dcf51520bb1b33ceb962e0d66f083aeca6e7 Mon Sep 17 00:00:00 2001 From: scruzin Date: Mon, 21 Jan 2019 10:57:40 +1030 Subject: [PATCH 14/58] More elegant switch in parseURL() for port logic. --- rtmp/parseurl.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index cc197fb9..4fea7d82 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -99,16 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } - if port == 0 { - switch { - case (protocol & featureSSL) != 0: - port = 433 - return errUnimplemented - case (protocol & featureHTTP) != 0: - port = 80 - default: - port = 1935 - } + switch { + case port != 0: + case (protocol & featureSSL) != 0: + return protocol, host, port, app, playpath, errUnimplemented // port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 } + return protocol, host, port, app, playpath, nil } From 56422a366e7b4961780777f845c56631648ae0c6 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 14:57:40 +1030 Subject: [PATCH 15/58] av/stream/flac: added decode.go and flac_test.go --- stream/flac/decode.go | 0 stream/flac/flac_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 stream/flac/decode.go create mode 100644 stream/flac/flac_test.go diff --git a/stream/flac/decode.go b/stream/flac/decode.go new file mode 100644 index 00000000..e69de29b diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go new file mode 100644 index 00000000..e69de29b From 44c79e225673f3f7fd74a420273d23f94227ec91 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 15:41:49 +1030 Subject: [PATCH 16/58] av/stream/flac: wrote decode function and test to see if we can get wav. --- stream/flac/decode.go | 108 +++++++++++++++++++++++++++++++++++++++ stream/flac/flac_test.go | 31 +++++++++++ 2 files changed, 139 insertions(+) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index e69de29b..2ecfb737 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -0,0 +1,108 @@ +package flac + +import ( + "bytes" + "errors" + "io" + + "github.com/go-audio/audio" + "github.com/go-audio/wav" + "github.com/mewkiz/flac" +) + +const wavAudioFormat = 1 + +type buffer struct { + Buffer bytes.Buffer + Index int64 +} + +func (b *buffer) Bytes() []byte { + return b.Buffer.Bytes() +} + +func (b *buffer) Read(p []byte) (int, error) { + n, err := bytes.NewBuffer(b.Buffer.Bytes()[b.Index:]).Read(p) + + if err == nil { + if b.Index+int64(len(p)) < int64(b.Buffer.Len()) { + b.Index += int64(len(p)) + } else { + b.Index = int64(b.Buffer.Len()) + } + } + + return n, err +} + +func (b *buffer) Write(p []byte) (int, error) { + n, err := b.Buffer.Write(p) + + if err == nil { + b.Index = int64(b.Buffer.Len()) + } + + return n, err +} + +func (b *buffer) Seek(offset int64, whence int) (int64, error) { + var err error + var Index int64 = 0 + + switch whence { + case 0: + if offset >= int64(b.Buffer.Len()) || offset < 0 { + err = errors.New("Invalid Offset.") + } else { + b.Index = offset + Index = offset + } + default: + err = errors.New("Unsupported Seek Method.") + } + + return Index, err +} + +// Decode takes a slice of flac and decodes to wav +func Decode(buf []byte) ([]byte, error) { + r := bytes.NewReader(buf) + stream, err := flac.Parse(r) + if err != nil { + return nil, errors.New("Could not parse FLAC") + } + fb := &buffer{} + enc := wav.NewEncoder(fb, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + defer enc.Close() + var data []int + for { + // Decode FLAC audio samples. + frame, err := stream.ParseNext() + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + + // Encode WAV audio samples. + data = data[:0] + for i := 0; i < frame.Subframes[0].NSamples; i++ { + for _, subframe := range frame.Subframes { + data = append(data, int(subframe.Samples[i])) + } + } + buf := &audio.IntBuffer{ + Format: &audio.Format{ + NumChannels: int(stream.Info.NChannels), + SampleRate: int(stream.Info.SampleRate), + }, + Data: data, + SourceBitDepth: int(stream.Info.BitsPerSample), + } + if err := enc.Write(buf); err != nil { + return nil, err + } + } + return fb.Bytes(), nil +} diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index e69de29b..763731d4 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -0,0 +1,31 @@ +package flac + +import ( + "io/ioutil" + "os" + "testing" +) + +const ( + testFile = "/home/saxon/Desktop/robot.flac" + outFile = "out.wav" +) + +func TestDecodeFlac(t *testing.T) { + b, err := ioutil.ReadFile(testFile) + if err != nil { + t.Fatalf("Could not read test file, failed with err: %v", err.Error()) + } + out, err := Decode(b) + if err != nil { + t.Errorf("Could not decode, failed with err: %v", err.Error()) + } + f, err := os.Create(outFile) + if err != nil { + t.Fatalf("Could not create output file, failed with err: %v", err.Error()) + } + _, err = f.Write(out) + if err != nil { + t.Fatalf("Could not write to output file, failed with err: %v", err.Error()) + } +} From 6fda0b3c3fe91515c792cac3e29124090f7dcf18 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:34:15 +1030 Subject: [PATCH 17/58] av/stream/flac: using writerseeker to pass to wav.NewEncoder because I don't want to give it a file, but it's not working --- stream/flac/decode.go | 68 ++++++++-------------------------------- stream/flac/flac_test.go | 2 +- 2 files changed, 14 insertions(+), 56 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 2ecfb737..941610e2 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -4,66 +4,16 @@ import ( "bytes" "errors" "io" + "io/ioutil" "github.com/go-audio/audio" "github.com/go-audio/wav" "github.com/mewkiz/flac" + "github.com/orcaman/writerseeker" ) const wavAudioFormat = 1 -type buffer struct { - Buffer bytes.Buffer - Index int64 -} - -func (b *buffer) Bytes() []byte { - return b.Buffer.Bytes() -} - -func (b *buffer) Read(p []byte) (int, error) { - n, err := bytes.NewBuffer(b.Buffer.Bytes()[b.Index:]).Read(p) - - if err == nil { - if b.Index+int64(len(p)) < int64(b.Buffer.Len()) { - b.Index += int64(len(p)) - } else { - b.Index = int64(b.Buffer.Len()) - } - } - - return n, err -} - -func (b *buffer) Write(p []byte) (int, error) { - n, err := b.Buffer.Write(p) - - if err == nil { - b.Index = int64(b.Buffer.Len()) - } - - return n, err -} - -func (b *buffer) Seek(offset int64, whence int) (int64, error) { - var err error - var Index int64 = 0 - - switch whence { - case 0: - if offset >= int64(b.Buffer.Len()) || offset < 0 { - err = errors.New("Invalid Offset.") - } else { - b.Index = offset - Index = offset - } - default: - err = errors.New("Unsupported Seek Method.") - } - - return Index, err -} - // Decode takes a slice of flac and decodes to wav func Decode(buf []byte) ([]byte, error) { r := bytes.NewReader(buf) @@ -71,10 +21,12 @@ func Decode(buf []byte) ([]byte, error) { if err != nil { return nil, errors.New("Could not parse FLAC") } - fb := &buffer{} - enc := wav.NewEncoder(fb, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + ws := &writerseeker.WriterSeeker{} + enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int + var out []byte + var d []byte for { // Decode FLAC audio samples. frame, err := stream.ParseNext() @@ -103,6 +55,12 @@ func Decode(buf []byte) ([]byte, error) { if err := enc.Write(buf); err != nil { return nil, err } + d, err = ioutil.ReadAll(ws.Reader()) + if err != nil { + return nil, err + } + out = append(out, d...) } - return fb.Bytes(), nil + + return d, nil } diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 763731d4..13bef836 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -8,7 +8,7 @@ import ( const ( testFile = "/home/saxon/Desktop/robot.flac" - outFile = "out.wav" + outFile = "testOut.wav" ) func TestDecodeFlac(t *testing.T) { From 155134eeed1bd79ddc52c2bdd27c92fb9f527cef Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:37:16 +1030 Subject: [PATCH 18/58] av/stream/flac: moved readAll to after loop --- stream/flac/decode.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 941610e2..6c8445e4 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -25,8 +25,6 @@ func Decode(buf []byte) ([]byte, error) { enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int - var out []byte - var d []byte for { // Decode FLAC audio samples. frame, err := stream.ParseNext() @@ -55,12 +53,10 @@ func Decode(buf []byte) ([]byte, error) { if err := enc.Write(buf); err != nil { return nil, err } - d, err = ioutil.ReadAll(ws.Reader()) - if err != nil { - return nil, err - } - out = append(out, d...) } - + d, err := ioutil.ReadAll(ws.Reader()) + if err != nil { + return nil, err + } return d, nil } From 28e26cd151cc9866cf9e1072c2936528c2ed4554 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 17:50:09 +1030 Subject: [PATCH 19/58] av/stream/flc: using my own writeSeeker implementation - working --- stream/flac/decode.go | 52 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 6c8445e4..667fcf9c 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -4,16 +4,55 @@ import ( "bytes" "errors" "io" - "io/ioutil" "github.com/go-audio/audio" "github.com/go-audio/wav" "github.com/mewkiz/flac" - "github.com/orcaman/writerseeker" ) const wavAudioFormat = 1 +type WriterSeeker struct { + buf []byte + pos int +} + +func (ws *WriterSeeker) Bytes() []byte { + return ws.buf +} + +func (m *WriterSeeker) Write(p []byte) (n int, err error) { + minCap := m.pos + len(p) + if minCap > cap(m.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra + copy(buf2, m.buf) + m.buf = buf2 + } + if minCap > len(m.buf) { + m.buf = m.buf[:minCap] + } + copy(m.buf[m.pos:], p) + m.pos += len(p) + return len(p), nil +} + +func (m *WriterSeeker) Seek(offset int64, whence int) (int64, error) { + newPos, offs := 0, int(offset) + switch whence { + case io.SeekStart: + newPos = offs + case io.SeekCurrent: + newPos = m.pos + offs + case io.SeekEnd: + newPos = len(m.buf) + offs + } + if newPos < 0 { + return 0, errors.New("negative result pos") + } + m.pos = newPos + return int64(newPos), nil +} + // Decode takes a slice of flac and decodes to wav func Decode(buf []byte) ([]byte, error) { r := bytes.NewReader(buf) @@ -21,7 +60,7 @@ func Decode(buf []byte) ([]byte, error) { if err != nil { return nil, errors.New("Could not parse FLAC") } - ws := &writerseeker.WriterSeeker{} + ws := &WriterSeeker{} enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) defer enc.Close() var data []int @@ -54,9 +93,6 @@ func Decode(buf []byte) ([]byte, error) { return nil, err } } - d, err := ioutil.ReadAll(ws.Reader()) - if err != nil { - return nil, err - } - return d, nil + + return ws.Bytes(), nil } From 5f3bf33213926fee73fd1c2a8fee55735e93a12b Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 21 Jan 2019 22:52:17 +1030 Subject: [PATCH 20/58] av/stream/flac/decode.go: wrote func headers --- stream/flac/decode.go | 79 ++++++++++++++++++++++++++++++---------- stream/flac/flac_test.go | 26 +++++++++++++ 2 files changed, 85 insertions(+), 20 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 667fcf9c..42c4dace 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -1,3 +1,29 @@ +/* +NAME + decode.go + +DESCRIPTION + decode.go provides functionality for the decoding of FLAC compressed audio + +AUTHOR + Saxon Nelson-Milton + +LICENSE + decode.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ package flac import ( @@ -10,59 +36,72 @@ import ( "github.com/mewkiz/flac" ) -const wavAudioFormat = 1 +const wavFormat = 1 -type WriterSeeker struct { +// writeSeeker implements a memory based io.WriteSeeker. +type writeSeeker struct { buf []byte pos int } -func (ws *WriterSeeker) Bytes() []byte { +// Bytes returns the bytes contained in the writeSeekers buffer. +func (ws *writeSeeker) Bytes() []byte { return ws.buf } -func (m *WriterSeeker) Write(p []byte) (n int, err error) { - minCap := m.pos + len(p) - if minCap > cap(m.buf) { // Make sure buf has enough capacity: - buf2 := make([]byte, len(m.buf), minCap+len(p)) // add some extra - copy(buf2, m.buf) - m.buf = buf2 +// Write writes len(p) bytes from p to the writeSeeker's buf and returns the number +// of bytes written. If less than len(p) bytes are written, an error is returned. +func (ws *writeSeeker) Write(p []byte) (n int, err error) { + minCap := ws.pos + len(p) + if minCap > cap(ws.buf) { // Make sure buf has enough capacity: + buf2 := make([]byte, len(ws.buf), minCap+len(p)) // add some extra + copy(buf2, ws.buf) + ws.buf = buf2 } - if minCap > len(m.buf) { - m.buf = m.buf[:minCap] + if minCap > len(ws.buf) { + ws.buf = ws.buf[:minCap] } - copy(m.buf[m.pos:], p) - m.pos += len(p) + copy(ws.buf[ws.pos:], p) + ws.pos += len(p) return len(p), nil } -func (m *WriterSeeker) Seek(offset int64, whence int) (int64, error) { +// Seek sets the offset for the next Read or Write to offset, interpreted according +// to whence: SeekStart means relative to the start of the file, SeekCurrent means +// relative to the current offset, and SeekEnd means relative to the end. Seek returns +// the new offset relative to the start of the file and an error, if any. +func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { newPos, offs := 0, int(offset) switch whence { case io.SeekStart: newPos = offs case io.SeekCurrent: - newPos = m.pos + offs + newPos = ws.pos + offs case io.SeekEnd: - newPos = len(m.buf) + offs + newPos = len(ws.buf) + offs } if newPos < 0 { return 0, errors.New("negative result pos") } - m.pos = newPos + ws.pos = newPos return int64(newPos), nil } -// Decode takes a slice of flac and decodes to wav +// Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding +// fails, an error is returned. func Decode(buf []byte) ([]byte, error) { + // Lex and decode the FLAC into a stream to hold audio and properties. r := bytes.NewReader(buf) stream, err := flac.Parse(r) if err != nil { return nil, errors.New("Could not parse FLAC") } - ws := &WriterSeeker{} - enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavAudioFormat) + + // Create WAV encoder and pass writeSeeker that will store output WAV. + ws := &writeSeeker{} + enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavFormat) defer enc.Close() + var data []int for { // Decode FLAC audio samples. diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 13bef836..d69c0494 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -1,3 +1,29 @@ +/* +NAME + flac_test.go + +DESCRIPTION + flac_test.go provides utilities to test FLAC audio decoding + +AUTHOR + Saxon Nelson-Milton + +LICENSE + flac_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) + + It is free software: you can redistribute it and/or modify them + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with revid in gpl.txt. If not, see http://www.gnu.org/licenses. +*/ package flac import ( From 7628efdbc298a1e1527375fd4416c35f52349feb Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:26:22 +1030 Subject: [PATCH 21/58] av/stream/flac: working on cleaning up decode code --- stream/flac/decode.go | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 42c4dace..5a470370 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -90,7 +90,8 @@ func (ws *writeSeeker) Seek(offset int64, whence int) (int64, error) { // Decode takes buf, a slice of FLAC, and decodes to WAV. If complete decoding // fails, an error is returned. func Decode(buf []byte) ([]byte, error) { - // Lex and decode the FLAC into a stream to hold audio and properties. + + // Lex the FLAC into a stream to hold audio and it's properties. r := bytes.NewReader(buf) stream, err := flac.Parse(r) if err != nil { @@ -99,17 +100,30 @@ func Decode(buf []byte) ([]byte, error) { // Create WAV encoder and pass writeSeeker that will store output WAV. ws := &writeSeeker{} - enc := wav.NewEncoder(ws, int(stream.Info.SampleRate), int(stream.Info.BitsPerSample), int(stream.Info.NChannels), wavFormat) + sr := int(stream.Info.SampleRate) + bps := int(stream.Info.BitsPerSample) + nc := int(stream.Info.NChannels) + enc := wav.NewEncoder(ws, sr, bps, nc, wavFormat) defer enc.Close() + // Decode FLAC into frames of samples + intBuf := &audio.IntBuffer{ + Format: &audio.Format{NumChannels: nc, SampleRate: sr}, + SourceBitDepth: bps, + } + return decodeFrames(stream, intBuf, enc, ws) +} + +// +func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { var data []int for { - // Decode FLAC audio samples. - frame, err := stream.ParseNext() - if err != nil { - if err == io.EOF { - break - } + frame, err := s.ParseNext() + + // If we've reached the end of the stream then we can output the writeSeeker's buffer. + if err == io.EOF { + return ws.Bytes(), nil + } else if err != nil { return nil, err } @@ -120,18 +134,9 @@ func Decode(buf []byte) ([]byte, error) { data = append(data, int(subframe.Samples[i])) } } - buf := &audio.IntBuffer{ - Format: &audio.Format{ - NumChannels: int(stream.Info.NChannels), - SampleRate: int(stream.Info.SampleRate), - }, - Data: data, - SourceBitDepth: int(stream.Info.BitsPerSample), - } - if err := enc.Write(buf); err != nil { + intBuf.Data = data + if err := e.Write(intBuf); err != nil { return nil, err } } - - return ws.Bytes(), nil } From 6f1767d152d98ca5128f4c91f56449fc6854b322 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:40:40 +1030 Subject: [PATCH 22/58] av/stream/flac: finished cleaning up decode --- stream/flac/decode.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/stream/flac/decode.go b/stream/flac/decode.go index 5a470370..34d42057 100644 --- a/stream/flac/decode.go +++ b/stream/flac/decode.go @@ -114,7 +114,9 @@ func Decode(buf []byte) ([]byte, error) { return decodeFrames(stream, intBuf, enc, ws) } -// +// decodeFrames parses frames from the stream and encodes them into WAV until +// the end of the stream is reached. The bytes from writeSeeker buffer are then +// returned. If any errors occur during encodeing, nil bytes and the error is returned. func decodeFrames(s *flac.Stream, intBuf *audio.IntBuffer, e *wav.Encoder, ws *writeSeeker) ([]byte, error) { var data []int for { From b5611bb2b42f8145dc364c1a6fbba0d96fc15757 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 10:45:36 +1030 Subject: [PATCH 23/58] av/stream/flac: added writeseeker tests --- stream/flac/flac_test.go | 44 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index d69c0494..9537d682 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -27,6 +27,7 @@ LICENSE package flac import ( + "io" "io/ioutil" "os" "testing" @@ -37,6 +38,49 @@ const ( outFile = "testOut.wav" ) +func TestWriteSeekerWrite(t *testing.T) { + writerSeeker := &writeSeeker{} + var ws io.WriteSeeker = writerSeeker + + ws.Write([]byte("hello")) + if string(writerSeeker.buf) != "hello" { + t.Fail() + } + + ws.Write([]byte(" world")) + if string(writerSeeker.buf) != "hello world" { + t.Fail() + } + +} + +func TestWriteSeekerSeek(t *testing.T) { + writerSeeker := &writeSeeker{} + var ws io.WriteSeeker = writerSeeker + + ws.Write([]byte("hello")) + if string(writerSeeker.buf) != "hello" { + t.Fail() + } + + ws.Write([]byte(" world")) + if string(writerSeeker.buf) != "hello world" { + t.Fail() + } + + ws.Seek(-2, io.SeekEnd) + ws.Write([]byte("k!")) + if string(writerSeeker.buf) != "hello work!" { + t.Fail() + } + + ws.Seek(6, io.SeekStart) + ws.Write([]byte("gopher")) + if string(writerSeeker.buf) != "hello gopher" { + t.Fail() + } +} + func TestDecodeFlac(t *testing.T) { b, err := ioutil.ReadFile(testFile) if err != nil { From 99b7f4a44bae180cd3f3cf288da5bca4d619581f Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 11:15:39 +1030 Subject: [PATCH 24/58] av/stream/flac: saving progress --- stream/flac/flac_test.go | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 9537d682..79274819 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -38,45 +38,51 @@ const ( outFile = "testOut.wav" ) +// TestWriteSeekerWrite checks that basic writing to the ws works as expected. func TestWriteSeekerWrite(t *testing.T) { - writerSeeker := &writeSeeker{} - var ws io.WriteSeeker = writerSeeker + ws := &writeSeeker{} - ws.Write([]byte("hello")) - if string(writerSeeker.buf) != "hello" { - t.Fail() + const tstStr1 = "hello" + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Write failed, got: %v, want: %v", got, tstStr1) } - ws.Write([]byte(" world")) - if string(writerSeeker.buf) != "hello world" { - t.Fail() + const tstStr2 = " world" + const want = "hello world" + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want { + t.Errorf("Second write failed, got: %v, want: %v", got, want) } - } +// TestWriteSeekerSeek checks that writing and seeking works as expected, i.e. we +// can write, then seek to a knew place in the buf, and write again, either replacing +// bytes, or appending bytes. func TestWriteSeekerSeek(t *testing.T) { - writerSeeker := &writeSeeker{} - var ws io.WriteSeeker = writerSeeker + ws := &writeSeeker{} ws.Write([]byte("hello")) - if string(writerSeeker.buf) != "hello" { + if string(ws.buf) != "hello" { t.Fail() } ws.Write([]byte(" world")) - if string(writerSeeker.buf) != "hello world" { + if string(ws.buf) != "hello world" { t.Fail() } ws.Seek(-2, io.SeekEnd) ws.Write([]byte("k!")) - if string(writerSeeker.buf) != "hello work!" { + if string(ws.buf) != "hello work!" { t.Fail() } ws.Seek(6, io.SeekStart) ws.Write([]byte("gopher")) - if string(writerSeeker.buf) != "hello gopher" { + if string(ws.buf) != "hello gopher" { t.Fail() } } From ba209a1d7c3339102b9a84185b5b9fa4efe961d6 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 11:21:12 +1030 Subject: [PATCH 25/58] mts/mpegts.go: FindPMT now also returns index --- stream/mts/mpegts.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index e48f17f0..276bee13 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -129,18 +129,18 @@ type Packet struct { // FindPMT will take a clip of mpegts and try to find a PMT table - if one // is found, then it is returned, otherwise nil and an error is returned. -func FindPMT(d []byte) (p []byte, err error) { +func FindPMT(d []byte) (p []byte, i int, err error) { if len(d) < PacketSize { - return nil, errors.New("Mmpegts data not of valid length") + return nil, -1, errors.New("Mmpegts data not of valid length") } - for i := 0; i < len(d); i += PacketSize { + for i = 0; i < len(d); i += PacketSize { pid := (uint16(d[i+1]&0x1f) << 8) | uint16(d[i+2]) if pid == pmtPid { p = d[i+4 : i+PacketSize] return } } - return nil, errors.New("Could not find pmt table in mpegts data") + return nil, -1, errors.New("Could not find pmt table in mpegts data") } // FillPayload takes a channel and fills the packets Payload field until the From 6b4e0946dd5816343fa9dbcbbf363ded3ac54991 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 11:27:24 +1030 Subject: [PATCH 26/58] stream/mts/mpegts.go: updated FindPMT comment --- stream/mts/mpegts.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream/mts/mpegts.go b/stream/mts/mpegts.go index 276bee13..0bef80d2 100644 --- a/stream/mts/mpegts.go +++ b/stream/mts/mpegts.go @@ -128,7 +128,7 @@ type Packet struct { } // FindPMT will take a clip of mpegts and try to find a PMT table - if one -// is found, then it is returned, otherwise nil and an error is returned. +// is found, then it is returned along with its index, otherwise nil, -1 and an error is returned. func FindPMT(d []byte) (p []byte, i int, err error) { if len(d) < PacketSize { return nil, -1, errors.New("Mmpegts data not of valid length") From 9ffc5367cb01297c8aa696e24e440977353595ff Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 12:14:40 +1030 Subject: [PATCH 27/58] av/stream/flac: cleaned up testing file --- stream/flac/flac_test.go | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 79274819..0d8079f7 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -64,29 +64,43 @@ func TestWriteSeekerWrite(t *testing.T) { func TestWriteSeekerSeek(t *testing.T) { ws := &writeSeeker{} - ws.Write([]byte("hello")) - if string(ws.buf) != "hello" { - t.Fail() + const tstStr1 = "hello" + want1 := tstStr1 + ws.Write([]byte(tstStr1)) + got := string(ws.buf) + if got != tstStr1 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want1) } - ws.Write([]byte(" world")) - if string(ws.buf) != "hello world" { - t.Fail() + const tstStr2 = " world" + const want2 = tstStr1 + tstStr2 + ws.Write([]byte(tstStr2)) + got = string(ws.buf) + if got != want2 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want2) } + const tstStr3 = "k!" + const want3 = "hello work!" ws.Seek(-2, io.SeekEnd) - ws.Write([]byte("k!")) - if string(ws.buf) != "hello work!" { - t.Fail() + ws.Write([]byte(tstStr3)) + got = string(ws.buf) + if got != want3 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want3) } + const tstStr4 = "gopher" + const want4 = "hello gopher" ws.Seek(6, io.SeekStart) - ws.Write([]byte("gopher")) - if string(ws.buf) != "hello gopher" { - t.Fail() + ws.Write([]byte(tstStr4)) + got = string(ws.buf) + if got != want4 { + t.Errorf("Unexpected output, got: %v, want: %v", got, want4) } } +// TestDecodeFlac checks that we can load a flac file and decode to wav, writing +// to a wav file. func TestDecodeFlac(t *testing.T) { b, err := ioutil.ReadFile(testFile) if err != nil { From 4602a555d5d098471ad9ee6885a44786f2e36619 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 12:27:52 +1030 Subject: [PATCH 28/58] av/stream/flac: updated test file directory --- stream/flac/flac_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream/flac/flac_test.go b/stream/flac/flac_test.go index 0d8079f7..1f8019e5 100644 --- a/stream/flac/flac_test.go +++ b/stream/flac/flac_test.go @@ -34,7 +34,7 @@ import ( ) const ( - testFile = "/home/saxon/Desktop/robot.flac" + testFile = "../../../test/test-data/av/input/robot.flac" outFile = "testOut.wav" ) From 2da8d2af85100797248dcd7a789777266a19e0de Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 13:54:56 +1030 Subject: [PATCH 29/58] av/rtmp/rtmp_test.go: using local rtmpSender io.writer implemntation to handle errors from rtmp --- rtmp/rtmp_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 618f2b95..be4908a6 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) { } defer f.Close() + rs := &rtmpSender{conn: c} // Pass RTMP session, true for audio, true for video, and 25 FPS - flvEncoder, err := flv.NewEncoder(c, true, true, 25) + flvEncoder, err := flv.NewEncoder(rs, true, true, 25) if err != nil { t.Fatalf("failed to create encoder: %v", err) } From df146cfb17f06951c2fc4318d967ab83ffb2ba3f Mon Sep 17 00:00:00 2001 From: saxon Date: Wed, 23 Jan 2019 15:50:25 +1030 Subject: [PATCH 30/58] stream/mts/encoder.go: writing psi at start of stream --- stream/mts/encoder.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index decc8558..60393285 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -233,6 +233,12 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { + if e.psiCount <= 0 { + err := e.writePSI() + if err != nil { + return err + } + } // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -262,14 +268,8 @@ func (e *Encoder) Encode(nalu []byte) error { pkt.PCR = e.pcr() pusi = false } - if e.psiCount <= 0 { - err := e.writePSI() - if err != nil { - return err - } - } - e.psiCount-- _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) + e.psiCount-- if err != nil { return err } From 42c9fb1d097abb012ba802b5792c16a870a49719 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 24 Jan 2019 14:33:22 +1030 Subject: [PATCH 31/58] stream/mts/encoder.go: writing psi based on time interval rather than number of packets interval --- stream/mts/encoder.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 60393285..8a4121a4 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -122,7 +122,7 @@ var ( ) const ( - psiSndCnt = 7 + psiInterval = 1 * time.Second ) // timeLocation holds time and location data @@ -199,9 +199,10 @@ type Encoder struct { tsSpace [PacketSize]byte pesSpace [pes.MaxPesSize]byte - psiCount int - continuity map[int]byte + + now time.Time + psiLastTime time.Time } // NewEncoder returns an Encoder with the specified frame rate. @@ -233,12 +234,15 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { - if e.psiCount <= 0 { + e.now = time.Now() + if e.now.Sub(e.psiLastTime) > psiInterval { err := e.writePSI() if err != nil { return err } + e.psiLastTime = e.now } + // Prepare PES data. pesPkt := pes.Packet{ StreamID: streamID, @@ -269,7 +273,6 @@ func (e *Encoder) Encode(nalu []byte) error { pusi = false } _, err := e.dst.Write(pkt.Bytes(e.tsSpace[:PacketSize])) - e.psiCount-- if err != nil { return err } @@ -318,7 +321,6 @@ func (e *Encoder) writePSI() error { if err != nil { return err } - e.psiCount = psiSndCnt return nil } From 31b9ec07e9be6b67393f8eda884556233705a685 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 24 Jan 2019 14:39:14 +1030 Subject: [PATCH 32/58] stream/mts/encoder.go: no need to have a now field to capture current time - this can be local to encode function --- stream/mts/encoder.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/stream/mts/encoder.go b/stream/mts/encoder.go index 8a4121a4..02761b91 100644 --- a/stream/mts/encoder.go +++ b/stream/mts/encoder.go @@ -201,7 +201,6 @@ type Encoder struct { continuity map[int]byte - now time.Time psiLastTime time.Time } @@ -234,13 +233,13 @@ const ( // generate handles the incoming data and generates equivalent mpegts packets - // sending them to the output channel. func (e *Encoder) Encode(nalu []byte) error { - e.now = time.Now() - if e.now.Sub(e.psiLastTime) > psiInterval { + now := time.Now() + if now.Sub(e.psiLastTime) > psiInterval { err := e.writePSI() if err != nil { return err } - e.psiLastTime = e.now + e.psiLastTime = now } // Prepare PES data. From 071e6fd0f7fe99d530f2f20e8ca241cf79a05587 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 29 Jan 2019 19:56:02 +1030 Subject: [PATCH 33/58] revid: honour difference between rtmp dst and others --- revid/revid.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index a15fab3a..5b37dc09 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -210,7 +210,7 @@ func (r *Revid) reset(config Config) error { } } - r.destination = make([]loadSender, len(r.config.Outputs)) + r.destination = r.destination[:0] for outNo, outType := range r.config.Outputs { switch outType { case File: @@ -218,19 +218,19 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Http: r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) case Udp: @@ -238,7 +238,7 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtp: r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { From 58102e5587ad7cd1d6fe5f26576d390cad9a8465 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 30 Jan 2019 13:19:00 +1030 Subject: [PATCH 34/58] revid: fix http destination assignment --- revid/revid.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 5b37dc09..9de472e1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -211,8 +211,8 @@ func (r *Revid) reset(config Config) error { } r.destination = r.destination[:0] - for outNo, outType := range r.config.Outputs { - switch outType { + for _, typ := range r.config.Outputs { + switch typ { case File: s, err := newFileSender(config.OutputFileName) if err != nil { @@ -232,7 +232,7 @@ func (r *Revid) reset(config Config) error { } r.destination = append(r.destination, s) case Http: - r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) + r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) case Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { From ed4d97f893414b3d1f19edc305e4e82c382eaf73 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:03:50 +1030 Subject: [PATCH 35/58] stream/mts: patch for revid.Start() no exit bug --- revid/revid.go | 51 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 9de472e1..df1e111a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -120,6 +120,8 @@ type Revid struct { // isRunning is a loaded and cocked foot-gun. isRunning bool + + err chan error } // packer takes data segments and packs them into clips @@ -176,13 +178,29 @@ func (p *packer) Write(frame []byte) (int, error) { // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { - r := Revid{ns: ns} + r := Revid{ns: ns, err: make(chan error)} r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.packer.owner = &r err := r.reset(c) if err != nil { return nil, err } + go func() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) + } + } + } + }() return &r, nil } @@ -312,8 +330,8 @@ func (r *Revid) Start() error { r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") - err := r.setupInput() - return err + go r.setupInput() + return nil } // Stop halts any processing of video data from a camera or file @@ -474,11 +492,13 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go func() { + r.config.Logger.Log(logger.Info, pkg+"reading camera data") + delay := time.Second / time.Duration(r.config.FrameRate) + r.err <- r.lexTo(r.encoder, stdout, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") + }() + return nil } func (r *Revid) startV4L() error { @@ -526,10 +546,12 @@ func (r *Revid) startV4L() error { return err } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go func() { + r.config.Logger.Log(logger.Info, pkg+"reading camera data") + r.err <- r.lexTo(r.encoder, stdout, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") + }() + return nil } // setupInputForFile sets things up for getting input from a file @@ -544,5 +566,8 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - return r.lexTo(r.encoder, f, delay) + go func() { + r.err <- r.lexTo(r.encoder, f, delay) + }() + return nil } From d53eafe215f430a480199ec9604b12b0276f83a3 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:15:38 +1030 Subject: [PATCH 36/58] revid/revid.go: not running r.setupInput() as routine - now getting error an returning --- revid/revid.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index df1e111a..febb05c1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -330,8 +330,8 @@ func (r *Revid) Start() error { r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") - go r.setupInput() - return nil + err := r.setupInput() + return err } // Stop halts any processing of video data from a camera or file From e18b1f51f0d1557d0a5a56cd119bf2f31c9c5467 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:49:44 +1030 Subject: [PATCH 37/58] revid/revid.go: capture error from r.Start() in error handling routine --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index febb05c1..2b146bf4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -194,7 +194,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) } - r.Start() + err = r.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) } From 1e307fc37b4b971310466f5f204d3e08c24d539d Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 19:42:20 +1030 Subject: [PATCH 38/58] revid/revid.go: made routines named rather than anonymous --- revid/revid.go | 55 +++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 2b146bf4..c8f6d217 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -185,23 +185,25 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { if err != nil { return nil, err } - go func() { - for { - err := <-r.err + go r.handleErrors() + return &r, nil +} + +func (r *Revid) handleErrors() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() if err != nil { - r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) - err = r.Stop() - if err != nil { - r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) - } - err = r.Start() - if err != nil { - r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) - } + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + err = r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) } } - }() - return &r, nil + } } // Bitrate returns the result of the most recent bitrate check. @@ -492,12 +494,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go func() { - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - r.err <- r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - }() + go r.lex(stdout, time.Duration(0)) return nil } @@ -534,7 +531,6 @@ func (r *Revid) startV4L() error { r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) - delay := time.Second / time.Duration(r.config.FrameRate) stdout, err := r.cmd.StdoutPipe() if err != nil { return err @@ -546,17 +542,12 @@ func (r *Revid) startV4L() error { return err } - go func() { - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - r.err <- r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - }() + go r.lex(stdout, time.Second/time.Duration(r.config.FrameRate)) return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { - delay := time.Second / time.Duration(r.config.FrameRate) f, err := os.Open(r.config.InputFileName) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) @@ -566,8 +557,12 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go func() { - r.err <- r.lexTo(r.encoder, f, delay) - }() + go r.lex(f, time.Second/time.Duration(r.config.FrameRate)) return nil } + +func (r *Revid) lex(read io.Reader, delay time.Duration) { + r.config.Logger.Log(logger.Info, pkg+"reading input data") + r.err <- r.lexTo(r.encoder, read, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading input data") +} From 051263c144c195d15bea214327bbf259385fdc3c Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 19:58:25 +1030 Subject: [PATCH 39/58] revid/revid.go: revid.lex to revid.transcode --- revid/revid.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index c8f6d217..6d52be59 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -494,7 +494,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.lex(stdout, time.Duration(0)) + go r.transcode(stdout, time.Duration(0)) return nil } @@ -542,7 +542,7 @@ func (r *Revid) startV4L() error { return err } - go r.lex(stdout, time.Second/time.Duration(r.config.FrameRate)) + go r.transcode(stdout, time.Duration(0)) return nil } @@ -557,11 +557,11 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go r.lex(f, time.Second/time.Duration(r.config.FrameRate)) + go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) return nil } -func (r *Revid) lex(read io.Reader, delay time.Duration) { +func (r *Revid) transcode(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") From d26aa8643aec5661ceca0ccc3866b1f090691dd7 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 09:47:31 +1030 Subject: [PATCH 40/58] revid: renamed transcode to processFrom. Using mutex for isRunning flag. Created setIsRunning func to set state of isRunning. --- revid/revid.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 6d52be59..f7313b57 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -37,6 +37,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "time" "bitbucket.org/ausocean/av/stream" @@ -121,6 +122,8 @@ type Revid struct { // isRunning is a loaded and cocked foot-gun. isRunning bool + mu sync.Mutex + err chan error } @@ -189,6 +192,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// TODO: put more thought into error severity func (r *Revid) handleErrors() { for { err := <-r.err @@ -317,18 +321,21 @@ func (r *Revid) reset(config Config) error { // IsRunning returns whether the receiver is running. func (r *Revid) IsRunning() bool { - return r.isRunning + r.mu.Lock() + ret := r.isRunning + r.mu.Unlock() + return ret } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.isRunning { + if r.IsRunning() { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.isRunning = true + r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") @@ -338,12 +345,12 @@ func (r *Revid) Start() error { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() error { - if !r.isRunning { + if !r.IsRunning() { return errors.New(pkg + "stop called but revid is already stopped") } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.isRunning = false + r.setIsRunning(false) r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! @@ -359,7 +366,7 @@ func (r *Revid) outputClips() { lastTime := time.Now() var count int loop: - for r.isRunning { + for r.IsRunning() { // If the ring buffer has something we can read and send off chunk, err := r.buffer.Next(readTimeout) switch err { @@ -403,7 +410,7 @@ loop: err = rs.restart() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false + r.setIsRunning(false) return } @@ -494,10 +501,16 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() +} + func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" @@ -542,7 +555,7 @@ func (r *Revid) startV4L() error { return err } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } @@ -557,11 +570,11 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) + go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } -func (r *Revid) transcode(read io.Reader, delay time.Duration) { +func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") From 9bddf343f589771be3f40e9ef6eebef0cbcd1de0 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 10:07:00 +1030 Subject: [PATCH 41/58] revid/revid.go: moved revid.mu declaration . Updated todo owner --- revid/revid.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index f7313b57..3e658312 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -120,10 +120,9 @@ type Revid struct { bitrate int // isRunning is a loaded and cocked foot-gun. + mu sync.Mutex isRunning bool - mu sync.Mutex - err chan error } @@ -192,7 +191,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } -// TODO: put more thought into error severity +// TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { err := <-r.err @@ -319,7 +318,7 @@ func (r *Revid) reset(config Config) error { return nil } -// IsRunning returns whether the receiver is running. +// IsRunning returns true if revid is running. func (r *Revid) IsRunning() bool { r.mu.Lock() ret := r.isRunning @@ -327,6 +326,13 @@ func (r *Revid) IsRunning() bool { return ret } +// setIsRunning sets revid.isRunning using b. +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() +} + // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { @@ -505,12 +511,6 @@ func (r *Revid) startRaspivid() error { return nil } -func (r *Revid) setIsRunning(b bool) { - r.mu.Lock() - r.isRunning = b - r.mu.Unlock() -} - func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" From c44d6bbfd3187cd51440186e1b571d9355dc74c7 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 10:08:49 +1030 Subject: [PATCH 42/58] revid/revid.go: not during time.Duration conversion --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 3e658312..6833ec2b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -507,7 +507,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.processFrom(stdout, time.Duration(0)) + go r.processFrom(stdout, 0) return nil } From 0e34623f0f5bb50adb6d17eed657338c38bfb15b Mon Sep 17 00:00:00 2001 From: scruzin Date: Sat, 2 Feb 2019 12:27:21 +1030 Subject: [PATCH 43/58] rtmp: Use a net.Conn interface instead of *net.TCPConn. --- rtmp/conn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 3864df98..7e1b3b15 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -76,7 +76,7 @@ type link struct { protocol int32 timeout uint port uint16 - conn *net.TCPConn + conn net.Conn } // method represents an RTMP method. From 1af4b250306d9240d097845672da2fcbc0c2413e Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 21:17:44 +1030 Subject: [PATCH 44/58] cmd/revid-cli & revid: removed startRevid and stopRevid as shouldn't be required when we have revid.Start() and revid.Stop(). Created revid.Config() which returns copy of config safely using mutex. removed updateRevid in revid-cli and move to fun revid.Update() - as there's no reason why it can't just be a receiver func - even better considering we want to start moving alot of stuff from revid-cli to the revid-api anyways. --- cmd/revid-cli/main.go | 54 +++++--------------- revid/revid.go | 115 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 42 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 5b826b91..826cb2a4 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -72,23 +72,22 @@ func main() { cfg := handleFlags() if !*useNetsender { - // run revid for the specified duration - rv, _, err := startRevid(nil, cfg) + rv, err := revid.New(cfg, nil) if err != nil { + cfg.Logger.Log(logger.Fatal, pkg+"failed to initialiase revid", "error", err.Error()) + } + if err = rv.Start(); err != nil { cfg.Logger.Log(logger.Fatal, pkg+"failed to start revid", "error", err.Error()) } time.Sleep(*runDurationPtr) - err = stopRevid(rv) - if err != nil { + if err = rv.Stop(); err != nil { cfg.Logger.Log(logger.Error, pkg+"failed to stop revid before program termination", "error", err.Error()) } return } - err := run(nil, cfg) - if err != nil { + if err := run(cfg); err != nil { log.Log(logger.Fatal, pkg+"failed to run revid", "error", err.Error()) - os.Exit(1) } } @@ -244,27 +243,20 @@ func handleFlags() revid.Config { } // initialize then run the main NetSender client -func run(rv *revid.Revid, cfg revid.Config) error { +func run(cfg revid.Config) error { // initialize NetSender and use NetSender's logger - //config.Logger = netsender.Logger() log.Log(logger.Info, pkg+"running in NetSender mode") var ns netsender.Sender - err := ns.Init(log, nil, nil, nil) - if err != nil { + if err := ns.Init(log, nil, nil, nil); err != nil { return err } vars, _ := ns.Vars() vs := ns.VarSum() - paused := false - if vars["mode"] == "Paused" { - paused = true - } - if !paused { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) - if err != nil { - return err - } + + rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + if err != nil { + return err } for { @@ -331,28 +323,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -// wrappers for stopping and starting revid -func startRevid(ns *netsender.Sender, cfg revid.Config) (*revid.Revid, revid.Config, error) { - rv, err := revid.New(cfg, ns) - if err != nil { - return nil, cfg, err - } - err = rv.Start() - return rv, cfg, err -} - -func stopRevid(rv *revid.Revid) error { - err := rv.Stop() - if err != nil { - return err - } - - // FIXME(kortschak): Is this waiting on completion of work? - // Use a wait group and Wait method if it is. - time.Sleep(revidStopTime) - return nil -} - func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { if stop { err := stopRevid(rv) diff --git a/revid/revid.go b/revid/revid.go index 6833ec2b..81546ead 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -40,6 +40,7 @@ import ( "sync" "time" + "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -326,6 +327,13 @@ func (r *Revid) IsRunning() bool { return ret } +func (r *Revid) Config() Config { + r.mu.Lock() + ret := r.config + r.mu.Unlock() + return ret +} + // setIsRunning sets revid.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() @@ -366,6 +374,113 @@ func (r *Revid) Stop() error { return nil } +func (r *Revid) Update(vars map[string]string) error { + if r.IsRunning() { + r.Stop() + } + //look through the vars and update revid where needed + for key, value := range vars { + switch key { + case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! + switch value { + case "File": + cfg.Outputs[0] = revid.File + case "Http": + cfg.Outputs[0] = revid.Http + case "Rtmp": + cfg.Outputs[0] = revid.Rtmp + case "FfmpegRtmp": + cfg.Outputs[0] = revid.FfmpegRtmp + default: + log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + continue + } + case "FramesPerClip": + f, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + break + } + cfg.FramesPerClip = uint(f) + case "RtmpUrl": + cfg.RtmpUrl = value + case "Bitrate": + r, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + cfg.Bitrate = uint(r) + case "OutputFileName": + cfg.OutputFileName = value + case "InputFileName": + cfg.InputFileName = value + case "Height": + h, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid height param", "value", value) + break + } + cfg.Height = uint(h) + case "Width": + w, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid width param", "value", value) + break + } + cfg.Width = uint(w) + case "FrameRate": + r, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + break + } + cfg.FrameRate = uint(r) + case "HttpAddress": + cfg.HttpAddress = value + case "Quantization": + q, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + break + } + cfg.Quantization = uint(q) + case "IntraRefreshPeriod": + p, err := strconv.ParseUint(value, 10, 0) + if err != nil { + log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + break + } + cfg.IntraRefreshPeriod = uint(p) + case "HorizontalFlip": + switch strings.ToLower(value) { + case "true": + cfg.FlipHorizontal = true + case "false": + cfg.FlipHorizontal = false + default: + log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + } + case "VerticalFlip": + switch strings.ToLower(value) { + case "true": + cfg.FlipVertical = true + case "false": + cfg.FlipVertical = false + default: + log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + } + default: + } + } + + return startRevid(ns, cfg) +} + // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { From 9095044e234e730c19ed6a4f468bb46f2ba5c944 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 21:55:40 +1030 Subject: [PATCH 45/58] revid: using waitgroups so that revid.Stop() is safer - we can wait until the input and output routines are done before we do anything, like touch the revid config. Also started modifying revid.Update() to remove errors introduced after the copy of updateRevid from revid-cli to revid.go in the previous commit. --- revid/revid.go | 59 ++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 81546ead..33d847ea 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - revid.go + r.go DESCRIPTION See Readme.md @@ -40,7 +40,6 @@ import ( "sync" "time" - "bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/stream" "bitbucket.org/ausocean/av/stream/flv" "bitbucket.org/ausocean/av/stream/lex" @@ -120,10 +119,11 @@ type Revid struct { // bitrate hold the last send bitrate calculation result. bitrate int - // isRunning is a loaded and cocked foot-gun. mu sync.Mutex isRunning bool + wg sync.WaitGroup + err chan error } @@ -334,7 +334,7 @@ func (r *Revid) Config() Config { return ret } -// setIsRunning sets revid.isRunning using b. +// setIsRunning sets r.isRunning using b. func (r *Revid) setIsRunning(b bool) { r.mu.Lock() r.isRunning = b @@ -348,9 +348,11 @@ func (r *Revid) Start() error { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") + // TODO: this doesn't need to be here r.config.Logger.Log(logger.Debug, pkg+"setting up output") r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") + r.wg.Add(1) go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") err := r.setupInput() @@ -371,6 +373,7 @@ func (r *Revid) Stop() error { if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } + r.wg.Wait() return nil } @@ -388,89 +391,89 @@ func (r *Revid) Update(vars map[string]string) error { // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Outputs[0] = revid.File + r.config.Outputs[0] = File case "Http": - cfg.Outputs[0] = revid.Http + r.config.Outputs[0] = Http case "Rtmp": - cfg.Outputs[0] = revid.Rtmp + r.config.Outputs[0] = Rtmp case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp + r.config.Outputs[0] = FfmpegRtmp default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue } case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } - cfg.FramesPerClip = uint(f) + r.config.FramesPerClip = uint(f) case "RtmpUrl": - cfg.RtmpUrl = value + r.config.RtmpUrl = value case "Bitrate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.Bitrate = uint(r) + r.config.Bitrate = uint(r) case "OutputFileName": - cfg.OutputFileName = value + r.config.OutputFileName = value case "InputFileName": - cfg.InputFileName = value + r.config.InputFileName = value case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid height param", "value", value) break } - cfg.Height = uint(h) + r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid width param", "value", value) break } - cfg.Width = uint(w) + r.config.Width = uint(w) case "FrameRate": r, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - cfg.FrameRate = uint(r) + r.config.FrameRate = uint(r) case "HttpAddress": - cfg.HttpAddress = value + r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } - cfg.Quantization = uint(q) + r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } - cfg.IntraRefreshPeriod = uint(p) + r.config.IntraRefreshPeriod = uint(p) case "HorizontalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipHorizontal = true + r.config.FlipHorizontal = true case "false": - cfg.FlipHorizontal = false + r.config.FlipHorizontal = false default: log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { case "true": - cfg.FlipVertical = true + r.config.FlipVertical = true case "false": - cfg.FlipVertical = false + r.config.FlipVertical = false default: log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } @@ -478,7 +481,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return startRevid(ns, cfg) + return r.Start() } // outputClips takes the clips produced in the packClips method and outputs them @@ -622,6 +625,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } + r.wg.Add(1) go r.processFrom(stdout, 0) return nil } @@ -670,6 +674,7 @@ func (r *Revid) startV4L() error { return err } + r.wg.Add(1) go r.processFrom(stdout, time.Duration(0)) return nil } @@ -685,6 +690,7 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. + r.wg.Add(1) go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } @@ -693,4 +699,5 @@ func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") + r.wg.Done() } From 1010721dd070882058660fff25f3e84db4ad446d Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 23:37:38 +1030 Subject: [PATCH 46/58] cmd/revid-cli & revid: Checking revid mode differently - now using ns.Mode(), which should soon be an available feature. Also now using ns.SetMode() - which tells netreceiver that we've changed mode. --- cmd/revid-cli/main.go | 157 +++++++++--------------------------------- revid/revid.go | 26 +++---- 2 files changed, 45 insertions(+), 138 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 826cb2a4..58f8e507 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -50,6 +50,13 @@ const ( defaultLogVerbosity = logger.Debug ) +// Revid modes +const ( + normal = "Normal" + paused = "Paused" + burst = "Burst" +) + // Other misc consts const ( netSendRetryTime = 5 * time.Second @@ -244,9 +251,9 @@ func handleFlags() revid.Config { // initialize then run the main NetSender client func run(cfg revid.Config) error { - // initialize NetSender and use NetSender's logger log.Log(logger.Info, pkg+"running in NetSender mode") + // initialize NetSender and use NetSender's logger var ns netsender.Sender if err := ns.Init(log, nil, nil, nil); err != nil { return err @@ -254,8 +261,12 @@ func run(cfg revid.Config) error { vars, _ := ns.Vars() vs := ns.VarSum() - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, false) + rv, err := revid.New(cfg, &ns) if err != nil { + log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) + } + + if err = rv.Update(vars); err != nil { return err } @@ -267,7 +278,6 @@ func run(cfg revid.Config) error { } if vs != ns.VarSum() { - // vars changed vars, err := ns.Vars() if err != nil { log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) @@ -275,24 +285,32 @@ func run(cfg revid.Config) error { continue } vs = ns.VarSum() - if vars["mode"] == "Paused" { - if !paused { + + switch ns.Mode() { + case paused: + if rv.IsRunning() { log.Log(logger.Info, pkg+"pausing revid") - err = stopRevid(rv) - if err != nil { - log.Log(logger.Error, pkg+"failed to stop revide", "error", err.Error()) + if err = rv.Stop(); err != nil { + log.Log(logger.Error, pkg+"failed to stop revid", "error", err.Error()) continue } - paused = true + ns.SetMode(paused) } - } else { - rv, cfg, err = updateRevid(&ns, rv, cfg, vars, !paused) - if err != nil { + case normal: + if err = rv.Update(vars); err != nil { return err } - if paused { - paused = false + ns.SetMode(normal) + case burst: + if err = rv.Start(); err != nil { + return err } + ns.SetMode(burst) + time.Sleep(rv.Config().BurstPeriod) + if err = rv.Stop(); err != nil { + return err + } + ns.SetMode(paused) } } sleepTime, _ := strconv.Atoi(ns.Param("mp")) @@ -323,117 +341,6 @@ func send(ns *netsender.Sender, rv *revid.Revid) error { return nil } -func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars map[string]string, stop bool) (*revid.Revid, revid.Config, error) { - if stop { - err := stopRevid(rv) - if err != nil { - return nil, cfg, err - } - } - - //look through the vars and update revid where needed - for key, value := range vars { - switch key { - case "Output": - // FIXME(kortschak): There can be only one! - // How do we specify outputs after the first? - // - // Maybe we shouldn't be doing this! - switch value { - case "File": - cfg.Outputs[0] = revid.File - case "Http": - cfg.Outputs[0] = revid.Http - case "Rtmp": - cfg.Outputs[0] = revid.Rtmp - case "FfmpegRtmp": - cfg.Outputs[0] = revid.FfmpegRtmp - default: - log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) - continue - } - case "FramesPerClip": - f, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) - break - } - cfg.FramesPerClip = uint(f) - case "RtmpUrl": - cfg.RtmpUrl = value - case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.Bitrate = uint(r) - case "OutputFileName": - cfg.OutputFileName = value - case "InputFileName": - cfg.InputFileName = value - case "Height": - h, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) - break - } - cfg.Height = uint(h) - case "Width": - w, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) - break - } - cfg.Width = uint(w) - case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) - break - } - cfg.FrameRate = uint(r) - case "HttpAddress": - cfg.HttpAddress = value - case "Quantization": - q, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) - break - } - cfg.Quantization = uint(q) - case "IntraRefreshPeriod": - p, err := strconv.ParseUint(value, 10, 0) - if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) - break - } - cfg.IntraRefreshPeriod = uint(p) - case "HorizontalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipHorizontal = true - case "false": - cfg.FlipHorizontal = false - default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) - } - case "VerticalFlip": - switch strings.ToLower(value) { - case "true": - cfg.FlipVertical = true - case "false": - cfg.FlipVertical = false - default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) - } - default: - } - } - - return startRevid(ns, cfg) -} - // flagStrings implements an appending string set flag. type flagStrings []string diff --git a/revid/revid.go b/revid/revid.go index 33d847ea..c90ba8da 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -405,19 +405,19 @@ func (r *Revid) Update(vars map[string]string) error { case "FramesPerClip": f, err := strconv.ParseUint(value, 10, 0) if err != nil { - .Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framesperclip param", "value", value) break } r.config.FramesPerClip = uint(f) case "RtmpUrl": r.config.RtmpUrl = value case "Bitrate": - r, err := strconv.ParseUint(value, 10, 0) + v, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - r.config.Bitrate = uint(r) + r.config.Bitrate = uint(v) case "OutputFileName": r.config.OutputFileName = value case "InputFileName": @@ -425,37 +425,37 @@ func (r *Revid) Update(vars map[string]string) error { case "Height": h, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid height param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid height param", "value", value) break } r.config.Height = uint(h) case "Width": w, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid width param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid width param", "value", value) break } r.config.Width = uint(w) case "FrameRate": - r, err := strconv.ParseUint(value, 10, 0) + v, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid framerate param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid framerate param", "value", value) break } - r.config.FrameRate = uint(r) + r.config.FrameRate = uint(v) case "HttpAddress": r.config.HttpAddress = value case "Quantization": q, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid quantization param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid quantization param", "value", value) break } r.config.Quantization = uint(q) case "IntraRefreshPeriod": p, err := strconv.ParseUint(value, 10, 0) if err != nil { - log.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid intrarefreshperiod param", "value", value) break } r.config.IntraRefreshPeriod = uint(p) @@ -466,7 +466,7 @@ func (r *Revid) Update(vars map[string]string) error { case "false": r.config.FlipHorizontal = false default: - log.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid HorizontalFlip param", "value", value) } case "VerticalFlip": switch strings.ToLower(value) { @@ -475,7 +475,7 @@ func (r *Revid) Update(vars map[string]string) error { case "false": r.config.FlipVertical = false default: - log.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) + r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } default: } From 6171c4e9994a87722733249a11e0d607e8e788b5 Mon Sep 17 00:00:00 2001 From: saxon Date: Sun, 3 Feb 2019 23:43:51 +1030 Subject: [PATCH 47/58] revid: added handling of burstPeriod to config --- revid/config.go | 7 +++++++ revid/revid.go | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/revid/config.go b/revid/config.go index dc9c5a8d..b0ba2bc4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -68,6 +68,7 @@ type Config struct { RtpAddress string Logger Logger SendRetry bool + BurstPeriod uint } // Enums for config struct @@ -114,6 +115,7 @@ const ( defaultInputCodec = H264 defaultVerbosity = No // FIXME(kortschak): This makes no sense whatsoever. No is currently 15. defaultRtpAddr = "localhost:6970" + defaultBurstPeriod = 10 // Seconds ) // Validate checks for any errors in the config fields and defaults settings @@ -200,6 +202,11 @@ func (c *Config) Validate(r *Revid) error { } } + if c.BurstPeriod == 0 { + c.Logger.Log(logger.Warning, pkg+"no burst period defined, defaulting", "burstPeriod", defaultBurstPeriod) + c.BurstPeriod = defaultBurstPeriod + } + if c.FramesPerClip < 1 { c.Logger.Log(logger.Warning, pkg+"no FramesPerClip defined, defaulting", "framesPerClip", defaultFramesPerClip) diff --git a/revid/revid.go b/revid/revid.go index c90ba8da..9f1cc12c 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -477,6 +477,13 @@ func (r *Revid) Update(vars map[string]string) error { default: r.config.Logger.Log(logger.Warning, pkg+"invalid VerticalFlip param", "value", value) } + case "BurstPeriod": + v, err := strconv.ParseUint(value, 10, 0) + if err != nil { + r.config.Logger.Log(logger.Warning, pkg+"invalid BurstPeriod param", "value", value) + break + } + r.config.BurstPeriod = uint(v) default: } } From ee7eb84d26e29b3d9cab7005fddd48d1847a8211 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 13:25:37 +1030 Subject: [PATCH 48/58] revid-cli: correctly using ns.Mode() and ns.SetMode() --- cmd/revid-cli/main.go | 75 +++++++++++++++++++++++-------------------- revid/revid.go | 2 +- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 58f8e507..ac8fff12 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -266,52 +266,57 @@ func run(cfg revid.Config) error { log.Log(logger.Fatal, pkg+"could not initialise revid", "error", err.Error()) } + // Update revid to get latest config settings from netreceiver. if err = rv.Update(vars); err != nil { return err } + // If mode on netreceiver isn't paused then we can start revid. + if ns.Mode() != paused { + if err = rv.Start(); err != nil { + return err + } + } + for { - if err := send(&ns, rv); err != nil { - log.Log(logger.Error, pkg+"polling failed", "error", err.Error()) + if err := ns.Run(); err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...") time.Sleep(netSendRetryTime) continue } - if vs != ns.VarSum() { - vars, err := ns.Vars() - if err != nil { - log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) - time.Sleep(netSendRetryTime) - continue - } - vs = ns.VarSum() + // If var sum hasn't change we continue + if vs == ns.VarSum() { + continue + } - switch ns.Mode() { - case paused: - if rv.IsRunning() { - log.Log(logger.Info, pkg+"pausing revid") - if err = rv.Stop(); err != nil { - log.Log(logger.Error, pkg+"failed to stop revid", "error", err.Error()) - continue - } - ns.SetMode(paused) - } - case normal: - if err = rv.Update(vars); err != nil { - return err - } - ns.SetMode(normal) - case burst: - if err = rv.Start(); err != nil { - return err - } - ns.SetMode(burst) - time.Sleep(rv.Config().BurstPeriod) - if err = rv.Stop(); err != nil { - return err - } - ns.SetMode(paused) + vars, err := ns.Vars() + if err != nil { + log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) + time.Sleep(netSendRetryTime) + continue + } + vs = ns.VarSum() + + if err = rv.Update(vars); err != nil { + return err + } + + switch ns.Mode() { + case paused: + case normal: + if err = rv.Start(); err != nil { + return err } + case burst: + if err = rv.Start(); err != nil { + return err + } + time.Sleep(time.Duration(rv.Config().BurstPeriod)) + if err = rv.Stop(); err != nil { + return err + } + ns.SetMode(paused, &vs) } sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) diff --git a/revid/revid.go b/revid/revid.go index 9f1cc12c..e4c7dbbe 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -488,7 +488,7 @@ func (r *Revid) Update(vars map[string]string) error { } } - return r.Start() + return nil } // outputClips takes the clips produced in the packClips method and outputs them From 93e3899725b8e6029c1cc80265f0ba1405eb4b7e Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:04:49 +1030 Subject: [PATCH 49/58] cmd/revid-cli: using ns.Send() rather than ns.Run() to poll --- cmd/revid-cli/main.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index ac8fff12..bcabf40d 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -272,15 +272,19 @@ func run(cfg revid.Config) error { } // If mode on netreceiver isn't paused then we can start revid. - if ns.Mode() != paused { + if ns.Mode() != paused && ns.Mode() != burst { if err = rv.Start(); err != nil { return err } } + if ns.Mode() == burst { + ns.SetMode(normal, &vs) + } + for { - if err := ns.Run(); err != nil { - log.Log(logger.Error, pkg+"Run Failed. Retrying...") + if err := send(&ns, rv); err != nil { + log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue } @@ -309,10 +313,12 @@ func run(cfg revid.Config) error { return err } case burst: + log.Log(logger.Info, pkg+"Starting burst...") if err = rv.Start(); err != nil { return err } - time.Sleep(time.Duration(rv.Config().BurstPeriod)) + time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) + log.Log(logger.Info, pkg+"Stopping burst...") if err = rv.Stop(); err != nil { return err } From 8978f9edc59d050a1ee590edfd58f59f6cf2ace5 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:12:30 +1030 Subject: [PATCH 50/58] cmd/revid-cli & revid: using goto to sleep for monitor period, and using wg.Done() at the end of output routine. --- cmd/revid-cli/main.go | 10 +++++++--- revid/revid.go | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index bcabf40d..1eafaae9 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -253,12 +253,15 @@ func handleFlags() revid.Config { func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") + var err error + var vars map[string]string + // initialize NetSender and use NetSender's logger var ns netsender.Sender if err := ns.Init(log, nil, nil, nil); err != nil { return err } - vars, _ := ns.Vars() + vars, _ = ns.Vars() vs := ns.VarSum() rv, err := revid.New(cfg, &ns) @@ -291,10 +294,10 @@ func run(cfg revid.Config) error { // If var sum hasn't change we continue if vs == ns.VarSum() { - continue + goto sleep } - vars, err := ns.Vars() + vars, err = ns.Vars() if err != nil { log.Log(logger.Error, pkg+"netSender failed to get vars", "error", err.Error()) time.Sleep(netSendRetryTime) @@ -324,6 +327,7 @@ func run(cfg revid.Config) error { } ns.SetMode(paused, &vs) } + sleep: sleepTime, _ := strconv.Atoi(ns.Param("mp")) time.Sleep(time.Duration(sleepTime) * time.Second) } diff --git a/revid/revid.go b/revid/revid.go index e4c7dbbe..d0f3abf6 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -582,6 +582,7 @@ loop: r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) } } + r.wg.Done() } // startRaspivid sets up things for input from raspivid i.e. starts From 1cdbfa2c66d2ccae05cf86ed9e81e8d6a4bae9c1 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 17:14:19 +1030 Subject: [PATCH 51/58] cmd/revid-cli: setting mode to paused if ns is in burst mode. --- cmd/revid-cli/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 1eafaae9..b33ef124 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -282,7 +282,7 @@ func run(cfg revid.Config) error { } if ns.Mode() == burst { - ns.SetMode(normal, &vs) + ns.SetMode(paused, &vs) } for { From bd2958ba4e0ccc62fe8c5af4d97cf7951bfcaa59 Mon Sep 17 00:00:00 2001 From: saxon Date: Mon, 4 Feb 2019 19:14:02 +1030 Subject: [PATCH 52/58] cmd/revid-cli & revid: added TODO for the use of Run() instead of send in cmd/revid-cli/main.go. Fixed filename in revid/revid.go file header. Renamed ret to cfg in revid.Config(). Catching error from call to revid.Stop() in revid.Udate() --- cmd/revid-cli/main.go | 1 + revid/revid.go | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index b33ef124..29191d3a 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -286,6 +286,7 @@ func run(cfg revid.Config) error { } for { + // TODO(saxon): replace this call with call to ns.Run(). if err := send(&ns, rv); err != nil { log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) diff --git a/revid/revid.go b/revid/revid.go index d0f3abf6..a3c10b66 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -1,6 +1,6 @@ /* NAME - r.go + revid.go DESCRIPTION See Readme.md @@ -329,9 +329,9 @@ func (r *Revid) IsRunning() bool { func (r *Revid) Config() Config { r.mu.Lock() - ret := r.config + cfg := r.config r.mu.Unlock() - return ret + return cfg } // setIsRunning sets r.isRunning using b. @@ -379,7 +379,9 @@ func (r *Revid) Stop() error { func (r *Revid) Update(vars map[string]string) error { if r.IsRunning() { - r.Stop() + if err := r.Stop(); err != nil { + return err + } } //look through the vars and update revid where needed for key, value := range vars { From 35344402b848ec34a81260002e0394a7452fd225 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:32:16 +1030 Subject: [PATCH 53/58] cmd/revid-cli/main.go: not using closed scope conditions anymore --- cmd/revid-cli/main.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 29191d3a..52bb283b 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -253,14 +253,15 @@ func handleFlags() revid.Config { func run(cfg revid.Config) error { log.Log(logger.Info, pkg+"running in NetSender mode") - var err error var vars map[string]string // initialize NetSender and use NetSender's logger var ns netsender.Sender - if err := ns.Init(log, nil, nil, nil); err != nil { + err := ns.Init(log, nil, nil, nil) + if err != nil { return err } + vars, _ = ns.Vars() vs := ns.VarSum() @@ -270,7 +271,8 @@ func run(cfg revid.Config) error { } // Update revid to get latest config settings from netreceiver. - if err = rv.Update(vars); err != nil { + err = rv.Update(vars) + if err != nil { return err } @@ -287,7 +289,8 @@ func run(cfg revid.Config) error { for { // TODO(saxon): replace this call with call to ns.Run(). - if err := send(&ns, rv); err != nil { + err = send(&ns, rv) + if err != nil { log.Log(logger.Error, pkg+"Run Failed. Retrying...", "error", err.Error()) time.Sleep(netSendRetryTime) continue @@ -306,24 +309,28 @@ func run(cfg revid.Config) error { } vs = ns.VarSum() - if err = rv.Update(vars); err != nil { + err = rv.Update(vars) + if err != nil { return err } switch ns.Mode() { case paused: case normal: - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } case burst: log.Log(logger.Info, pkg+"Starting burst...") - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } time.Sleep(time.Duration(rv.Config().BurstPeriod) * time.Second) log.Log(logger.Info, pkg+"Stopping burst...") - if err = rv.Stop(); err != nil { + err = rv.Stop() + if err != nil { return err } ns.SetMode(paused, &vs) From 4dcbd904499c4b51516cba0c90f3eebeef94df53 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:40:08 +1030 Subject: [PATCH 54/58] cmd/revid-cli: removed another closed scope condition --- cmd/revid-cli/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 52bb283b..f37144a2 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -278,7 +278,8 @@ func run(cfg revid.Config) error { // If mode on netreceiver isn't paused then we can start revid. if ns.Mode() != paused && ns.Mode() != burst { - if err = rv.Start(); err != nil { + err = rv.Start() + if err != nil { return err } } From ea8572a777a90b90bb3e19564402426337b73de5 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:45:15 +1030 Subject: [PATCH 55/58] cmd/revid-cli: catching error in conversion of mp --- cmd/revid-cli/main.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index f37144a2..1c2b9c90 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -337,7 +337,10 @@ func run(cfg revid.Config) error { ns.SetMode(paused, &vs) } sleep: - sleepTime, _ := strconv.Atoi(ns.Param("mp")) + sleepTime, err := strconv.Atoi(ns.Param("mp")) + if err != nil { + return err + } time.Sleep(time.Duration(sleepTime) * time.Second) } } From a4d179039b4086facc14cb5eb6b719d09bc0873c Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:49:05 +1030 Subject: [PATCH 56/58] revid/revid.go: removed default case in switch with revid.Update() --- revid/revid.go | 1 - 1 file changed, 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index a3c10b66..8c4b57ed 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -486,7 +486,6 @@ func (r *Revid) Update(vars map[string]string) error { break } r.config.BurstPeriod = uint(v) - default: } } From de4f471201bc370186924b21e955884066e1c1d3 Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 5 Feb 2019 10:50:21 +1030 Subject: [PATCH 57/58] revid/revid.go: defer r.wg.Done() in revid.outputClips routine --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 8c4b57ed..02656d03 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -495,6 +495,7 @@ func (r *Revid) Update(vars map[string]string) error { // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revid config func (r *Revid) outputClips() { + defer r.wg.Done() lastTime := time.Now() var count int loop: @@ -583,7 +584,6 @@ loop: r.config.Logger.Log(logger.Error, pkg+"failed to close output"+strconv.Itoa(i)+" destination", "error", err.Error()) } } - r.wg.Done() } // startRaspivid sets up things for input from raspivid i.e. starts From 9ef5886d669c59b4a0dc69caa82a7867b6828af0 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 7 Feb 2019 19:58:08 +1030 Subject: [PATCH 58/58] created experimentation dir under av, and moved flac package here. created experimentation dir under av, and moved flac pkg here. experimentation/flac: removed wav file --- {stream => experimentation}/flac/decode.go | 0 {stream => experimentation}/flac/flac_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {stream => experimentation}/flac/decode.go (100%) rename {stream => experimentation}/flac/flac_test.go (100%) diff --git a/stream/flac/decode.go b/experimentation/flac/decode.go similarity index 100% rename from stream/flac/decode.go rename to experimentation/flac/decode.go diff --git a/stream/flac/flac_test.go b/experimentation/flac/flac_test.go similarity index 100% rename from stream/flac/flac_test.go rename to experimentation/flac/flac_test.go