diff --git a/cmd/revid-cli/main.go b/cmd/revid-cli/main.go index 7e375992..5b826b91 100644 --- a/cmd/revid-cli/main.go +++ b/cmd/revid-cli/main.go @@ -102,8 +102,6 @@ func handleFlags() revid.Config { inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam") inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg") - output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp") - output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp") rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp") packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None") quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)") @@ -126,6 +124,9 @@ func handleFlags() revid.Config { configFilePtr = flag.String("ConfigFile", "", "NetSender config file") ) + var outputs flagStrings + flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)") + flag.Parse() log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller) @@ -167,40 +168,24 @@ func handleFlags() revid.Config { log.Log(logger.Error, pkg+"bad input codec argument") } - switch *output1Ptr { - case "File": - cfg.Output1 = revid.File - case "Http": - cfg.Output1 = revid.Http - case "Rtmp": - cfg.Output1 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp - case "Udp": - cfg.Output1 = revid.Udp - case "Rtp": - cfg.Output1 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 1 argument") - } - - switch *output2Ptr { - case "File": - cfg.Output2 = revid.File - case "Http": - cfg.Output2 = revid.Http - case "Rtmp": - cfg.Output2 = revid.Rtmp - case "FfmpegRtmp": - cfg.Output2 = revid.FfmpegRtmp - case "Udp": - cfg.Output2 = revid.Udp - case "Rtp": - cfg.Output2 = revid.Rtp - case "": - default: - log.Log(logger.Error, pkg+"bad output 2 argument") + for _, o := range outputs { + switch o { + case "File": + cfg.Outputs = append(cfg.Outputs, revid.File) + case "Http": + cfg.Outputs = append(cfg.Outputs, revid.Http) + case "Rtmp": + cfg.Outputs = append(cfg.Outputs, revid.Rtmp) + case "FfmpegRtmp": + cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp) + case "Udp": + cfg.Outputs = append(cfg.Outputs, revid.Udp) + case "Rtp": + cfg.Outputs = append(cfg.Outputs, revid.Rtp) + case "": + default: + log.Log(logger.Error, pkg+"bad output argument", "arg", o) + } } switch *rtmpMethodPtr { @@ -380,15 +365,19 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m for key, value := range vars { switch key { case "Output": + // FIXME(kortschak): There can be only one! + // How do we specify outputs after the first? + // + // Maybe we shouldn't be doing this! switch value { case "File": - cfg.Output1 = revid.File + cfg.Outputs[0] = revid.File case "Http": - cfg.Output1 = revid.Http + cfg.Outputs[0] = revid.Http case "Rtmp": - cfg.Output1 = revid.Rtmp + cfg.Outputs[0] = revid.Rtmp case "FfmpegRtmp": - cfg.Output1 = revid.FfmpegRtmp + cfg.Outputs[0] = revid.FfmpegRtmp default: log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value) continue @@ -474,3 +463,28 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m return startRevid(ns, cfg) } + +// flagStrings implements an appending string set flag. +type flagStrings []string + +func (v *flagStrings) String() string { + if *v != nil { + return strings.Join(*v, ",") + } + return "" +} + +func (v *flagStrings) Set(s string) error { + if s == "" { + return nil + } + for _, e := range *v { + if e == s { + return nil + } + } + *v = append(*v, s) + return nil +} + +func (v *flagStrings) Get() interface{} { return *v } diff --git a/revid/cmd/h264-file-to-flv-rtmp/main.go b/revid/cmd/h264-file-to-flv-rtmp/main.go index 5f79df4a..4f7c9d7c 100644 --- a/revid/cmd/h264-file-to-flv-rtmp/main.go +++ b/revid/cmd/h264-file-to-flv-rtmp/main.go @@ -58,7 +58,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output1: revid.Rtmp, + Outputs: []byte{revid.Rtmp}, RtmpMethod: revid.LibRtmp, RtmpUrl: *rtmpUrlPtr, Packetization: revid.Flv, diff --git a/revid/cmd/h264-file-to-mpgets-file/main.go b/revid/cmd/h264-file-to-mpgets-file/main.go index 03a39fde..768f560b 100644 --- a/revid/cmd/h264-file-to-mpgets-file/main.go +++ b/revid/cmd/h264-file-to-mpgets-file/main.go @@ -50,7 +50,7 @@ func main() { Input: revid.File, InputFileName: inputFile, InputCodec: revid.H264, - Output1: revid.File, + Outputs: []byte{revid.File}, OutputFileName: outputFile, Packetization: revid.Mpegts, Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller), diff --git a/revid/config.go b/revid/config.go index 304d3e64..dc9c5a8d 100644 --- a/revid/config.go +++ b/revid/config.go @@ -40,8 +40,7 @@ type Config struct { Input uint8 InputCodec uint8 - Output1 uint8 - Output2 uint8 + Outputs []uint8 RtmpMethod uint8 Packetization uint8 @@ -172,45 +171,33 @@ func (c *Config) Validate(r *Revid) error { return errors.New("bad input codec defined in config") } - switch c.Output1 { - case File: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output1 = Http - break + for i, o := range c.Outputs { + switch o { + case File: + case Udp: + case Rtmp, FfmpegRtmp: + if c.RtmpUrl == "" { + c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") + c.Outputs[i] = Http + // FIXME(kortschak): Does this want the same line as below? + // c.FramesPerClip = httpFramesPerClip + break + } + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", + "framesPerClip", defaultFramesPerClip) + c.FramesPerClip = defaultFramesPerClip + case NothingDefined: + c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", + defaultOutput) + c.Outputs[i] = defaultOutput + fallthrough + case Http, Rtp: + c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", + "framesPerClip", httpFramesPerClip) + c.FramesPerClip = httpFramesPerClip + default: + return errors.New("bad output type defined in config") } - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out", - "framesPerClip", defaultFramesPerClip) - c.FramesPerClip = defaultFramesPerClip - case NothingDefined: - c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output", - defaultOutput) - c.Output1 = defaultOutput - fallthrough - case Http, Rtp: - c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out", - "framesPerClip", httpFramesPerClip) - c.FramesPerClip = httpFramesPerClip - default: - return errors.New("bad output type defined in config") - } - - switch c.Output2 { - case File: - case Rtp: - case Udp: - case Rtmp, FfmpegRtmp: - if c.RtmpUrl == "" { - c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP") - c.Output2 = Http - break - } - case NothingDefined: - case Http: - default: - return errors.New("bad output2 type defined in config") } if c.FramesPerClip < 1 { diff --git a/revid/revid.go b/revid/revid.go index ea1c0adc..6833ec2b 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -37,6 +37,7 @@ import ( "os/exec" "strconv" "strings" + "sync" "time" "bitbucket.org/ausocean/av/stream" @@ -119,7 +120,10 @@ type Revid struct { bitrate int // isRunning is a loaded and cocked foot-gun. + mu sync.Mutex isRunning bool + + err chan error } // packer takes data segments and packs them into clips @@ -157,8 +161,15 @@ func (p *packer) Write(frame []byte) (int, error) { return n, err } p.packetCount++ + var hasRtmp bool + for _, d := range p.owner.config.Outputs { + if d == Rtmp { + hasRtmp = true + break + } + } now := time.Now() - if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp { + if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) { p.owner.buffer.Flush() p.packetCount = 0 p.lastTime = now @@ -169,16 +180,35 @@ func (p *packer) Write(frame []byte) (int, error) { // New returns a pointer to a new Revid with the desired configuration, and/or // an error if construction of the new instance was not successful. func New(c Config, ns *netsender.Sender) (*Revid, error) { - r := Revid{ns: ns} + r := Revid{ns: ns, err: make(chan error)} r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout) r.packer.owner = &r err := r.reset(c) if err != nil { return nil, err } + go r.handleErrors() return &r, nil } +// TODO(Saxon): put more thought into error severity. +func (r *Revid) handleErrors() { + for { + err := <-r.err + if err != nil { + r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) + err = r.Stop() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error()) + } + err = r.Start() + if err != nil { + r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error()) + } + } + } +} + // Bitrate returns the result of the most recent bitrate check. func (r *Revid) Bitrate() int { return r.bitrate @@ -203,40 +233,35 @@ func (r *Revid) reset(config Config) error { } } - n := 1 - if r.config.Output2 != 0 && r.config.Output2 != Rtp { - n = 2 - } - r.destination = make([]loadSender, n) - - for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} { - switch outType { + r.destination = r.destination[:0] + for _, typ := range r.config.Outputs { + switch typ { case File: s, err := newFileSender(config.OutputFileName) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case FfmpegRtmp: s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate)) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtmp: s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Http: - r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log) + r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log)) case Udp: s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log) if err != nil { return err } - r.destination[outNo] = s + r.destination = append(r.destination, s) case Rtp: r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate) if err != nil { @@ -293,20 +318,30 @@ func (r *Revid) reset(config Config) error { return nil } -// IsRunning returns whether the receiver is running. +// IsRunning returns true if revid is running. func (r *Revid) IsRunning() bool { - return r.isRunning + r.mu.Lock() + ret := r.isRunning + r.mu.Unlock() + return ret +} + +// setIsRunning sets revid.isRunning using b. +func (r *Revid) setIsRunning(b bool) { + r.mu.Lock() + r.isRunning = b + r.mu.Unlock() } // Start invokes a Revid to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *Revid) Start() error { - if r.isRunning { + if r.IsRunning() { return errors.New(pkg + "start called but revid is already running") } r.config.Logger.Log(logger.Info, pkg+"starting Revid") r.config.Logger.Log(logger.Debug, pkg+"setting up output") - r.isRunning = true + r.setIsRunning(true) r.config.Logger.Log(logger.Info, pkg+"starting output routine") go r.outputClips() r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content") @@ -316,12 +351,12 @@ func (r *Revid) Start() error { // Stop halts any processing of video data from a camera or file func (r *Revid) Stop() error { - if !r.isRunning { + if !r.IsRunning() { return errors.New(pkg + "stop called but revid is already stopped") } r.config.Logger.Log(logger.Info, pkg+"stopping revid") - r.isRunning = false + r.setIsRunning(false) r.config.Logger.Log(logger.Info, pkg+"killing input proccess") // If a cmd process is running, we kill! @@ -337,7 +372,7 @@ func (r *Revid) outputClips() { lastTime := time.Now() var count int loop: - for r.isRunning { + for r.IsRunning() { // If the ring buffer has something we can read and send off chunk, err := r.buffer.Next(readTimeout) switch err { @@ -381,7 +416,7 @@ loop: err = rs.restart() if err != nil { r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error()) - r.isRunning = false + r.setIsRunning(false) return } @@ -472,11 +507,8 @@ func (r *Revid) startRaspivid() error { r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - delay := time.Second / time.Duration(r.config.FrameRate) - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go r.processFrom(stdout, 0) + return nil } func (r *Revid) startV4L() error { @@ -512,7 +544,6 @@ func (r *Revid) startV4L() error { r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " ")) r.cmd = exec.Command("ffmpeg", args...) - delay := time.Second / time.Duration(r.config.FrameRate) stdout, err := r.cmd.StdoutPipe() if err != nil { return err @@ -524,15 +555,12 @@ func (r *Revid) startV4L() error { return err } - r.config.Logger.Log(logger.Info, pkg+"reading camera data") - err = r.lexTo(r.encoder, stdout, delay) - r.config.Logger.Log(logger.Info, pkg+"finished reading camera data") - return err + go r.processFrom(stdout, time.Duration(0)) + return nil } // setupInputForFile sets things up for getting input from a file func (r *Revid) setupInputForFile() error { - delay := time.Second / time.Duration(r.config.FrameRate) f, err := os.Open(r.config.InputFileName) if err != nil { r.config.Logger.Log(logger.Error, err.Error()) @@ -542,5 +570,12 @@ func (r *Revid) setupInputForFile() error { defer f.Close() // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. - return r.lexTo(r.encoder, f, delay) + go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate)) + return nil +} + +func (r *Revid) processFrom(read io.Reader, delay time.Duration) { + r.config.Logger.Log(logger.Info, pkg+"reading input data") + r.err <- r.lexTo(r.encoder, read, delay) + r.config.Logger.Log(logger.Info, pkg+"finished reading input data") } diff --git a/rtmp/amf/amf.go b/rtmp/amf/amf.go index 837531bb..7ccfd932 100644 --- a/rtmp/amf/amf.go +++ b/rtmp/amf/amf.go @@ -51,7 +51,7 @@ import ( const ( typeNumber = 0x00 typeBoolean = 0x01 - typeString = 0x02 + TypeString = 0x02 TypeObject = 0x03 typeMovieClip = 0x04 TypeNull = 0x05 @@ -93,7 +93,7 @@ type Property struct { var ( ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short. ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder. - ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding. + ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding. ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found. ) @@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) { } // EncodeString encodes a string. +// Strings less than 65536 in length are encoded as TypeString, while longer strings are ecodeded as typeLongString. func EncodeString(buf []byte, val string) ([]byte, error) { const typeSize = 1 if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) { @@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) { } if len(val) < 65536 { - buf[0] = typeString + buf[0] = TypeString buf = buf[1:] binary.BigEndian.PutUint16(buf[:2], uint16(len(val))) buf = buf[2:] @@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) { return EncodeNumber(buf, prop.Number) case typeBoolean: return EncodeBoolean(buf, prop.Number != 0) - case typeString: + case TypeString: return EncodeString(buf, prop.String) case TypeNull: if len(buf) < 2 { @@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { prop.Number = float64(buf[0]) buf = buf[1:] - case typeString: + case TypeString: n := DecodeInt16(buf[:2]) if len(buf) < int(n+2) { return 0, ErrShortBuffer @@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) { } // Encode encodes an Object into its AMF representation. -// This is the top-level encoding function and is typically the only function callers will need to use. func Encode(obj *Object, buf []byte) ([]byte, error) { if len(buf) < 5 { return nil, ErrShortBuffer @@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) { // StringProperty is a wrapper for Property that returns a String property's value, if any. func (obj *Object) StringProperty(name string, idx int) (string, error) { - prop, err := obj.Property(name, idx, typeString) + prop, err := obj.Property(name, idx, TypeString) if err != nil { return "", err } diff --git a/rtmp/amf/amf_test.go b/rtmp/amf/amf_test.go index 59548c09..957e1c7e 100644 --- a/rtmp/amf/amf_test.go +++ b/rtmp/amf/amf_test.go @@ -58,7 +58,7 @@ func TestSanity(t *testing.T) { // TestStrings tests string encoding and decoding. func TestStrings(t *testing.T) { // Short string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:3] = size // enc[3:] = data for _, s := range testStrings { @@ -67,8 +67,8 @@ func TestStrings(t *testing.T) { if err != nil { t.Errorf("EncodeString failed") } - if buf[0] != typeString { - t.Errorf("Expected typeString, got %v", buf[0]) + if buf[0] != TypeString { + t.Errorf("Expected TypeString, got %v", buf[0]) } ds := DecodeString(buf[1:]) if s != ds { @@ -76,7 +76,7 @@ func TestStrings(t *testing.T) { } } // Long string encoding is as follows: - // enc[0] = data type (typeString) + // enc[0] = data type (TypeString) // end[1:5] = size // enc[5:] = data s := string(make([]byte, 65536)) @@ -148,7 +148,7 @@ func TestProperties(t *testing.T) { // Encode/decode string properties. enc = buf[:] for i := range testStrings { - enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc) + enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc) if err != nil { t.Errorf("EncodeProperty of string failed") } @@ -235,7 +235,7 @@ func TestObject(t *testing.T) { // Construct a more complicated object that includes a nested object. var obj2 Object for i := range testStrings { - obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]}) + obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]}) obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])}) } obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1}) diff --git a/rtmp/conn.go b/rtmp/conn.go index 2554d092..3864df98 100644 --- a/rtmp/conn.go +++ b/rtmp/conn.go @@ -121,20 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) { if err != nil { return nil, err } - if c.link.app == "" { - return nil, errInvalidURL - } - if c.link.port == 0 { - switch { - case (c.link.protocol & featureSSL) != 0: - c.link.port = 433 - c.log(FatalLevel, pkg+"SSL not supported") - case (c.link.protocol & featureHTTP) != 0: - c.link.port = 80 - default: - c.link.port = 1935 - } - } c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app c.link.protocol |= featureWrite diff --git a/rtmp/packet.go b/rtmp/packet.go index 3cf18f14..b28a1cba 100644 --- a/rtmp/packet.go +++ b/rtmp/packet.go @@ -81,7 +81,7 @@ const ( // 3: basic header (chunk type and stream ID) (1 byte) var headerSizes = [...]int{12, 8, 4, 1} -// packet defines an RTMP packet. +// packet represents an RTMP packet. type packet struct { headerType uint8 packetType uint8 @@ -90,7 +90,6 @@ type packet struct { timestamp uint32 streamID uint32 bodySize uint32 - bytesRead uint32 buf []byte body []byte } @@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error { pkt.timestamp = amf.DecodeInt24(header[:3]) if size >= 6 { pkt.bodySize = amf.DecodeInt24(header[3:6]) - pkt.bytesRead = 0 if size > 6 { pkt.packetType = header[6] @@ -201,25 +199,18 @@ func (pkt *packet) readFrom(c *Conn) error { hSize += 4 } - if pkt.bodySize > 0 && pkt.body == nil { - pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6) + pkt.resize(pkt.bodySize, pkt.headerType) + + if pkt.bodySize > c.inChunkSize { + c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize)) } - toRead := pkt.bodySize - pkt.bytesRead - chunkSize := c.inChunkSize - - if toRead < chunkSize { - chunkSize = toRead - } - - _, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize]) + _, err = c.read(pkt.body[:pkt.bodySize]) if err != nil { c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error()) return err } - pkt.bytesRead += uint32(chunkSize) - // Keep the packet as a reference for other packets on this channel. if c.channelsIn[pkt.channel] == nil { c.channelsIn[pkt.channel] = &packet{} @@ -237,15 +228,16 @@ func (pkt *packet) readFrom(c *Conn) error { c.channelTimestamp[pkt.channel] = int32(pkt.timestamp) c.channelsIn[pkt.channel].body = nil - c.channelsIn[pkt.channel].bytesRead = 0 c.channelsIn[pkt.channel].hasAbsTimestamp = false return nil } -// resize adjusts the packet's storage to accommodate a body of the given size and header type. +// resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type. // When headerSizeAuto is specified, the header type is computed based on packet type. func (pkt *packet) resize(size uint32, ht uint8) { - pkt.buf = make([]byte, fullHeaderSize+size) + if cap(pkt.buf) < fullHeaderSize+int(size) { + pkt.buf = make([]byte, fullHeaderSize+size) + } pkt.body = pkt.buf[fullHeaderSize:] if ht != headerSizeAuto { pkt.headerType = ht @@ -407,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { return nil } } else { - // Send previously deferrd packet if combining it with the next one would exceed the chunk size. + // Send previously deferred packet if combining it with the next one would exceed the chunk size. if len(c.deferred)+size+hSize > chunkSize { c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred)) _, err := c.write(c.deferred) @@ -419,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error { } // TODO(kortschak): Rewrite this horrific peice of premature optimisation. - c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size) + c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr()) for size+hSize != 0 { if chunkSize > size { chunkSize = size diff --git a/rtmp/parseurl.go b/rtmp/parseurl.go index eae4277e..4fea7d82 100644 --- a/rtmp/parseurl.go +++ b/rtmp/parseurl.go @@ -41,7 +41,6 @@ import ( ) // parseURL parses an RTMP URL (ok, technically it is lexing). -// func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) { u, err := url.Parse(addr) if err != nil { @@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } elems := strings.SplitN(u.Path[1:], "/", 3) app = elems[0] + if app == "" { + return protocol, host, port, app, playpath, errInvalidURL + } playpath = elems[1] if len(elems) == 3 && len(elems[2]) != 0 { playpath = path.Join(elems[1:]...) @@ -97,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp } } + switch { + case port != 0: + case (protocol & featureSSL) != 0: + return protocol, host, port, app, playpath, errUnimplemented // port = 433 + case (protocol & featureHTTP) != 0: + port = 80 + default: + port = 1935 + } + return protocol, host, port, app, playpath, nil } diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index bfee3e37..6f666785 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -174,6 +174,13 @@ func connect(c *Conn) error { return err } c.log(DebugLevel, pkg+"connected") + + defer func() { + if err != nil { + c.link.conn.Close() + } + }() + err = handshake(c) if err != nil { c.log(WarnLevel, pkg+"handshake failed", "error", err.Error()) @@ -185,12 +192,14 @@ func connect(c *Conn) error { c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error()) return err } + c.log(DebugLevel, pkg+"negotiating") + var buf [256]byte for !c.isPlaying { - pkt := packet{} + pkt := packet{buf: buf[:]} err = pkt.readFrom(c) if err != nil { - break + return err } switch pkt.packetType { @@ -199,14 +208,10 @@ func connect(c *Conn) error { default: err = handlePacket(c, &pkt) if err != nil { - break + return err } } } - - if !c.isPlaying { - return err - } return nil } @@ -276,26 +281,18 @@ func sendConnectPacket(c *Conn) error { return err } - enc[0] = amf.TypeObject - enc = enc[1:] - enc, err = amf.EncodeNamedString(enc, avApp, c.link.app) - if err != nil { - return err + // required link info + info := amf.Object{Properties: []amf.Property{ + amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app}, + amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate}, + amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}}, } - enc, err = amf.EncodeNamedString(enc, avType, avNonprivate) - if err != nil { - return err - } - enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url) - if err != nil { - return err - } - enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd) + enc, err = amf.Encode(&info, enc) if err != nil { return err } - // add auth string, if any + // optional link auth info if c.link.auth != "" { enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0) if err != nil { diff --git a/rtmp/rtmp_test.go b/rtmp/rtmp_test.go index 618f2b95..be4908a6 100644 --- a/rtmp/rtmp_test.go +++ b/rtmp/rtmp_test.go @@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) { } defer f.Close() + rs := &rtmpSender{conn: c} // Pass RTMP session, true for audio, true for video, and 25 FPS - flvEncoder, err := flv.NewEncoder(c, true, true, 25) + flvEncoder, err := flv.NewEncoder(rs, true, true, 25) if err != nil { t.Fatalf("failed to create encoder: %v", err) }