Merge branch 'rtmpOutputFunctionality' of https://bitbucket.org/ausocean/av/src into rtmpOutputFunctionality

This commit is contained in:
Saxon Milton 2018-04-14 19:04:28 +09:30
commit a27b92ade8
8 changed files with 119 additions and 67 deletions

31
basicDriver.go Normal file
View File

@ -0,0 +1,31 @@
package main
import (
"bitbucket.org/ausocean/av/revid"
"fmt"
"time"
)
func main() {
// Give the platform some time to set itself up
time.Sleep(30 * time.Second)
config := revid.Config{
Input: revid.Raspivid,
Output: revid.Rtmp,
RtmpMethod: revid.LibRtmp,
QuantizationMode: revid.QuantizationOff,
RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/xt13-r4dh-f2w1-bh4s",
Bitrate: "500000",
Packetization: revid.Flv,
}
revidInst, err := revid.NewRevidInstance(config)
if err != nil {
fmt.Println("Should not have got error!")
return
}
// Run this instance for 2 days! Power to the pi will surely turn itself
// off before this time is up.
revidInst.Start()
time.Sleep(2 * 43200 * time.Second)
revidInst.Stop()
}

View File

@ -1,15 +1,15 @@
/* /*
NAME NAME
FLV.go audioTag.go
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
AUTHORS AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
FLV.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) audioTag.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the

View File

@ -1,15 +1,15 @@
/* /*
NAME NAME
FLV.go videoTag.go
DESCRIPTION DESCRIPTION
See Readme.md See Readme.md
AUTHORS AUTHORS
Saxon A. Nelson-Milton <saxon@ausocean.org> Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE LICENSE
FLV.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean) videoTag.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the under the terms of the GNU General Public License as published by the

View File

@ -57,7 +57,6 @@ type flvGenerator struct {
audioFlag bool audioFlag bool
videoFlag bool videoFlag bool
lastTagSize int lastTagSize int
currentTimestamp uint32
header flv.Header header flv.Header
startTime time.Time startTime time.Time
firstTag bool firstTag bool
@ -81,7 +80,6 @@ func NewFlvGenerator(audio bool, video bool, fps uint) (g *flvGenerator) {
g.fps = fps g.fps = fps
g.audioFlag = audio g.audioFlag = audio
g.videoFlag = video g.videoFlag = video
g.currentTimestamp = 0
g.lastTagSize = 0 g.lastTagSize = 0
g.inputChan = make(chan []byte, inputChanLength) g.inputChan = make(chan []byte, inputChanLength)
g.outputChan = make(chan []byte, outputChanLength) g.outputChan = make(chan []byte, outputChanLength)
@ -89,7 +87,7 @@ func NewFlvGenerator(audio bool, video bool, fps uint) (g *flvGenerator) {
return return
} }
// Start beings the generation routine - i.e. if raw data is given to the input // Start begins the generation routine - i.e. if raw data is given to the input
// channel flv tags will be produced and available from the output channel. // channel flv tags will be produced and available from the output channel.
func (g *flvGenerator) Start() { func (g *flvGenerator) Start() {
go g.generate() go g.generate()
@ -105,8 +103,7 @@ func (g *flvGenerator) GenHeader() {
g.outputChan <- header.ToByteSlice() g.outputChan <- header.ToByteSlice()
} }
// getNextTimestamp generates and returns the next timestamp based on the given // getNextTimestamp generates and returns the next timestamp based on current time
// fps rate
func (g *flvGenerator) getNextTimestamp() (timestamp uint32) { func (g *flvGenerator) getNextTimestamp() (timestamp uint32) {
if g.firstTag { if g.firstTag {
g.startTime = time.Now() g.startTime = time.Now()
@ -114,19 +111,12 @@ func (g *flvGenerator) getNextTimestamp() (timestamp uint32) {
timestamp = 0 timestamp = 0
return return
} }
//timestamp = g.currentTimestamp
//g.currentTimestamp += 40
timestamp = uint32(time.Now().Sub(g.startTime).Seconds() * float64(1000)) timestamp = uint32(time.Now().Sub(g.startTime).Seconds() * float64(1000))
//fmt.Printf("timestamp: %v", timestamp)
return return
} }
// ResetTimestamp resets the current timestamp to 0 i.e. equivalent to start of // isKeyFrame returns true if the passed frame data represents that of a keyframe
// transmission // TODO: clean this up and use conts for naltype codes
func (g *flvGenerator) ResetTimestamp() {
g.currentTimestamp = 0
}
func isKeyFrame(frame []byte) bool { func isKeyFrame(frame []byte) bool {
byteChannel := make(chan byte, len(frame)) byteChannel := make(chan byte, len(frame))
for i := range frame { for i := range frame {
@ -153,6 +143,9 @@ func isKeyFrame(frame []byte) bool {
return false return false
} }
// isSequenceHeader returns true if the passed frame data represents that of a
// a sequence header.
// TODO: clean this up and use consts for the nalTypes
func isSequenceHeader(frame []byte) bool { func isSequenceHeader(frame []byte) bool {
byteChannel := make(chan byte, len(frame)) byteChannel := make(chan byte, len(frame))
for i := range frame { for i := range frame {
@ -222,6 +215,7 @@ func (g *flvGenerator) generate() {
// Do we even have some audio to send off ? // Do we even have some audio to send off ?
if g.audioFlag { if g.audioFlag {
// Not sure why but we need two audio tags for dummy silent audio // Not sure why but we need two audio tags for dummy silent audio
// TODO: create constants or SoundSize and SoundType parameters
tag := flv.AudioTag{ tag := flv.AudioTag{
TagType: uint8(flv.AudioTagType), TagType: uint8(flv.AudioTagType),
DataSize: 7, DataSize: 7,

View File

@ -177,6 +177,7 @@ func (g *tsGenerator) generate() {
} }
pkt.FillPayload(g.payloadByteChan) pkt.FillPayload(g.payloadByteChan)
// TODO: create consts for AFC parameters
if pusi { if pusi {
// Create pat table // Create pat table
patPkt := mpegts.MpegTsPacket{ patPkt := mpegts.MpegTsPacket{

View File

@ -45,6 +45,7 @@ type Config struct {
RtmpMethod uint8 RtmpMethod uint8
Packetization uint8 Packetization uint8
QuantizationMode uint8 QuantizationMode uint8
Verbosity uint8
FramesPerClip int FramesPerClip int
RtmpUrl string RtmpUrl string
Bitrate string Bitrate string
@ -79,6 +80,8 @@ const (
LibRtmp = 14 LibRtmp = 14
QuantizationOn = 15 QuantizationOn = 15
QuantizationOff = 16 QuantizationOff = 16
Yes = 17
No = 18
) )
// Default config settings // Default config settings
@ -91,11 +94,22 @@ const (
defaultQuantization = "40" defaultQuantization = "40"
defaultBitrate = "0" defaultBitrate = "0"
defaultQuantizationMode = QuantizationOff defaultQuantizationMode = QuantizationOff
defaultFramesPerClip = 1
) )
// Validate checks for any errors in the config files and defaults settings // Validate checks for any errors in the config fields and defaults settings
// if particular parameters have not been defined. // if particular parameters have not been defined.
func (config *Config) Validate(r *revidInst) error { func (config *Config) Validate(r *revidInst) error {
switch config.Verbosity {
case Yes:
case No:
case NothingDefined:
config.Verbosity = No
r.Log(Warning, "No verbosity mode defined, defaulting to no Verbosity!")
default:
return errors.New("Bad Verbosity defined in config!")
}
switch config.QuantizationMode { switch config.QuantizationMode {
case QuantizationOn: case QuantizationOn:
case QuantizationOff: case QuantizationOff:
@ -103,7 +117,7 @@ func (config *Config) Validate(r *revidInst) error {
r.Log(Warning, "No quantization mode defined, defaulting to QuantizationOff!") r.Log(Warning, "No quantization mode defined, defaulting to QuantizationOff!")
config.QuantizationMode = QuantizationOff config.QuantizationMode = QuantizationOff
default: default:
return errors.New("Bad QuantizationMode define in config!") return errors.New("Bad QuantizationMode defined in config!")
} }
switch config.Input { switch config.Input {
@ -120,15 +134,24 @@ func (config *Config) Validate(r *revidInst) error {
switch config.InputCodec { switch config.InputCodec {
case H264: case H264:
if config.Bitrate != "" && config.Quantization != "" { if config.Bitrate != "" && config.Quantization != "" {
bitrate, _ := strconv.Atoi(config.Bitrate) bitrate, err := strconv.Atoi(config.Bitrate)
quantization, _ := strconv.Atoi(config.Quantization) if err != nil {
return errors.New("Something is wrong with bitrate in conig!")
}
quantization, err := strconv.Atoi(config.Quantization)
if err != nil {
return errors.New("Something is wrong with quantization in config!")
}
if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) { if (bitrate > 0 && quantization > 0) || (bitrate == 0 && quantization == 0) {
return errors.New("Bad bitrate and quantization combination for H264 input!") return errors.New("Bad bitrate and quantization combination for H264 input!")
} }
} }
case Mjpeg: case Mjpeg:
if config.Quantization != "" { if config.Quantization != "" {
quantization, _ := strconv.Atoi(config.Quantization) quantization, err := strconv.Atoi(config.Quantization)
if err != nil {
return errors.New("Something is wrong with quantization in config!")
}
if quantization > 0 || config.Bitrate == "" { if quantization > 0 || config.Bitrate == "" {
return errors.New("Bad bitrate or quantization for mjpeg input!") return errors.New("Bad bitrate or quantization for mjpeg input!")
} }
@ -150,6 +173,8 @@ func (config *Config) Validate(r *revidInst) error {
if config.RtmpUrl == "" { if config.RtmpUrl == "" {
return errors.New("Bad RTMP URL") return errors.New("Bad RTMP URL")
} }
r.Log(Info, "Defaulting frames per clip to 1 for rtmp output!")
config.FramesPerClip = 1
case NothingDefined: case NothingDefined:
r.Log(Warning, "No output defined, defaulting to httpOut!") r.Log(Warning, "No output defined, defaulting to httpOut!")
config.Output = Http config.Output = Http
@ -182,7 +207,7 @@ func (config *Config) Validate(r *revidInst) error {
case config.FramesPerClip > 0: case config.FramesPerClip > 0:
case config.FramesPerClip == 0: case config.FramesPerClip == 0:
r.Log(Warning, "No frames per clip defined, defaulting to 1!") r.Log(Warning, "No frames per clip defined, defaulting to 1!")
config.FramesPerClip = 1 config.FramesPerClip = defaultFramesPerClip
case config.FramesPerClip < 0: case config.FramesPerClip < 0:
return errors.New("Bad frames per clip given!") return errors.New("Bad frames per clip given!")
} }

View File

@ -41,6 +41,7 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"time" "time"
"sync"
"bitbucket.org/ausocean/av/generator" "bitbucket.org/ausocean/av/generator"
"bitbucket.org/ausocean/av/parser" "bitbucket.org/ausocean/av/parser"
@ -84,7 +85,7 @@ const (
type RevidInst interface { type RevidInst interface {
Start() Start()
Stop() Stop()
ChangeState(newconfig Config) error changeState(newconfig Config) error
GetConfigRef() *Config GetConfigRef() *Config
Log(logType, m string) Log(logType, m string)
IsRunning() bool IsRunning() bool
@ -111,6 +112,7 @@ type revidInst struct {
getFrame func() []byte getFrame func() []byte
sendClip func(clip []byte) error sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession rtmpInst rtmp.RTMPSession
mutex sync.Mutex
} }
// NewRevidInstance returns a pointer to a new revidInst with the desired // NewRevidInstance returns a pointer to a new revidInst with the desired
@ -118,10 +120,12 @@ type revidInst struct {
// successful. // successful.
func NewRevidInstance(config Config) (r *revidInst, err error) { func NewRevidInstance(config Config) (r *revidInst, err error) {
r = new(revidInst) r = new(revidInst)
r.mutex = sync.Mutex{}
r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize) r.ringBuffer = ringbuffer.NewRingBuffer(ringBufferSize, ringBufferElementSize)
err = r.ChangeState(config) err = r.changeState(config)
if err != nil { if err != nil {
return nil, err r = nil
return
} }
r.outputChan = make(chan []byte, outputChanSize) r.outputChan = make(chan []byte, outputChanSize)
r.parser.Start() r.parser.Start()
@ -136,9 +140,9 @@ func (r *revidInst) GetConfigRef() *Config {
return &r.config return &r.config
} }
// ChangeState swaps the current config of a revidInst with the passed // changeState swaps the current config of a revidInst with the passed
// configuration; checking validity and returning errors if not valid. // configuration; checking validity and returning errors if not valid.
func (r *revidInst) ChangeState(config Config) error { func (r *revidInst) changeState(config Config) error {
r.config.Logger = config.Logger r.config.Logger = config.Logger
err := config.Validate(r) err := config.Validate(r)
if err != nil { if err != nil {
@ -204,6 +208,17 @@ noPacketizationSetup:
return nil return nil
} }
// ChangeConfig changes the current configuration of the revid instance.
func (r *revidInst) ChangeConfig(config Config) (err error) {
r.Stop()
r, err = NewRevidInstance(config)
if err != nil {
return
}
r.Start()
return
}
// Log takes a logtype and message and tries to send this information to the // Log takes a logtype and message and tries to send this information to the
// logger provided in the revidInst config - if there is one, otherwise the message // logger provided in the revidInst config - if there is one, otherwise the message
// is sent to stdout // is sent to stdout
@ -211,7 +226,9 @@ func (r *revidInst) Log(logType, m string) {
if r.config.Logger != nil { if r.config.Logger != nil {
r.config.Logger.Log(logType, m) r.config.Logger.Log(logType, m)
} else { } else {
// fmt.Println(logType + ": " + m) if r.config.Verbosity == Yes {
fmt.Println(logType + ": " + m)
}
} }
} }
@ -223,6 +240,8 @@ func (r *revidInst) IsRunning() bool {
// Start invokes a revidInst to start processing video from a defined input // Start invokes a revidInst to start processing video from a defined input
// and packetising (if theres packetization) to a defined output. // and packetising (if theres packetization) to a defined output.
func (r *revidInst) Start() { func (r *revidInst) Start() {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.isRunning { if r.isRunning {
r.Log(Warning, "revidInst.Start() called but revid already running!") r.Log(Warning, "revidInst.Start() called but revid already running!")
return return
@ -242,16 +261,18 @@ func (r *revidInst) Start() {
// Stop halts any processing of video data from a camera or file // Stop halts any processing of video data from a camera or file
func (r *revidInst) Stop() { func (r *revidInst) Stop() {
if r.isRunning { r.mutex.Lock()
defer r.mutex.Unlock()
if !r.isRunning {
r.Log(Warning, "revidInst.Stop() called but revid not running!")
return
}
r.Log(Info, "Stopping revid!") r.Log(Info, "Stopping revid!")
r.isRunning = false r.isRunning = false
// If a cmd process is running, we kill! // If a cmd process is running, we kill!
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
} else {
r.Log(Warning, "revidInst.Stop() called but revid not running!")
}
} }
// getFrameNoPacketization gets a frame directly from the revid output chan // getFrameNoPacketization gets a frame directly from the revid output chan
@ -306,6 +327,13 @@ func (r *revidInst) packClips() {
} }
} }
func reboot() {
cmd := exec.Command("sudo", "reboot")
err := cmd.Run()
fmt.Println(err)
fmt.Println("Rebooting!")
}
// outputClips takes the clips produced in the packClips method and outputs them // outputClips takes the clips produced in the packClips method and outputs them
// to the desired output defined in the revidInst config // to the desired output defined in the revidInst config
func (r *revidInst) outputClips() { func (r *revidInst) outputClips() {
@ -331,6 +359,9 @@ func (r *revidInst) outputClips() {
bytes += len(clip) bytes += len(clip)
for err := r.sendClip(clip); err != nil; { for err := r.sendClip(clip); err != nil; {
r.Log(Error, err.Error()) r.Log(Error, err.Error())
if r.config.RtmpMethod == LibRtmp {
reboot()
}
if len(clip) >= 11 { if len(clip) >= 11 {
r.Log(Warning, "Send failed trying again!") r.Log(Warning, "Send failed trying again!")
err = r.sendClip(clip) err = r.sendClip(clip)
@ -374,7 +405,7 @@ func (r *revidInst) sendClipToHTTP(clip []byte) error {
client := http.Client{Timeout: timeout} client := http.Client{Timeout: timeout}
url := r.config.HttpAddress + strconv.Itoa(len(clip)) url := r.config.HttpAddress + strconv.Itoa(len(clip))
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip))) r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip))
if err != nil { if err != nil {
return fmt.Errorf("Error posting to %s: %s", url, err) return fmt.Errorf("Error posting to %s: %s", url, err)
} }

View File

@ -1,30 +0,0 @@
package main
import (
"time"
"fmt"
"bitbucket.org/ausocean/av/revid"
)
func main(){
time.Sleep(30*time.Second)
config := revid.Config{
Input: revid.Raspivid,
Output: revid.Rtmp,
RtmpMethod: revid.LibRtmp,
QuantizationMode: revid.QuantizationOff,
RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1",
Bitrate: "500000",
FramesPerClip: 1,
Packetization: revid.Flv,
FrameRate: "25",
}
revidInst, err := revid.NewRevidInstance(config)
if err != nil {
fmt.Println("Should not have got error!")
return
}
revidInst.Start()
time.Sleep(2*43200*time.Second)
revidInst.Stop()
}