mirror of https://bitbucket.org/ausocean/av.git
Merge branch 'master' into metadata-refactor
This commit is contained in:
commit
bad68923e6
|
@ -102,8 +102,6 @@ func handleFlags() revid.Config {
|
|||
|
||||
inputPtr = flag.String("Input", "", "The input type: Raspivid, File, Webcam")
|
||||
inputCodecPtr = flag.String("InputCodec", "", "The codec of the input: H264, Mjpeg")
|
||||
output1Ptr = flag.String("Output1", "", "The first output type: Http, Rtmp, File, Udp, Rtp")
|
||||
output2Ptr = flag.String("Output2", "", "The second output type: Http, Rtmp, File, Udp, Rtp")
|
||||
rtmpMethodPtr = flag.String("RtmpMethod", "", "The method used to send over rtmp: Ffmpeg, Librtmp")
|
||||
packetizationPtr = flag.String("Packetization", "", "The method of data packetisation: Flv, Mpegts, None")
|
||||
quantizePtr = flag.Bool("Quantize", false, "Quantize input (non-variable bitrate)")
|
||||
|
@ -126,6 +124,9 @@ func handleFlags() revid.Config {
|
|||
configFilePtr = flag.String("ConfigFile", "", "NetSender config file")
|
||||
)
|
||||
|
||||
var outputs flagStrings
|
||||
flag.Var(&outputs, "Output", "output type: Http, Rtmp, File, Udp, Rtp (may be used more than once)")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
log = logger.New(defaultLogVerbosity, &smartlogger.New(*logPathPtr).LogRoller)
|
||||
|
@ -167,40 +168,24 @@ func handleFlags() revid.Config {
|
|||
log.Log(logger.Error, pkg+"bad input codec argument")
|
||||
}
|
||||
|
||||
switch *output1Ptr {
|
||||
for _, o := range outputs {
|
||||
switch o {
|
||||
case "File":
|
||||
cfg.Output1 = revid.File
|
||||
cfg.Outputs = append(cfg.Outputs, revid.File)
|
||||
case "Http":
|
||||
cfg.Output1 = revid.Http
|
||||
cfg.Outputs = append(cfg.Outputs, revid.Http)
|
||||
case "Rtmp":
|
||||
cfg.Output1 = revid.Rtmp
|
||||
cfg.Outputs = append(cfg.Outputs, revid.Rtmp)
|
||||
case "FfmpegRtmp":
|
||||
cfg.Output1 = revid.FfmpegRtmp
|
||||
cfg.Outputs = append(cfg.Outputs, revid.FfmpegRtmp)
|
||||
case "Udp":
|
||||
cfg.Output1 = revid.Udp
|
||||
cfg.Outputs = append(cfg.Outputs, revid.Udp)
|
||||
case "Rtp":
|
||||
cfg.Output1 = revid.Rtp
|
||||
cfg.Outputs = append(cfg.Outputs, revid.Rtp)
|
||||
case "":
|
||||
default:
|
||||
log.Log(logger.Error, pkg+"bad output 1 argument")
|
||||
log.Log(logger.Error, pkg+"bad output argument", "arg", o)
|
||||
}
|
||||
|
||||
switch *output2Ptr {
|
||||
case "File":
|
||||
cfg.Output2 = revid.File
|
||||
case "Http":
|
||||
cfg.Output2 = revid.Http
|
||||
case "Rtmp":
|
||||
cfg.Output2 = revid.Rtmp
|
||||
case "FfmpegRtmp":
|
||||
cfg.Output2 = revid.FfmpegRtmp
|
||||
case "Udp":
|
||||
cfg.Output2 = revid.Udp
|
||||
case "Rtp":
|
||||
cfg.Output2 = revid.Rtp
|
||||
case "":
|
||||
default:
|
||||
log.Log(logger.Error, pkg+"bad output 2 argument")
|
||||
}
|
||||
|
||||
switch *rtmpMethodPtr {
|
||||
|
@ -380,15 +365,19 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
|
|||
for key, value := range vars {
|
||||
switch key {
|
||||
case "Output":
|
||||
// FIXME(kortschak): There can be only one!
|
||||
// How do we specify outputs after the first?
|
||||
//
|
||||
// Maybe we shouldn't be doing this!
|
||||
switch value {
|
||||
case "File":
|
||||
cfg.Output1 = revid.File
|
||||
cfg.Outputs[0] = revid.File
|
||||
case "Http":
|
||||
cfg.Output1 = revid.Http
|
||||
cfg.Outputs[0] = revid.Http
|
||||
case "Rtmp":
|
||||
cfg.Output1 = revid.Rtmp
|
||||
cfg.Outputs[0] = revid.Rtmp
|
||||
case "FfmpegRtmp":
|
||||
cfg.Output1 = revid.FfmpegRtmp
|
||||
cfg.Outputs[0] = revid.FfmpegRtmp
|
||||
default:
|
||||
log.Log(logger.Warning, pkg+"invalid Output1 param", "value", value)
|
||||
continue
|
||||
|
@ -474,3 +463,28 @@ func updateRevid(ns *netsender.Sender, rv *revid.Revid, cfg revid.Config, vars m
|
|||
|
||||
return startRevid(ns, cfg)
|
||||
}
|
||||
|
||||
// flagStrings implements an appending string set flag.
|
||||
type flagStrings []string
|
||||
|
||||
func (v *flagStrings) String() string {
|
||||
if *v != nil {
|
||||
return strings.Join(*v, ",")
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (v *flagStrings) Set(s string) error {
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
for _, e := range *v {
|
||||
if e == s {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
*v = append(*v, s)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *flagStrings) Get() interface{} { return *v }
|
||||
|
|
|
@ -58,7 +58,7 @@ func main() {
|
|||
Input: revid.File,
|
||||
InputFileName: inputFile,
|
||||
InputCodec: revid.H264,
|
||||
Output1: revid.Rtmp,
|
||||
Outputs: []byte{revid.Rtmp},
|
||||
RtmpMethod: revid.LibRtmp,
|
||||
RtmpUrl: *rtmpUrlPtr,
|
||||
Packetization: revid.Flv,
|
||||
|
|
|
@ -50,7 +50,7 @@ func main() {
|
|||
Input: revid.File,
|
||||
InputFileName: inputFile,
|
||||
InputCodec: revid.H264,
|
||||
Output1: revid.File,
|
||||
Outputs: []byte{revid.File},
|
||||
OutputFileName: outputFile,
|
||||
Packetization: revid.Mpegts,
|
||||
Logger: logger.New(logger.Info, &smartlogger.New(logPath).LogRoller),
|
||||
|
|
|
@ -40,8 +40,7 @@ type Config struct {
|
|||
|
||||
Input uint8
|
||||
InputCodec uint8
|
||||
Output1 uint8
|
||||
Output2 uint8
|
||||
Outputs []uint8
|
||||
RtmpMethod uint8
|
||||
Packetization uint8
|
||||
|
||||
|
@ -172,13 +171,16 @@ func (c *Config) Validate(r *Revid) error {
|
|||
return errors.New("bad input codec defined in config")
|
||||
}
|
||||
|
||||
switch c.Output1 {
|
||||
for i, o := range c.Outputs {
|
||||
switch o {
|
||||
case File:
|
||||
case Udp:
|
||||
case Rtmp, FfmpegRtmp:
|
||||
if c.RtmpUrl == "" {
|
||||
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
|
||||
c.Output1 = Http
|
||||
c.Outputs[i] = Http
|
||||
// FIXME(kortschak): Does this want the same line as below?
|
||||
// c.FramesPerClip = httpFramesPerClip
|
||||
break
|
||||
}
|
||||
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for rtmp out",
|
||||
|
@ -187,7 +189,7 @@ func (c *Config) Validate(r *Revid) error {
|
|||
case NothingDefined:
|
||||
c.Logger.Log(logger.Warning, pkg+"no output defined, defaulting", "output",
|
||||
defaultOutput)
|
||||
c.Output1 = defaultOutput
|
||||
c.Outputs[i] = defaultOutput
|
||||
fallthrough
|
||||
case Http, Rtp:
|
||||
c.Logger.Log(logger.Info, pkg+"defaulting frames per clip for http out",
|
||||
|
@ -196,21 +198,6 @@ func (c *Config) Validate(r *Revid) error {
|
|||
default:
|
||||
return errors.New("bad output type defined in config")
|
||||
}
|
||||
|
||||
switch c.Output2 {
|
||||
case File:
|
||||
case Rtp:
|
||||
case Udp:
|
||||
case Rtmp, FfmpegRtmp:
|
||||
if c.RtmpUrl == "" {
|
||||
c.Logger.Log(logger.Info, pkg+"no RTMP URL: falling back to HTTP")
|
||||
c.Output2 = Http
|
||||
break
|
||||
}
|
||||
case NothingDefined:
|
||||
case Http:
|
||||
default:
|
||||
return errors.New("bad output2 type defined in config")
|
||||
}
|
||||
|
||||
if c.FramesPerClip < 1 {
|
||||
|
|
105
revid/revid.go
105
revid/revid.go
|
@ -37,6 +37,7 @@ import (
|
|||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"bitbucket.org/ausocean/av/stream"
|
||||
|
@ -119,7 +120,10 @@ type Revid struct {
|
|||
bitrate int
|
||||
|
||||
// isRunning is a loaded and cocked foot-gun.
|
||||
mu sync.Mutex
|
||||
isRunning bool
|
||||
|
||||
err chan error
|
||||
}
|
||||
|
||||
// packer takes data segments and packs them into clips
|
||||
|
@ -157,8 +161,15 @@ func (p *packer) Write(frame []byte) (int, error) {
|
|||
return n, err
|
||||
}
|
||||
p.packetCount++
|
||||
var hasRtmp bool
|
||||
for _, d := range p.owner.config.Outputs {
|
||||
if d == Rtmp {
|
||||
hasRtmp = true
|
||||
break
|
||||
}
|
||||
}
|
||||
now := time.Now()
|
||||
if (p.owner.config.Output1 != Rtmp && now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) || p.owner.config.Output1 == Rtmp {
|
||||
if hasRtmp || (now.Sub(p.lastTime) > clipDuration && p.packetCount%7 == 0) {
|
||||
p.owner.buffer.Flush()
|
||||
p.packetCount = 0
|
||||
p.lastTime = now
|
||||
|
@ -169,16 +180,35 @@ func (p *packer) Write(frame []byte) (int, error) {
|
|||
// New returns a pointer to a new Revid with the desired configuration, and/or
|
||||
// an error if construction of the new instance was not successful.
|
||||
func New(c Config, ns *netsender.Sender) (*Revid, error) {
|
||||
r := Revid{ns: ns}
|
||||
r := Revid{ns: ns, err: make(chan error)}
|
||||
r.buffer = ring.NewBuffer(ringBufferSize, ringBufferElementSize, writeTimeout)
|
||||
r.packer.owner = &r
|
||||
err := r.reset(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go r.handleErrors()
|
||||
return &r, nil
|
||||
}
|
||||
|
||||
// TODO(Saxon): put more thought into error severity.
|
||||
func (r *Revid) handleErrors() {
|
||||
for {
|
||||
err := <-r.err
|
||||
if err != nil {
|
||||
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
|
||||
err = r.Stop()
|
||||
if err != nil {
|
||||
r.config.Logger.Log(logger.Fatal, pkg+"failed to stop", "error", err.Error())
|
||||
}
|
||||
err = r.Start()
|
||||
if err != nil {
|
||||
r.config.Logger.Log(logger.Fatal, pkg+"failed to restart", "error", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Bitrate returns the result of the most recent bitrate check.
|
||||
func (r *Revid) Bitrate() int {
|
||||
return r.bitrate
|
||||
|
@ -203,40 +233,35 @@ func (r *Revid) reset(config Config) error {
|
|||
}
|
||||
}
|
||||
|
||||
n := 1
|
||||
if r.config.Output2 != 0 && r.config.Output2 != Rtp {
|
||||
n = 2
|
||||
}
|
||||
r.destination = make([]loadSender, n)
|
||||
|
||||
for outNo, outType := range []uint8{r.config.Output1, r.config.Output2} {
|
||||
switch outType {
|
||||
r.destination = r.destination[:0]
|
||||
for _, typ := range r.config.Outputs {
|
||||
switch typ {
|
||||
case File:
|
||||
s, err := newFileSender(config.OutputFileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.destination[outNo] = s
|
||||
r.destination = append(r.destination, s)
|
||||
case FfmpegRtmp:
|
||||
s, err := newFfmpegSender(config.RtmpUrl, fmt.Sprint(r.config.FrameRate))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.destination[outNo] = s
|
||||
r.destination = append(r.destination, s)
|
||||
case Rtmp:
|
||||
s, err := newRtmpSender(config.RtmpUrl, rtmpConnectionTimeout, rtmpConnectionMaxTries, r.config.Logger.Log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.destination[outNo] = s
|
||||
r.destination = append(r.destination, s)
|
||||
case Http:
|
||||
r.destination[outNo] = newHttpSender(r.ns, r.config.Logger.Log)
|
||||
r.destination = append(r.destination, newHttpSender(r.ns, r.config.Logger.Log))
|
||||
case Udp:
|
||||
s, err := newUdpSender(r.config.RtpAddress, r.config.Logger.Log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.destination[outNo] = s
|
||||
r.destination = append(r.destination, s)
|
||||
case Rtp:
|
||||
r.rtpSender, err = newRtpSender(r.config.RtpAddress, r.config.Logger.Log, r.config.FrameRate)
|
||||
if err != nil {
|
||||
|
@ -293,20 +318,30 @@ func (r *Revid) reset(config Config) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// IsRunning returns whether the receiver is running.
|
||||
// IsRunning returns true if revid is running.
|
||||
func (r *Revid) IsRunning() bool {
|
||||
return r.isRunning
|
||||
r.mu.Lock()
|
||||
ret := r.isRunning
|
||||
r.mu.Unlock()
|
||||
return ret
|
||||
}
|
||||
|
||||
// setIsRunning sets revid.isRunning using b.
|
||||
func (r *Revid) setIsRunning(b bool) {
|
||||
r.mu.Lock()
|
||||
r.isRunning = b
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
// Start invokes a Revid to start processing video from a defined input
|
||||
// and packetising (if theres packetization) to a defined output.
|
||||
func (r *Revid) Start() error {
|
||||
if r.isRunning {
|
||||
if r.IsRunning() {
|
||||
return errors.New(pkg + "start called but revid is already running")
|
||||
}
|
||||
r.config.Logger.Log(logger.Info, pkg+"starting Revid")
|
||||
r.config.Logger.Log(logger.Debug, pkg+"setting up output")
|
||||
r.isRunning = true
|
||||
r.setIsRunning(true)
|
||||
r.config.Logger.Log(logger.Info, pkg+"starting output routine")
|
||||
go r.outputClips()
|
||||
r.config.Logger.Log(logger.Info, pkg+"setting up input and receiving content")
|
||||
|
@ -316,12 +351,12 @@ func (r *Revid) Start() error {
|
|||
|
||||
// Stop halts any processing of video data from a camera or file
|
||||
func (r *Revid) Stop() error {
|
||||
if !r.isRunning {
|
||||
if !r.IsRunning() {
|
||||
return errors.New(pkg + "stop called but revid is already stopped")
|
||||
}
|
||||
|
||||
r.config.Logger.Log(logger.Info, pkg+"stopping revid")
|
||||
r.isRunning = false
|
||||
r.setIsRunning(false)
|
||||
|
||||
r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
|
||||
// If a cmd process is running, we kill!
|
||||
|
@ -337,7 +372,7 @@ func (r *Revid) outputClips() {
|
|||
lastTime := time.Now()
|
||||
var count int
|
||||
loop:
|
||||
for r.isRunning {
|
||||
for r.IsRunning() {
|
||||
// If the ring buffer has something we can read and send off
|
||||
chunk, err := r.buffer.Next(readTimeout)
|
||||
switch err {
|
||||
|
@ -381,7 +416,7 @@ loop:
|
|||
err = rs.restart()
|
||||
if err != nil {
|
||||
r.config.Logger.Log(logger.Error, pkg+"failed to restart rtmp session", "error", err.Error())
|
||||
r.isRunning = false
|
||||
r.setIsRunning(false)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -472,11 +507,8 @@ func (r *Revid) startRaspivid() error {
|
|||
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error())
|
||||
}
|
||||
|
||||
r.config.Logger.Log(logger.Info, pkg+"reading camera data")
|
||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
||||
err = r.lexTo(r.encoder, stdout, delay)
|
||||
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
|
||||
return err
|
||||
go r.processFrom(stdout, 0)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Revid) startV4L() error {
|
||||
|
@ -512,7 +544,6 @@ func (r *Revid) startV4L() error {
|
|||
r.config.Logger.Log(logger.Info, pkg+"ffmpeg args", "args", strings.Join(args, " "))
|
||||
r.cmd = exec.Command("ffmpeg", args...)
|
||||
|
||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
||||
stdout, err := r.cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -524,15 +555,12 @@ func (r *Revid) startV4L() error {
|
|||
return err
|
||||
}
|
||||
|
||||
r.config.Logger.Log(logger.Info, pkg+"reading camera data")
|
||||
err = r.lexTo(r.encoder, stdout, delay)
|
||||
r.config.Logger.Log(logger.Info, pkg+"finished reading camera data")
|
||||
return err
|
||||
go r.processFrom(stdout, time.Duration(0))
|
||||
return nil
|
||||
}
|
||||
|
||||
// setupInputForFile sets things up for getting input from a file
|
||||
func (r *Revid) setupInputForFile() error {
|
||||
delay := time.Second / time.Duration(r.config.FrameRate)
|
||||
f, err := os.Open(r.config.InputFileName)
|
||||
if err != nil {
|
||||
r.config.Logger.Log(logger.Error, err.Error())
|
||||
|
@ -542,5 +570,12 @@ func (r *Revid) setupInputForFile() error {
|
|||
defer f.Close()
|
||||
|
||||
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
|
||||
return r.lexTo(r.encoder, f, delay)
|
||||
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Revid) processFrom(read io.Reader, delay time.Duration) {
|
||||
r.config.Logger.Log(logger.Info, pkg+"reading input data")
|
||||
r.err <- r.lexTo(r.encoder, read, delay)
|
||||
r.config.Logger.Log(logger.Info, pkg+"finished reading input data")
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ import (
|
|||
const (
|
||||
typeNumber = 0x00
|
||||
typeBoolean = 0x01
|
||||
typeString = 0x02
|
||||
TypeString = 0x02
|
||||
TypeObject = 0x03
|
||||
typeMovieClip = 0x04
|
||||
TypeNull = 0x05
|
||||
|
@ -93,7 +93,7 @@ type Property struct {
|
|||
var (
|
||||
ErrShortBuffer = errors.New("amf: short buffer") // The supplied buffer was too short.
|
||||
ErrInvalidType = errors.New("amf: invalid type") // An invalid type was supplied to the encoder.
|
||||
ErrUnexpectedType = errors.New("amf: unexpected end") // An unexpected type was encountered while decoding.
|
||||
ErrUnexpectedType = errors.New("amf: unexpected type") // An unexpected type was encountered while decoding.
|
||||
ErrPropertyNotFound = errors.New("amf: property not found") // The requested property was not found.
|
||||
)
|
||||
|
||||
|
@ -160,6 +160,7 @@ func EncodeInt32(buf []byte, val uint32) ([]byte, error) {
|
|||
}
|
||||
|
||||
// EncodeString encodes a string.
|
||||
// Strings less than 65536 in length are encoded as TypeString, while longer strings are ecodeded as typeLongString.
|
||||
func EncodeString(buf []byte, val string) ([]byte, error) {
|
||||
const typeSize = 1
|
||||
if len(val) < 65536 && len(val)+typeSize+binary.Size(int16(0)) > len(buf) {
|
||||
|
@ -171,7 +172,7 @@ func EncodeString(buf []byte, val string) ([]byte, error) {
|
|||
}
|
||||
|
||||
if len(val) < 65536 {
|
||||
buf[0] = typeString
|
||||
buf[0] = TypeString
|
||||
buf = buf[1:]
|
||||
binary.BigEndian.PutUint16(buf[:2], uint16(len(val)))
|
||||
buf = buf[2:]
|
||||
|
@ -263,7 +264,7 @@ func EncodeProperty(prop *Property, buf []byte) ([]byte, error) {
|
|||
return EncodeNumber(buf, prop.Number)
|
||||
case typeBoolean:
|
||||
return EncodeBoolean(buf, prop.Number != 0)
|
||||
case typeString:
|
||||
case TypeString:
|
||||
return EncodeString(buf, prop.String)
|
||||
case TypeNull:
|
||||
if len(buf) < 2 {
|
||||
|
@ -320,7 +321,7 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
|
|||
prop.Number = float64(buf[0])
|
||||
buf = buf[1:]
|
||||
|
||||
case typeString:
|
||||
case TypeString:
|
||||
n := DecodeInt16(buf[:2])
|
||||
if len(buf) < int(n+2) {
|
||||
return 0, ErrShortBuffer
|
||||
|
@ -354,7 +355,6 @@ func DecodeProperty(prop *Property, buf []byte, decodeName bool) (int, error) {
|
|||
}
|
||||
|
||||
// Encode encodes an Object into its AMF representation.
|
||||
// This is the top-level encoding function and is typically the only function callers will need to use.
|
||||
func Encode(obj *Object, buf []byte) ([]byte, error) {
|
||||
if len(buf) < 5 {
|
||||
return nil, ErrShortBuffer
|
||||
|
@ -481,7 +481,7 @@ func (obj *Object) NumberProperty(name string, idx int) (float64, error) {
|
|||
|
||||
// StringProperty is a wrapper for Property that returns a String property's value, if any.
|
||||
func (obj *Object) StringProperty(name string, idx int) (string, error) {
|
||||
prop, err := obj.Property(name, idx, typeString)
|
||||
prop, err := obj.Property(name, idx, TypeString)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestSanity(t *testing.T) {
|
|||
// TestStrings tests string encoding and decoding.
|
||||
func TestStrings(t *testing.T) {
|
||||
// Short string encoding is as follows:
|
||||
// enc[0] = data type (typeString)
|
||||
// enc[0] = data type (TypeString)
|
||||
// end[1:3] = size
|
||||
// enc[3:] = data
|
||||
for _, s := range testStrings {
|
||||
|
@ -67,8 +67,8 @@ func TestStrings(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("EncodeString failed")
|
||||
}
|
||||
if buf[0] != typeString {
|
||||
t.Errorf("Expected typeString, got %v", buf[0])
|
||||
if buf[0] != TypeString {
|
||||
t.Errorf("Expected TypeString, got %v", buf[0])
|
||||
}
|
||||
ds := DecodeString(buf[1:])
|
||||
if s != ds {
|
||||
|
@ -76,7 +76,7 @@ func TestStrings(t *testing.T) {
|
|||
}
|
||||
}
|
||||
// Long string encoding is as follows:
|
||||
// enc[0] = data type (typeString)
|
||||
// enc[0] = data type (TypeString)
|
||||
// end[1:5] = size
|
||||
// enc[5:] = data
|
||||
s := string(make([]byte, 65536))
|
||||
|
@ -148,7 +148,7 @@ func TestProperties(t *testing.T) {
|
|||
// Encode/decode string properties.
|
||||
enc = buf[:]
|
||||
for i := range testStrings {
|
||||
enc, err = EncodeProperty(&Property{Type: typeString, String: testStrings[i]}, enc)
|
||||
enc, err = EncodeProperty(&Property{Type: TypeString, String: testStrings[i]}, enc)
|
||||
if err != nil {
|
||||
t.Errorf("EncodeProperty of string failed")
|
||||
}
|
||||
|
@ -235,7 +235,7 @@ func TestObject(t *testing.T) {
|
|||
// Construct a more complicated object that includes a nested object.
|
||||
var obj2 Object
|
||||
for i := range testStrings {
|
||||
obj2.Properties = append(obj2.Properties, Property{Type: typeString, String: testStrings[i]})
|
||||
obj2.Properties = append(obj2.Properties, Property{Type: TypeString, String: testStrings[i]})
|
||||
obj2.Properties = append(obj2.Properties, Property{Type: typeNumber, Number: float64(testNumbers[i])})
|
||||
}
|
||||
obj2.Properties = append(obj2.Properties, Property{Type: TypeObject, Object: obj1})
|
||||
|
|
14
rtmp/conn.go
14
rtmp/conn.go
|
@ -121,20 +121,6 @@ func Dial(url string, timeout uint, log Log) (*Conn, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c.link.app == "" {
|
||||
return nil, errInvalidURL
|
||||
}
|
||||
if c.link.port == 0 {
|
||||
switch {
|
||||
case (c.link.protocol & featureSSL) != 0:
|
||||
c.link.port = 433
|
||||
c.log(FatalLevel, pkg+"SSL not supported")
|
||||
case (c.link.protocol & featureHTTP) != 0:
|
||||
c.link.port = 80
|
||||
default:
|
||||
c.link.port = 1935
|
||||
}
|
||||
}
|
||||
c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app
|
||||
c.link.protocol |= featureWrite
|
||||
|
||||
|
|
|
@ -81,7 +81,7 @@ const (
|
|||
// 3: basic header (chunk type and stream ID) (1 byte)
|
||||
var headerSizes = [...]int{12, 8, 4, 1}
|
||||
|
||||
// packet defines an RTMP packet.
|
||||
// packet represents an RTMP packet.
|
||||
type packet struct {
|
||||
headerType uint8
|
||||
packetType uint8
|
||||
|
@ -90,7 +90,6 @@ type packet struct {
|
|||
timestamp uint32
|
||||
streamID uint32
|
||||
bodySize uint32
|
||||
bytesRead uint32
|
||||
buf []byte
|
||||
body []byte
|
||||
}
|
||||
|
@ -179,7 +178,6 @@ func (pkt *packet) readFrom(c *Conn) error {
|
|||
pkt.timestamp = amf.DecodeInt24(header[:3])
|
||||
if size >= 6 {
|
||||
pkt.bodySize = amf.DecodeInt24(header[3:6])
|
||||
pkt.bytesRead = 0
|
||||
|
||||
if size > 6 {
|
||||
pkt.packetType = header[6]
|
||||
|
@ -201,25 +199,18 @@ func (pkt *packet) readFrom(c *Conn) error {
|
|||
hSize += 4
|
||||
}
|
||||
|
||||
if pkt.bodySize > 0 && pkt.body == nil {
|
||||
pkt.resize(pkt.bodySize, (hbuf[0]&0xc0)>>6)
|
||||
pkt.resize(pkt.bodySize, pkt.headerType)
|
||||
|
||||
if pkt.bodySize > c.inChunkSize {
|
||||
c.log(WarnLevel, pkg+"reading large packet", "size", int(pkt.bodySize))
|
||||
}
|
||||
|
||||
toRead := pkt.bodySize - pkt.bytesRead
|
||||
chunkSize := c.inChunkSize
|
||||
|
||||
if toRead < chunkSize {
|
||||
chunkSize = toRead
|
||||
}
|
||||
|
||||
_, err = c.read(pkt.body[pkt.bytesRead:][:chunkSize])
|
||||
_, err = c.read(pkt.body[:pkt.bodySize])
|
||||
if err != nil {
|
||||
c.log(DebugLevel, pkg+"failed to read packet body", "error", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
pkt.bytesRead += uint32(chunkSize)
|
||||
|
||||
// Keep the packet as a reference for other packets on this channel.
|
||||
if c.channelsIn[pkt.channel] == nil {
|
||||
c.channelsIn[pkt.channel] = &packet{}
|
||||
|
@ -237,15 +228,16 @@ func (pkt *packet) readFrom(c *Conn) error {
|
|||
c.channelTimestamp[pkt.channel] = int32(pkt.timestamp)
|
||||
|
||||
c.channelsIn[pkt.channel].body = nil
|
||||
c.channelsIn[pkt.channel].bytesRead = 0
|
||||
c.channelsIn[pkt.channel].hasAbsTimestamp = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// resize adjusts the packet's storage to accommodate a body of the given size and header type.
|
||||
// resize adjusts the packet's storage (if necessary) to accommodate a body of the given size and header type.
|
||||
// When headerSizeAuto is specified, the header type is computed based on packet type.
|
||||
func (pkt *packet) resize(size uint32, ht uint8) {
|
||||
if cap(pkt.buf) < fullHeaderSize+int(size) {
|
||||
pkt.buf = make([]byte, fullHeaderSize+size)
|
||||
}
|
||||
pkt.body = pkt.buf[fullHeaderSize:]
|
||||
if ht != headerSizeAuto {
|
||||
pkt.headerType = ht
|
||||
|
@ -407,7 +399,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
|
|||
return nil
|
||||
}
|
||||
} else {
|
||||
// Send previously deferrd packet if combining it with the next one would exceed the chunk size.
|
||||
// Send previously deferred packet if combining it with the next one would exceed the chunk size.
|
||||
if len(c.deferred)+size+hSize > chunkSize {
|
||||
c.log(DebugLevel, pkg+"sending deferred packet separately", "size", len(c.deferred))
|
||||
_, err := c.write(c.deferred)
|
||||
|
@ -419,7 +411,7 @@ func (pkt *packet) writeTo(c *Conn, queue bool) error {
|
|||
}
|
||||
|
||||
// TODO(kortschak): Rewrite this horrific peice of premature optimisation.
|
||||
c.log(DebugLevel, pkg+"sending packet", "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr(), "size", size)
|
||||
c.log(DebugLevel, pkg+"sending packet", "size", size, "la", c.link.conn.LocalAddr(), "ra", c.link.conn.RemoteAddr())
|
||||
for size+hSize != 0 {
|
||||
if chunkSize > size {
|
||||
chunkSize = size
|
||||
|
|
|
@ -41,7 +41,6 @@ import (
|
|||
)
|
||||
|
||||
// parseURL parses an RTMP URL (ok, technically it is lexing).
|
||||
//
|
||||
func parseURL(addr string) (protocol int32, host string, port uint16, app, playpath string, err error) {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
|
@ -81,6 +80,9 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
|
|||
}
|
||||
elems := strings.SplitN(u.Path[1:], "/", 3)
|
||||
app = elems[0]
|
||||
if app == "" {
|
||||
return protocol, host, port, app, playpath, errInvalidURL
|
||||
}
|
||||
playpath = elems[1]
|
||||
if len(elems) == 3 && len(elems[2]) != 0 {
|
||||
playpath = path.Join(elems[1:]...)
|
||||
|
@ -97,5 +99,15 @@ func parseURL(addr string) (protocol int32, host string, port uint16, app, playp
|
|||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case port != 0:
|
||||
case (protocol & featureSSL) != 0:
|
||||
return protocol, host, port, app, playpath, errUnimplemented // port = 433
|
||||
case (protocol & featureHTTP) != 0:
|
||||
port = 80
|
||||
default:
|
||||
port = 1935
|
||||
}
|
||||
|
||||
return protocol, host, port, app, playpath, nil
|
||||
}
|
||||
|
|
43
rtmp/rtmp.go
43
rtmp/rtmp.go
|
@ -174,6 +174,13 @@ func connect(c *Conn) error {
|
|||
return err
|
||||
}
|
||||
c.log(DebugLevel, pkg+"connected")
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
c.link.conn.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
err = handshake(c)
|
||||
if err != nil {
|
||||
c.log(WarnLevel, pkg+"handshake failed", "error", err.Error())
|
||||
|
@ -185,12 +192,14 @@ func connect(c *Conn) error {
|
|||
c.log(WarnLevel, pkg+"sendConnect failed", "error", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
c.log(DebugLevel, pkg+"negotiating")
|
||||
var buf [256]byte
|
||||
for !c.isPlaying {
|
||||
pkt := packet{}
|
||||
pkt := packet{buf: buf[:]}
|
||||
err = pkt.readFrom(c)
|
||||
if err != nil {
|
||||
break
|
||||
return err
|
||||
}
|
||||
|
||||
switch pkt.packetType {
|
||||
|
@ -199,14 +208,10 @@ func connect(c *Conn) error {
|
|||
default:
|
||||
err = handlePacket(c, &pkt)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !c.isPlaying {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -276,26 +281,18 @@ func sendConnectPacket(c *Conn) error {
|
|||
return err
|
||||
}
|
||||
|
||||
enc[0] = amf.TypeObject
|
||||
enc = enc[1:]
|
||||
enc, err = amf.EncodeNamedString(enc, avApp, c.link.app)
|
||||
if err != nil {
|
||||
return err
|
||||
// required link info
|
||||
info := amf.Object{Properties: []amf.Property{
|
||||
amf.Property{Type: amf.TypeString, Name: avApp, String: c.link.app},
|
||||
amf.Property{Type: amf.TypeString, Name: avType, String: avNonprivate},
|
||||
amf.Property{Type: amf.TypeString, Name: avTcUrl, String: c.link.url}},
|
||||
}
|
||||
enc, err = amf.EncodeNamedString(enc, avType, avNonprivate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
enc, err = amf.EncodeNamedString(enc, avTcUrl, c.link.url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
enc, err = amf.EncodeInt24(enc, amf.TypeObjectEnd)
|
||||
enc, err = amf.Encode(&info, enc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// add auth string, if any
|
||||
// optional link auth info
|
||||
if c.link.auth != "" {
|
||||
enc, err = amf.EncodeBoolean(enc, c.link.flags&linkAuth != 0)
|
||||
if err != nil {
|
||||
|
|
|
@ -243,8 +243,9 @@ func TestFromFile(t *testing.T) {
|
|||
}
|
||||
defer f.Close()
|
||||
|
||||
rs := &rtmpSender{conn: c}
|
||||
// Pass RTMP session, true for audio, true for video, and 25 FPS
|
||||
flvEncoder, err := flv.NewEncoder(c, true, true, 25)
|
||||
flvEncoder, err := flv.NewEncoder(rs, true, true, 25)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create encoder: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue