diff --git a/basicDriver.go b/basicDriver.go new file mode 100644 index 00000000..c129a91b --- /dev/null +++ b/basicDriver.go @@ -0,0 +1,31 @@ +package main + +import ( + "bitbucket.org/ausocean/av/revid" + "fmt" + "time" +) + +func main() { + // Give the platform some time to set itself up + time.Sleep(30 * time.Second) + config := revid.Config{ + Input: revid.Raspivid, + Output: revid.Rtmp, + RtmpMethod: revid.LibRtmp, + QuantizationMode: revid.QuantizationOff, + RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/xt13-r4dh-f2w1-bh4s", + Bitrate: "500000", + Packetization: revid.Flv, + } + revidInst, err := revid.NewRevidInstance(config) + if err != nil { + fmt.Println("Should not have got error!") + return + } + // Run this instance for 2 days! Power to the pi will surely turn itself + // off before this time is up. + revidInst.Start() + time.Sleep(2 * 43200 * time.Second) + revidInst.Stop() +} diff --git a/flv/audioTag.go b/flv/audioTag.go index e2ce0a1f..be8edce4 100644 --- a/flv/audioTag.go +++ b/flv/audioTag.go @@ -1,15 +1,15 @@ /* NAME - FLV.go + audioTag.go DESCRIPTION See Readme.md AUTHORS - Saxon A. Nelson-Milton + Saxon Nelson-Milton LICENSE - FLV.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + audioTag.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the diff --git a/flv/videoTag.go b/flv/videoTag.go index 5d2c2e38..6c1f658a 100644 --- a/flv/videoTag.go +++ b/flv/videoTag.go @@ -1,15 +1,15 @@ /* NAME - FLV.go + videoTag.go DESCRIPTION See Readme.md AUTHORS - Saxon A. Nelson-Milton + Saxon Nelson-Milton LICENSE - FLV.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) + videoTag.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the diff --git a/generator/FLVGenerator.go b/generator/FLVGenerator.go index 14666c68..abeb60af 100644 --- a/generator/FLVGenerator.go +++ b/generator/FLVGenerator.go @@ -57,7 +57,6 @@ type flvGenerator struct { audioFlag bool videoFlag bool lastTagSize int - currentTimestamp uint32 header flv.Header startTime time.Time firstTag bool @@ -81,7 +80,6 @@ func NewFlvGenerator(audio bool, video bool, fps uint) (g *flvGenerator) { g.fps = fps g.audioFlag = audio g.videoFlag = video - g.currentTimestamp = 0 g.lastTagSize = 0 g.inputChan = make(chan []byte, inputChanLength) g.outputChan = make(chan []byte, outputChanLength) @@ -89,7 +87,7 @@ func NewFlvGenerator(audio bool, video bool, fps uint) (g *flvGenerator) { return } -// Start beings the generation routine - i.e. if raw data is given to the input +// Start begins the generation routine - i.e. if raw data is given to the input // channel flv tags will be produced and available from the output channel. func (g *flvGenerator) Start() { go g.generate() @@ -105,8 +103,7 @@ func (g *flvGenerator) GenHeader() { g.outputChan <- header.ToByteSlice() } -// getNextTimestamp generates and returns the next timestamp based on the given -// fps rate +// getNextTimestamp generates and returns the next timestamp based on current time func (g *flvGenerator) getNextTimestamp() (timestamp uint32) { if g.firstTag { g.startTime = time.Now() @@ -114,19 +111,12 @@ func (g *flvGenerator) getNextTimestamp() (timestamp uint32) { timestamp = 0 return } - //timestamp = g.currentTimestamp - //g.currentTimestamp += 40 timestamp = uint32(time.Now().Sub(g.startTime).Seconds() * float64(1000)) - //fmt.Printf("timestamp: %v", timestamp) return } -// ResetTimestamp resets the current timestamp to 0 i.e. equivalent to start of -// transmission -func (g *flvGenerator) ResetTimestamp() { - g.currentTimestamp = 0 -} - +// isKeyFrame returns true if the passed frame data represents that of a keyframe +// TODO: clean this up and use conts for naltype codes func isKeyFrame(frame []byte) bool { byteChannel := make(chan byte, len(frame)) for i := range frame { @@ -153,6 +143,9 @@ func isKeyFrame(frame []byte) bool { return false } +// isSequenceHeader returns true if the passed frame data represents that of a +// a sequence header. +// TODO: clean this up and use consts for the nalTypes func isSequenceHeader(frame []byte) bool { byteChannel := make(chan byte, len(frame)) for i := range frame { @@ -222,6 +215,7 @@ func (g *flvGenerator) generate() { // Do we even have some audio to send off ? if g.audioFlag { // Not sure why but we need two audio tags for dummy silent audio + // TODO: create constants or SoundSize and SoundType parameters tag := flv.AudioTag{ TagType: uint8(flv.AudioTagType), DataSize: 7, diff --git a/generator/MPEGTSGenerator.go b/generator/MPEGTSGenerator.go index 8077b450..bca41712 100644 --- a/generator/MPEGTSGenerator.go +++ b/generator/MPEGTSGenerator.go @@ -177,6 +177,7 @@ func (g *tsGenerator) generate() { } pkt.FillPayload(g.payloadByteChan) + // TODO: create consts for AFC parameters if pusi { // Create pat table patPkt := mpegts.MpegTsPacket{ diff --git a/revid/Config.go b/revid/Config.go index e9a7c354..c5bb93ab 100644 --- a/revid/Config.go +++ b/revid/Config.go @@ -45,6 +45,7 @@ type Config struct { RtmpMethod uint8 Packetization uint8 QuantizationMode uint8 + Verbosity uint8 FramesPerClip int RtmpUrl string Bitrate string @@ -79,6 +80,8 @@ const ( LibRtmp = 14 QuantizationOn = 15 QuantizationOff = 16 + Yes = 17 + No = 18 ) // Default config settings @@ -91,11 +94,22 @@ const ( defaultQuantization = "40" defaultBitrate = "0" defaultQuantizationMode = QuantizationOff + defaultFramesPerClip = 1 ) -// Validate checks for any errors in the config files and defaults settings +// Validate checks for any errors in the config fields and defaults settings // if particular parameters have not been defined. func (config *Config) Validate(r *revidInst) error { + switch config.Verbosity { + case Yes: + case No: + case NothingDefined: + config.Verbosity = No + r.Log(Warning, "No verbosity mode defined, defaulting to no Verbosity!") + default: + return errors.New("Bad Verbosity defined in config!") + } + switch config.QuantizationMode { case QuantizationOn: case QuantizationOff: @@ -103,7 +117,7 @@ func (config *Config) Validate(r *revidInst) error { r.Log(Warning, "No quantization mode defined, defaulting to QuantizationOff!") config.QuantizationMode = QuantizationOff default: - return errors.New("Bad QuantizationMode define in config!") + return errors.New("Bad QuantizationMode defined in config!") } switch config.Input { @@ -120,15 +134,24 @@ func (config *Config) Validate(r *revidInst) error { switch config.InputCodec { case H264: if config.Bitrate != "" && config.Quantization != "" { - bitrate, _ := strconv.Atoi(config.Bitrate) - quantization, _ := strconv.Atoi(config.Quantization) + bitrate, err := strconv.Atoi(config.Bitrate) + if err != nil { + return errors.New("Something is wrong with bitrate in conig!") + } + quantization, err := strconv.Atoi(config.Quantization) + if err != nil { + return errors.New("Something is wrong with quantization in config!") + } if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) { return errors.New("Bad bitrate and quantization combination for H264 input!") } } case Mjpeg: if config.Quantization != "" { - quantization, _ := strconv.Atoi(config.Quantization) + quantization, err := strconv.Atoi(config.Quantization) + if err != nil { + return errors.New("Something is wrong with quantization in config!") + } if quantization > 0 || config.Bitrate == "" { return errors.New("Bad bitrate or quantization for mjpeg input!") } @@ -150,6 +173,8 @@ func (config *Config) Validate(r *revidInst) error { if config.RtmpUrl == "" { return errors.New("Bad RTMP URL") } + r.Log(Info, "Defaulting frames per clip to 1 for rtmp output!") + config.FramesPerClip = 1 case NothingDefined: r.Log(Warning, "No output defined, defaulting to httpOut!") config.Output = Http @@ -182,7 +207,7 @@ func (config *Config) Validate(r *revidInst) error { case config.FramesPerClip > 0: case config.FramesPerClip == 0: r.Log(Warning, "No frames per clip defined, defaulting to 1!") - config.FramesPerClip = 1 + config.FramesPerClip = defaultFramesPerClip case config.FramesPerClip < 0: return errors.New("Bad frames per clip given!") } diff --git a/revid/RevidInstance.go b/revid/RevidInstance.go index c64f2fcf..5a3454e0 100644 --- a/revid/RevidInstance.go +++ b/revid/RevidInstance.go @@ -41,6 +41,7 @@ import ( "os/exec" "strconv" "time" + "sync" "bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/parser" @@ -84,7 +85,7 @@ const ( type RevidInst interface { Start() Stop() - ChangeState(newconfig Config) error + changeState(newconfig Config) error GetConfigRef() *Config Log(logType, m string) IsRunning() bool @@ -111,6 +112,7 @@ type revidInst struct { getFrame func() []byte sendClip func(clip []byte) error rtmpInst rtmp.RTMPSession + mutex sync.Mutex } // NewRevidInstance returns a pointer to a new revidInst with the desired @@ -118,10 +120,12 @@ type revidInst struct { // successful. func NewRevidInstance(config Config) (r *revidInst, err error) { r = new(revidInst) + r.mutex = sync.Mutex{} r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize) - err = r.ChangeState(config) + err = r.changeState(config) if err != nil { - return nil, err + r = nil + return } r.outputChan = make(chan []byte, outputChanSize) r.parser.Start() @@ -136,9 +140,9 @@ func (r *revidInst) GetConfigRef() *Config { return &r.config } -// ChangeState swaps the current config of a revidInst with the passed +// changeState swaps the current config of a revidInst with the passed // configuration; checking validity and returning errors if not valid. -func (r *revidInst) ChangeState(config Config) error { +func (r *revidInst) changeState(config Config) error { r.config.Logger = config.Logger err := config.Validate(r) if err != nil { @@ -204,6 +208,17 @@ noPacketizationSetup: return nil } +// ChangeConfig changes the current configuration of the revid instance. +func (r *revidInst) ChangeConfig(config Config) (err error) { + r.Stop() + r, err = NewRevidInstance(config) + if err != nil { + return + } + r.Start() + return +} + // Log takes a logtype and message and tries to send this information to the // logger provided in the revidInst config - if there is one, otherwise the message // is sent to stdout @@ -211,7 +226,9 @@ func (r *revidInst) Log(logType, m string) { if r.config.Logger != nil { r.config.Logger.Log(logType, m) } else { - // fmt.Println(logType + ": " + m) + if r.config.Verbosity == Yes { + fmt.Println(logType + ": " + m) + } } } @@ -223,6 +240,8 @@ func (r *revidInst) IsRunning() bool { // Start invokes a revidInst to start processing video from a defined input // and packetising (if theres packetization) to a defined output. func (r *revidInst) Start() { + r.mutex.Lock() + defer r.mutex.Unlock() if r.isRunning { r.Log(Warning, "revidInst.Start() called but revid already running!") return @@ -242,16 +261,18 @@ func (r *revidInst) Start() { // Stop halts any processing of video data from a camera or file func (r *revidInst) Stop() { - if r.isRunning { + r.mutex.Lock() + defer r.mutex.Unlock() + if !r.isRunning { + r.Log(Warning, "revidInst.Stop() called but revid not running!") + return + } r.Log(Info, "Stopping revid!") r.isRunning = false // If a cmd process is running, we kill! if r.cmd != nil && r.cmd.Process != nil { r.cmd.Process.Kill() } - } else { - r.Log(Warning, "revidInst.Stop() called but revid not running!") - } } // getFrameNoPacketization gets a frame directly from the revid output chan @@ -306,6 +327,13 @@ func (r *revidInst) packClips() { } } +func reboot() { + cmd := exec.Command("sudo", "reboot") + err := cmd.Run() + fmt.Println(err) + fmt.Println("Rebooting!") +} + // outputClips takes the clips produced in the packClips method and outputs them // to the desired output defined in the revidInst config func (r *revidInst) outputClips() { @@ -331,6 +359,9 @@ func (r *revidInst) outputClips() { bytes += len(clip) for err := r.sendClip(clip); err != nil; { r.Log(Error, err.Error()) + if r.config.RtmpMethod == LibRtmp { + reboot() + } if len(clip) >= 11 { r.Log(Warning, "Send failed trying again!") err = r.sendClip(clip) @@ -374,7 +405,7 @@ func (r *revidInst) sendClipToHTTP(clip []byte) error { client := http.Client{Timeout: timeout} url := r.config.HttpAddress + strconv.Itoa(len(clip)) r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) - resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer + resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) if err != nil { return fmt.Errorf("Error posting to %s: %s", url, err) } diff --git a/testDriver.go b/testDriver.go deleted file mode 100644 index bc77736e..00000000 --- a/testDriver.go +++ /dev/null @@ -1,30 +0,0 @@ -package main - -import ( - "time" - "fmt" - "bitbucket.org/ausocean/av/revid" -) - -func main(){ - time.Sleep(30*time.Second) - config := revid.Config{ - Input: revid.Raspivid, - Output: revid.Rtmp, - RtmpMethod: revid.LibRtmp, - QuantizationMode: revid.QuantizationOff, - RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1", - Bitrate: "500000", - FramesPerClip: 1, - Packetization: revid.Flv, - FrameRate: "25", - } - revidInst, err := revid.NewRevidInstance(config) - if err != nil { - fmt.Println("Should not have got error!") - return - } - revidInst.Start() - time.Sleep(2*43200*time.Second) - revidInst.Stop() -}