From 92ba9c89a275868e0bc295cd1dcca1a6a526cb70 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 19 Jan 2019 19:39:43 +1030 Subject: [PATCH 01/25] cmd/revid-cli,revid: use a slice for output destination flags --- cmd/revid-cli/main.go | 94 +++++++++++++--------- revid/cmd/h264-file-to-flv-rtmp/main.go | 2 +- revid/cmd/h264-file-to-mpgets-file/main.go | 2 +- revid/config.go | 67 +++++++-------- revid/revid.go | 18 +++-- 5 files changed, 93 insertions(+), 90 deletions(-) diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 7e375992..5b826b91 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -102,8 +102,6 @@ func handleFlags() revid.Config { inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") - output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp") - output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") @@ -126,6 +124,9 @@ func handleFlags() revid.Config { configFilePtr = flag.String("ConfigFile", "", "NetSender config file") ) + var outputs flagStrings + flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)") + flag.Parse() log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) @@ -167,40 +168,24 @@ func handleFlags() revid.Config { log.Log(logger.Error, pkg+"bad input codec argument") } - switch *output1Ptr { - case "File": - cfg.Output1 = revid.File - case "Http": - cfg.Output1 = revid.Http - case "Rtmp": - cfg.Output1 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp - case "Udp": - cfg.Output1 = revid.Udp - case "Rtp": - cfg.Output1 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 1 argument") - } - - switch *output2Ptr { - case "File": - cfg.Output2 = revid.File - case "Http": - cfg.Output2 = revid.Http - case "Rtmp": - cfg.Output2 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output2 = revid.FfmpegRtmp - case "Udp": - cfg.Output2 = revid.Udp - case "Rtp": - cfg.Output2 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 2 argument") + for _, o := range outputs { + switch o { + case "File": + cfg.Outputs = append(cfg.Outputs, revid.File) + case "Http": + cfg.Outputs = append(cfg.Outputs, revid.Http) + case "Rtmp": + cfg.Outputs = append(cfg.Outputs, revid.Rtmp) + case "FfmpegRtmp": + cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp) + case "Udp": + cfg.Outputs = append(cfg.Outputs, revid.Udp) + case "Rtp": + cfg.Outputs = append(cfg.Outputs, revid.Rtp) + case "": + default: + log.Log(logger.Error, pkg+"bad output argument", "arg", o) + } } switch *rtmpMethodPtr { @@ -380,15 +365,19 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m for key, value := range vars { switch key { case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Output1 = revid.File + cfg.Outputs[0] = revid.File case "Http": - cfg.Output1 = revid.Http + cfg.Outputs[0] = revid.Http case "Rtmp": - cfg.Output1 = revid.Rtmp + cfg.Outputs[0] = revid.Rtmp case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp + cfg.Outputs[0] = revid.FfmpegRtmp default: log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue @@ -474,3 +463,28 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m return startRevid(ns, cfg) } + +// flagStrings implements an appending string set flag. +type flagStrings []string + +func (v *flagStrings) String() string { + if *v != nil { + return strings.Join(*v, ",") + } + return "" +} + +func (v *flagStrings) Set(s string) error { + if s == "" { + return nil + } + for _, e := range *v { + if e == s { + return nil + } + } + *v = append(*v, s) + return nil +} + +func (v *flagStrings) Get() interface{} { return *v } diff --git a/revid/cmd/h264-file-to-flv-rtmp/main.go b/revid/cmd/h264-file-to-flv-rtmp/main.go index 5f79df4a..4f7c9d7c 100644 --- a/revid/cmd/h264-file-to-flv-rtmp/main.go +++ b/revid/cmd/h264-file-to-flv-rtmp/main.go @@ -58,7 +58,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output1: revid.Rtmp, + Outputs: []byte{revid.Rtmp}, RtmpMethod: revid.LibRtmp, RtmpUrl: *rtmpUrlPtr, Packetization: revid.Flv, diff --git a/revid/cmd/h264-file-to-mpgets-file/main.go b/revid/cmd/h264-file-to-mpgets-file/main.go index 03a39fde..768f560b 100644 --- a/revid/cmd/h264-file-to-mpgets-file/main.go +++ b/revid/cmd/h264-file-to-mpgets-file/main.go @@ -50,7 +50,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output1: revid.File, + Outputs: []byte{revid.File}, OutputFileName: outputFile, Packetization: revid.Mpegts, Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), diff --git a/revid/config.go b/revid/config.go index 304d3e64..dc9c5a8d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -40,8 +40,7 @@ type Config struct { Input uint8 InputCodec uint8 - Output1 uint8 - Output2 uint8 + Outputs []uint8 RtmpMethod uint8 Packetization uint8 @@ -172,45 +171,33 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad input codec defined in config") } - switch c.Output1 { - case File: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output1 = Http - break + for i, o := range c.Outputs { + switch o { + case File: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Outputs[i] = Http + // FIXME(kortschak): Does this want the same line as below? + // c.FramesPerClip = httpFramesPerClip + break + } + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", + "framesPerClip", defaultFramesPerClip) + c.FramesPerClip = defaultFramesPerClip + case NothingDefined: + c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", + defaultOutput) + c.Outputs[i] = defaultOutput + fallthrough + case Http, Rtp: + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", + "framesPerClip", httpFramesPerClip) + c.FramesPerClip = httpFramesPerClip + default: + return errors.New("bad output type defined in config") } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", - "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - case NothingDefined: - c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", - defaultOutput) - c.Output1 = defaultOutput - fallthrough - case Http, Rtp: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", - "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip - default: - return errors.New("bad output type defined in config") - } - - switch c.Output2 { - case File: - case Rtp: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output2 = Http - break - } - case NothingDefined: - case Http: - default: - return errors.New("bad output2 type defined in config") } if c.FramesPerClip < 1 { diff --git a/revid/revid.go b/revid/revid.go index ea1c0adc..a15fab3a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -157,8 +157,15 @@ func (p *packer) Write(frame []byte) (int, error) { return n, err } p.packetCount++ + var hasRtmp bool + for _, d := range p.owner.config.Outputs { + if d == Rtmp { + hasRtmp = true + break + } + } now := time.Now() - if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp { + if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { p.owner.buffer.Flush() p.packetCount = 0 p.lastTime = now @@ -203,13 +210,8 @@ func (r *Revid) reset(config Config) error { } } - n := 1 - if r.config.Output2 != 0 && r.config.Output2 != Rtp { - n = 2 - } - r.destination = make([]loadSender, n) - - for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} { + r.destination = make([]loadSender, len(r.config.Outputs)) + for outNo, outType := range r.config.Outputs { switch outType { case File: s, err := newFileSender(config.OutputFileName) From adfb87dcf972c9933a06b70c50ee73af8f3886d1 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:30:24 +1030 Subject: [PATCH 02/25] TypeString now exported (was typeString). --- rtmp/amf/amf.go | 14 +++++++------- rtmp/amf/amf_test.go | 12 ++++++------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rtmp/amf/amf.go b/rtmp/amf/amf.go index 837531bb..7ccfd932 100644 --- a/rtmp/amf/amf.go +++ b/rtmp/amf/amf.go @@ -51,7 +51,7 @@ import ( const ( typeNumber = 0x00 typeBoolean = 0x01 - typeString = 0x02 + TypeString = 0x02 TypeObject = 0x03 typeMovieClip = 0x04 TypeNull = 0x05 @@ -93,7 +93,7 @@ type Property struct { var ( ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short. ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder. - ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding. + ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding. ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found. ) @@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) { } // EncodeString encodes a string. +// Strings less than 65536 in length are encoded as TypeString, while longer strings are ecodeded as typeLongString. func EncodeString(buf []byte, val string) ([]byte, error) { const typeSize = 1 if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) { @@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) { } if len(val) < 65536 { - buf[0] = typeString + buf[0] = TypeString buf = buf[1:] binary.BigEndian.PutUint16(buf[:2], uint16(len(val))) buf = buf[2:] @@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) { return EncodeNumber(buf, prop.Number) case typeBoolean: return EncodeBoolean(buf, prop.Number != 0) - case typeString: + case TypeString: return EncodeString(buf, prop.String) case TypeNull: if len(buf) < 2 { @@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { prop.Number = float64(buf[0]) buf = buf[1:] - case typeString: + case TypeString: n := DecodeInt16(buf[:2]) if len(buf) < int(n+2) { return 0, ErrShortBuffer @@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { } // Encode encodes an Object into its AMF representation. -// This is the top-level encoding function and is typically the only function callers will need to use. func Encode(obj *Object, buf []byte) ([]byte, error) { if len(buf) < 5 { return nil, ErrShortBuffer @@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) { // StringProperty is a wrapper for Property that returns a String property's value, if any. func (obj *Object) StringProperty(name string, idx int) (string, error) { - prop, err := obj.Property(name, idx, typeString) + prop, err := obj.Property(name, idx, TypeString) if err != nil { return "", err } diff --git a/rtmp/amf/amf_test.go b/rtmp/amf/amf_test.go index 59548c09..957e1c7e 100644 --- a/rtmp/amf/amf_test.go +++ b/rtmp/amf/amf_test.go @@ -58,7 +58,7 @@ func TestSanity(t *testing.T) { // TestStrings tests string encoding and decoding. func TestStrings(t *testing.T) { // Short string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:3] = size // enc[3:] = data for _, s := range testStrings { @@ -67,8 +67,8 @@ func TestStrings(t *testing.T) { if err != nil { t.Errorf("EncodeString failed") } - if buf[0] != typeString { - t.Errorf("Expected typeString, got %v", buf[0]) + if buf[0] != TypeString { + t.Errorf("Expected TypeString, got %v", buf[0]) } ds := DecodeString(buf[1:]) if s != ds { @@ -76,7 +76,7 @@ func TestStrings(t *testing.T) { } } // Long string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:5] = size // enc[5:] = data s := string(make([]byte, 65536)) @@ -148,7 +148,7 @@ func TestProperties(t *testing.T) { // Encode/decode string properties. enc = buf[:] for i := range testStrings { - enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc) + enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc) if err != nil { t.Errorf("EncodeProperty of string failed") } @@ -235,7 +235,7 @@ func TestObject(t *testing.T) { // Construct a more complicated object that includes a nested object. var obj2 Object for i := range testStrings { - obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]}) + obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]}) obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])}) } obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1}) From 42be87d98a26684b2a40e51fadded6e3bd705587 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:33:43 +1030 Subject: [PATCH 03/25] sendConnectPacket() now encodes required link info in one go using amf.Encode(). --- rtmp/rtmp.go | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index bfee3e37..1175e8ce 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -276,26 +276,18 @@ func sendConnectPacket(c *Conn) error { return err } - enc[0] = amf.TypeObject - enc = enc[1:] - enc, err = amf.EncodeNamedString(enc, avApp, c.link.app) - if err != nil { - return err + // required link info + info := amf.Object{Properties: []amf.Property{ + amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app}, + amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate}, + amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}}, } - enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) - if err != nil { - return err - } - enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url) - if err != nil { - return err - } - enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd) + enc, err = amf.Encode(&info, enc) if err != nil { return err } - // add auth string, if any + // optional link auth info if c.link.auth != "" { enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0) if err != nil { From 828cc3780a9005282a5bbaa7a309747870104994 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 07:56:55 +1030 Subject: [PATCH 04/25] Removed superfluous packet.bytesRead. --- rtmp/packet.go | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 3cf18f14..642e6194 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -81,7 +81,7 @@ const ( // 3: basic header (chunk type and stream ID) (1 byte) var headerSizes = [...]int{12, 8, 4, 1} -// packet defines an RTMP packet. +// packet represents an RTMP packet. type packet struct { headerType uint8 packetType uint8 @@ -90,7 +90,6 @@ type packet struct { timestamp uint32 streamID uint32 bodySize uint32 - bytesRead uint32 buf []byte body []byte } @@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { pkt.bodySize = amf.DecodeInt24(header[3:6]) - pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] @@ -205,21 +203,17 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } - toRead := pkt.bodySize - pkt.bytesRead - chunkSize := c.inChunkSize - - if toRead < chunkSize { - chunkSize = toRead + sz := c.inChunkSize + if pkt.bodySize < sz { + sz = pkt.bodySize } - _, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = c.read(pkt.body[:sz]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } - pkt.bytesRead += uint32(chunkSize) - // Keep the packet as a reference for other packets on this channel. if c.channelsIn[pkt.channel] == nil { c.channelsIn[pkt.channel] = &packet{} @@ -237,7 +231,6 @@ func (pkt *packet) readFrom(c *Conn) error { c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) c.channelsIn[pkt.channel].body = nil - c.channelsIn[pkt.channel].bytesRead = 0 c.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } From ddd1e4ab1789fe5a66577f3741e80bfbd393155b Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:09:49 +1030 Subject: [PATCH 05/25] readFrom() now logs a warning if it is reading a large packet (which I suspect never happens). --- rtmp/packet.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 642e6194..0f3ae241 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -203,12 +203,11 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) } - sz := c.inChunkSize - if pkt.bodySize < sz { - sz = pkt.bodySize + if pkt.bodySize > c.inChunkSize { + c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) } - _, err = c.read(pkt.body[:sz]) + _, err = c.read(pkt.body[:pkt.bodySize]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err From 969e2f4fa940719e14924faac0ec358066d7fb46 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:41:32 +1030 Subject: [PATCH 06/25] packet.resize() now only makes a new buf when necessary. --- rtmp/packet.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index 0f3ae241..a92f7022 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -199,9 +199,7 @@ func (pkt *packet) readFrom(c *Conn) error { hSize += 4 } - if pkt.bodySize > 0 && pkt.body == nil { - pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) - } + pkt.resize(pkt.bodySize, pkt.headerType) if pkt.bodySize > c.inChunkSize { c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) @@ -234,10 +232,12 @@ func (pkt *packet) readFrom(c *Conn) error { return nil } -// resize adjusts the packet's storage to accommodate a body of the given size and header type. +// resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { - pkt.buf = make([]byte, fullHeaderSize+size) + if cap(pkt.buf) < fullHeaderSize+int(size) { + pkt.buf = make([]byte, fullHeaderSize+size) + } pkt.body = pkt.buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht From fbad21bc76af71deca65e924229c6ef643130064 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 08:52:58 +1030 Subject: [PATCH 07/25] connect() now constructs its packet with a buf to avoid needless packet resizing later. --- rtmp/rtmp.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 1175e8ce..a6897eef 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -186,8 +186,10 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"negotiating") + + var buf [256]byte for !c.isPlaying { - pkt := packet{} + pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { break From 04ad1fec257ea38e9ad03eeeb0c17f6d16631080 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:20:56 +1030 Subject: [PATCH 08/25] Log packet size before la and ra. --- rtmp/packet.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rtmp/packet.go b/rtmp/packet.go index a92f7022..b28a1cba 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -399,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { return nil } } else { - // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + // Send previously deferred packet if combining it with the next one would exceed the chunk size. if len(c.deferred)+size+hSize > chunkSize { c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) _, err := c.write(c.deferred) @@ -411,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) + c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) for size+hSize != 0 { if chunkSize > size { chunkSize = size From a362d1d2abb6b02298b572ae793d7e10ab016a52 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:50:51 +1030 Subject: [PATCH 09/25] connect() now closes c.link.conn upon an error. --- rtmp/rtmp.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index a6897eef..3352558a 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -174,6 +174,13 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"connected") + + defer func() { + if err != nil { + c.link.conn.Close() + } + }() + err = handshake(c) if err != nil { c.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) From efe40a6778e563f9e01447b8ebe015c362f48927 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 09:59:28 +1030 Subject: [PATCH 10/25] Move test for empty RTMP 'app' into parseURL(). --- rtmp/conn.go | 3 --- rtmp/parseurl.go | 4 +++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 2554d092..1db2cb6e 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -121,9 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.app == "" { - return nil, errInvalidURL - } if c.link.port == 0 { switch { case (c.link.protocol & featureSSL) != 0: diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index eae4277e..ad3409b6 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -41,7 +41,6 @@ import ( ) // parseURL parses an RTMP URL (ok, technically it is lexing). -// func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { @@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } elems := strings.SplitN(u.Path[1:], "/", 3) app = elems[0] + if app == "" { + return protocol, host, port, app, playpath, errInvalidURL + } playpath = elems[1] if len(elems) == 3 && len(elems[2]) != 0 { playpath = path.Join(elems[1:]...) From 89d9bf3eef7b91ae9d98f1e0bbafd5b0aa125926 Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 10:28:05 +1030 Subject: [PATCH 11/25] Return errors from the loop in connect() rather than breaking. --- rtmp/rtmp.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index 3352558a..6f666785 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -192,14 +192,14 @@ func connect(c *Conn) error { c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return err } - c.log(DebugLevel, pkg+"negotiating") + c.log(DebugLevel, pkg+"negotiating") var buf [256]byte for !c.isPlaying { pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { - break + return err } switch pkt.packetType { @@ -208,14 +208,10 @@ func connect(c *Conn) error { default: err = handlePacket(c, &pkt) if err != nil { - break + return err } } } - - if !c.isPlaying { - return err - } return nil } From 0105c76dc5e029d47f0d858805bfcbd12569bb2a Mon Sep 17 00:00:00 2001 From: scruzin Date: Sun, 20 Jan 2019 13:33:44 +1030 Subject: [PATCH 12/25] Move port defaulting logic to parseURL(). --- rtmp/conn.go | 11 ----------- rtmp/parseurl.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/rtmp/conn.go b/rtmp/conn.go index 1db2cb6e..3864df98 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -121,17 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app c.link.protocol |= featureWrite diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index ad3409b6..c2880368 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -99,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } + if port == 0 { + switch { + case (protocol & featureSSL) != 0: + port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 + } + } return protocol, host, port, app, playpath, nil } From 3a70dc6ddc447a8549a1c086aa658aa08fdfafc6 Mon Sep 17 00:00:00 2001 From: scruzin Date: Mon, 21 Jan 2019 10:38:27 +1030 Subject: [PATCH 13/25] parseURL() now returns errUnimplemented for SSL. --- rtmp/parseurl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index c2880368..cc197fb9 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -103,6 +103,7 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp switch { case (protocol & featureSSL) != 0: port = 433 + return errUnimplemented case (protocol & featureHTTP) != 0: port = 80 default: From 2697dcf51520bb1b33ceb962e0d66f083aeca6e7 Mon Sep 17 00:00:00 2001 From: scruzin Date: Mon, 21 Jan 2019 10:57:40 +1030 Subject: [PATCH 14/25] More elegant switch in parseURL() for port logic. --- rtmp/parseurl.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index cc197fb9..4fea7d82 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -99,16 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } - if port == 0 { - switch { - case (protocol & featureSSL) != 0: - port = 433 - return errUnimplemented - case (protocol & featureHTTP) != 0: - port = 80 - default: - port = 1935 - } + switch { + case port != 0: + case (protocol & featureSSL) != 0: + return protocol, host, port, app, playpath, errUnimplemented // port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 } + return protocol, host, port, app, playpath, nil } From 2da8d2af85100797248dcd7a789777266a19e0de Mon Sep 17 00:00:00 2001 From: saxon Date: Tue, 22 Jan 2019 13:54:56 +1030 Subject: [PATCH 15/25] av/rtmp/rtmp_test.go: using local rtmpSender io.writer implemntation to handle errors from rtmp --- rtmp/rtmp_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 618f2b95..be4908a6 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) { } defer f.Close() + rs := &rtmpSender{conn: c} // Pass RTMP session, true for audio, true for video, and 25 FPS - flvEncoder, err := flv.NewEncoder(c, true, true, 25) + flvEncoder, err := flv.NewEncoder(rs, true, true, 25) if err != nil { t.Fatalf("failed to create encoder: %v", err) } From 071e6fd0f7fe99d530f2f20e8ca241cf79a05587 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 29 Jan 2019 19:56:02 +1030 Subject: [PATCH 16/25] revid: honour difference between rtmp dst and others --- revid/revid.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index a15fab3a..5b37dc09 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -210,7 +210,7 @@ func (r *Revid) reset(config Config) error { } } - r.destination = make([]loadSender, len(r.config.Outputs)) + r.destination = r.destination[:0] for outNo, outType := range r.config.Outputs { switch outType { case File: @@ -218,19 +218,19 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Http: r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) case Udp: @@ -238,7 +238,7 @@ func (r *Revid) reset(config Config) error { if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtp: r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { From 58102e5587ad7cd1d6fe5f26576d390cad9a8465 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 30 Jan 2019 13:19:00 +1030 Subject: [PATCH 17/25] revid: fix http destination assignment --- revid/revid.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 5b37dc09..9de472e1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -211,8 +211,8 @@ func (r *Revid) reset(config Config) error { } r.destination = r.destination[:0] - for outNo, outType := range r.config.Outputs { - switch outType { + for _, typ := range r.config.Outputs { + switch typ { case File: s, err := newFileSender(config.OutputFileName) if err != nil { @@ -232,7 +232,7 @@ func (r *Revid) reset(config Config) error { } r.destination = append(r.destination, s) case Http: - r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) + r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) case Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { From ed4d97f893414b3d1f19edc305e4e82c382eaf73 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:03:50 +1030 Subject: [PATCH 18/25] stream/mts: patch for revid.Start() no exit bug --- revid/revid.go | 51 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 9de472e1..df1e111a 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -120,6 +120,8 @@ type Revid struct { // isRunning is a loaded and cocked foot-gun. isRunning bool + + err chan error } // packer takes data segments and packs them into clips @@ -176,13 +178,29 @@ func (p *packer) Write(frame []byte) (int, error) { // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { - r := Revid{ns: ns} + r := Revid{ns: ns, err: make(chan error)} r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.packer.owner = &r err := r.reset(c) if err != nil { return nil, err } + go func() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) + } + } + } + }() return &r, nil } @@ -312,8 +330,8 @@ func (r *Revid) Start() error { r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") - err := r.setupInput() - return err + go r.setupInput() + return nil } // Stop halts any processing of video data from a camera or file @@ -474,11 +492,13 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go func() { + r.config.Logger.Log(logger.Info, pkg+"reading camera data") + delay := time.Second / time.Duration(r.config.FrameRate) + r.err <- r.lexTo(r.encoder, stdout, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") + }() + return nil } func (r *Revid) startV4L() error { @@ -526,10 +546,12 @@ func (r *Revid) startV4L() error { return err } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go func() { + r.config.Logger.Log(logger.Info, pkg+"reading camera data") + r.err <- r.lexTo(r.encoder, stdout, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") + }() + return nil } // setupInputForFile sets things up for getting input from a file @@ -544,5 +566,8 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - return r.lexTo(r.encoder, f, delay) + go func() { + r.err <- r.lexTo(r.encoder, f, delay) + }() + return nil } From d53eafe215f430a480199ec9604b12b0276f83a3 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:15:38 +1030 Subject: [PATCH 19/25] revid/revid.go: not running r.setupInput() as routine - now getting error an returning --- revid/revid.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index df1e111a..febb05c1 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -330,8 +330,8 @@ func (r *Revid) Start() error { r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") - go r.setupInput() - return nil + err := r.setupInput() + return err } // Stop halts any processing of video data from a camera or file From e18b1f51f0d1557d0a5a56cd119bf2f31c9c5467 Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 15:49:44 +1030 Subject: [PATCH 20/25] revid/revid.go: capture error from r.Start() in error handling routine --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index febb05c1..2b146bf4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -194,7 +194,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) } - r.Start() + err = r.Start() if err != nil { r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) } From 1e307fc37b4b971310466f5f204d3e08c24d539d Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 19:42:20 +1030 Subject: [PATCH 21/25] revid/revid.go: made routines named rather than anonymous --- revid/revid.go | 55 +++++++++++++++++++++++--------------------------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 2b146bf4..c8f6d217 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -185,23 +185,25 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { if err != nil { return nil, err } - go func() { - for { - err := <-r.err + go r.handleErrors() + return &r, nil +} + +func (r *Revid) handleErrors() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() if err != nil { - r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) - err = r.Stop() - if err != nil { - r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) - } - err = r.Start() - if err != nil { - r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) - } + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + err = r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) } } - }() - return &r, nil + } } // Bitrate returns the result of the most recent bitrate check. @@ -492,12 +494,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go func() { - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - r.err <- r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - }() + go r.lex(stdout, time.Duration(0)) return nil } @@ -534,7 +531,6 @@ func (r *Revid) startV4L() error { r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) - delay := time.Second / time.Duration(r.config.FrameRate) stdout, err := r.cmd.StdoutPipe() if err != nil { return err @@ -546,17 +542,12 @@ func (r *Revid) startV4L() error { return err } - go func() { - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - r.err <- r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - }() + go r.lex(stdout, time.Second/time.Duration(r.config.FrameRate)) return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { - delay := time.Second / time.Duration(r.config.FrameRate) f, err := os.Open(r.config.InputFileName) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) @@ -566,8 +557,12 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go func() { - r.err <- r.lexTo(r.encoder, f, delay) - }() + go r.lex(f, time.Second/time.Duration(r.config.FrameRate)) return nil } + +func (r *Revid) lex(read io.Reader, delay time.Duration) { + r.config.Logger.Log(logger.Info, pkg+"reading input data") + r.err <- r.lexTo(r.encoder, read, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading input data") +} From 051263c144c195d15bea214327bbf259385fdc3c Mon Sep 17 00:00:00 2001 From: saxon Date: Thu, 31 Jan 2019 19:58:25 +1030 Subject: [PATCH 22/25] revid/revid.go: revid.lex to revid.transcode --- revid/revid.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index c8f6d217..6d52be59 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -494,7 +494,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.lex(stdout, time.Duration(0)) + go r.transcode(stdout, time.Duration(0)) return nil } @@ -542,7 +542,7 @@ func (r *Revid) startV4L() error { return err } - go r.lex(stdout, time.Second/time.Duration(r.config.FrameRate)) + go r.transcode(stdout, time.Duration(0)) return nil } @@ -557,11 +557,11 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go r.lex(f, time.Second/time.Duration(r.config.FrameRate)) + go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) return nil } -func (r *Revid) lex(read io.Reader, delay time.Duration) { +func (r *Revid) transcode(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") From d26aa8643aec5661ceca0ccc3866b1f090691dd7 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 09:47:31 +1030 Subject: [PATCH 23/25] revid: renamed transcode to processFrom. Using mutex for isRunning flag. Created setIsRunning func to set state of isRunning. --- revid/revid.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index 6d52be59..f7313b57 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -37,6 +37,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "time" "bitbucket.org/ausocean/av/stream" @@ -121,6 +122,8 @@ type Revid struct { // isRunning is a loaded and cocked foot-gun. isRunning bool + mu sync.Mutex + err chan error } @@ -189,6 +192,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } +// TODO: put more thought into error severity func (r *Revid) handleErrors() { for { err := <-r.err @@ -317,18 +321,21 @@ func (r *Revid) reset(config Config) error { // IsRunning returns whether the receiver is running. func (r *Revid) IsRunning() bool { - return r.isRunning + r.mu.Lock() + ret := r.isRunning + r.mu.Unlock() + return ret } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.isRunning { + if r.IsRunning() { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.isRunning = true + r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") @@ -338,12 +345,12 @@ func (r *Revid) Start() error { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() error { - if !r.isRunning { + if !r.IsRunning() { return errors.New(pkg + "stop called but revid is already stopped") } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.isRunning = false + r.setIsRunning(false) r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! @@ -359,7 +366,7 @@ func (r *Revid) outputClips() { lastTime := time.Now() var count int loop: - for r.isRunning { + for r.IsRunning() { // If the ring buffer has something we can read and send off chunk, err := r.buffer.Next(readTimeout) switch err { @@ -403,7 +410,7 @@ loop: err = rs.restart() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false + r.setIsRunning(false) return } @@ -494,10 +501,16 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() +} + func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" @@ -542,7 +555,7 @@ func (r *Revid) startV4L() error { return err } - go r.transcode(stdout, time.Duration(0)) + go r.processFrom(stdout, time.Duration(0)) return nil } @@ -557,11 +570,11 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - go r.transcode(f, time.Second/time.Duration(r.config.FrameRate)) + go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) return nil } -func (r *Revid) transcode(read io.Reader, delay time.Duration) { +func (r *Revid) processFrom(read io.Reader, delay time.Duration) { r.config.Logger.Log(logger.Info, pkg+"reading input data") r.err <- r.lexTo(r.encoder, read, delay) r.config.Logger.Log(logger.Info, pkg+"finished reading input data") From 9bddf343f589771be3f40e9ef6eebef0cbcd1de0 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 10:07:00 +1030 Subject: [PATCH 24/25] revid/revid.go: moved revid.mu declaration . Updated todo owner --- revid/revid.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/revid/revid.go b/revid/revid.go index f7313b57..3e658312 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -120,10 +120,9 @@ type Revid struct { bitrate int // isRunning is a loaded and cocked foot-gun. + mu sync.Mutex isRunning bool - mu sync.Mutex - err chan error } @@ -192,7 +191,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { return &r, nil } -// TODO: put more thought into error severity +// TODO(Saxon): put more thought into error severity. func (r *Revid) handleErrors() { for { err := <-r.err @@ -319,7 +318,7 @@ func (r *Revid) reset(config Config) error { return nil } -// IsRunning returns whether the receiver is running. +// IsRunning returns true if revid is running. func (r *Revid) IsRunning() bool { r.mu.Lock() ret := r.isRunning @@ -327,6 +326,13 @@ func (r *Revid) IsRunning() bool { return ret } +// setIsRunning sets revid.isRunning using b. +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() +} + // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { @@ -505,12 +511,6 @@ func (r *Revid) startRaspivid() error { return nil } -func (r *Revid) setIsRunning(b bool) { - r.mu.Lock() - r.isRunning = b - r.mu.Unlock() -} - func (r *Revid) startV4L() error { const defaultVideo = "/dev/video0" From c44d6bbfd3187cd51440186e1b571d9355dc74c7 Mon Sep 17 00:00:00 2001 From: saxon Date: Fri, 1 Feb 2019 10:08:49 +1030 Subject: [PATCH 25/25] revid/revid.go: not during time.Duration conversion --- revid/revid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/revid/revid.go b/revid/revid.go index 3e658312..6833ec2b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -507,7 +507,7 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - go r.processFrom(stdout, time.Duration(0)) + go r.processFrom(stdout, 0) return nil }