From 46ca3e2611d92be0ea4149e7ffc5bbf2ecd9b387 Mon Sep 17 00:00:00 2001 From: Trek H Date: Fri, 19 Apr 2019 01:20:48 +0930 Subject: [PATCH] revid: created an audioInput struct to record audio that acts as a reader for revid --- go.mod | 3 + revid/audio.go | 240 ++++++++---------------------------------------- revid/config.go | 2 + revid/revid.go | 8 +- 4 files changed, 50 insertions(+), 203 deletions(-) diff --git a/go.mod b/go.mod index 1280e1a3..5d3fde42 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,12 @@ require ( bitbucket.org/ausocean/iot v1.2.4 bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 + github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect + github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/mewkiz/flac v1.0.5 + github.com/sergi/go-diff v1.0.0 // indirect github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect ) diff --git a/revid/audio.go b/revid/audio.go index 5fe6aaef..f34d9736 100644 --- a/revid/audio.go +++ b/revid/audio.go @@ -3,21 +3,20 @@ package revid import ( "errors" "io" - "strconv" "sync" "time" "github.com/yobert/alsa" "bitbucket.org/ausocean/av/codec/pcm" - "bitbucket.org/ausocean/iot/pi/netsender" - "bitbucket.org/ausocean/iot/pi/sds" + "bitbucket.org/ausocean/iot/pi/smartlogger" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) const ( - defaultRate = 48000 + logPath = "/var/log/netsender" + defaultSampRate = 48000 defaultPeriod = 5 // seconds defaultChannels = 2 defaultBits = 16 @@ -26,6 +25,8 @@ const ( rbNextTimeout = 100 * time.Millisecond ) +var log *logger.Logger + // audioInput holds everything we need to know about the audio input stream. // NB: At 44100 Hz frame rate, 2 channels and 16-bit samples, a period of 5 seconds // results in PCM data chunks of 882000 bytes! A longer period exceeds datastore's 1MB blob limit. @@ -37,7 +38,7 @@ type audioInput struct { // internals dev *alsa.Device // audio input device ab alsa.Buffer // ALSA's buffer - rb *ring.Buffer // our buffer //TODO: change this to output stream, doesn't have to be ring buffer + rb *ring.Buffer // our buffer vs int // our "var sum" to track var changes } @@ -50,19 +51,29 @@ type parameters struct { bits int // sample bit size, 16 by default } -func NewAudioInput() { +// NewAudioInput starts recording audio and returns an AudioInput which the audio can be read from. +func NewAudioInput() io.Reader { + logLevel := int(logger.Debug) + + validLogLevel := true + if logLevel < int(logger.Debug) || logLevel > int(logger.Fatal) { + logLevel = int(logger.Info) + validLogLevel = false + } + + logSender := smartlogger.New(logPath) + log = logger.New(int8(logLevel), &logSender.LogRoller) + log.Log(logger.Info, "log-netsender: Logger Initialized") + if !validLogLevel { + log.Log(logger.Error, "Invalid log level was defaulted to Info") + } + var ac audioInput - // Get audio params and store the current var sum. - vars, err := ac.ns.Vars() - if err != nil { - log.Log(logger.Warning, "netsender.Vars failed; using defaults", "error", err.Error()) - } - ac.params(vars) - ac.vs = ac.ns.VarSum() + ac.setParams() // Open the requested audio device. - err = ac.open() + err := ac.open() if err != nil { log.Log(logger.Fatal, "alsa.open failed", "error", err.Error()) } @@ -75,70 +86,20 @@ func NewAudioInput() { go ac.input() - ac.output() - - return stream + return ac } -// params extracts audio params from corresponding NetReceiver vars and returns true if anything has changed. -// See audioInput for a description of the params and their limits. -func (ac *audioInput) params(vars map[string]string) bool { - // We are the only writers to this field - // so we don't need to lock here. +func (ac *audioInput) setParams() { p := ac.parameters - changed := false - mode := vars["mode"] - if p.mode != mode { - p.mode = mode - changed = true - } - source := vars["source"] - if p.source != source { - p.source = source - changed = true - } - val, err := strconv.Atoi(vars["rate"]) - if err != nil { - val = defaultRate - } - if p.rate != val { - p.rate = val - changed = true - } - val, err = strconv.Atoi(vars["period"]) - if err != nil || val < 1 || 5 < val { - val = defaultPeriod - } - if p.period != val { - p.period = val - changed = true - } - val, err = strconv.Atoi(vars["channels"]) - if err != nil || (val != 1 && val != 2) { - val = defaultChannels - } - if p.channels != val { - p.channels = val - changed = true - } - val, err = strconv.Atoi(vars["bits"]) - if err != nil || (val != 16 && val != 32) { - val = defaultBits - } - if p.bits != val { - p.bits = val - changed = true - } + p.rate = defaultSampRate + p.period = defaultPeriod + p.channels = defaultChannels + p.bits = defaultBits - if changed { - ac.mu.Lock() - ac.parameters = p - ac.mu.Unlock() - log.Log(logger.Debug, "Params changed") - } - log.Log(logger.Debug, "Parameters", "mode", p.mode, "source", p.source, "rate", p.rate, "period", p.period, "channels", p.channels, "bits", p.bits) - return changed + ac.mu.Lock() + ac.parameters = p + ac.mu.Unlock() } // open or re-open the recording device with the given name and prepare it to record. @@ -212,12 +173,12 @@ func (ac *audioInput) open() error { // If no easily divisible rate is found, then use the default rate. if !foundRate { - log.Log(logger.Warning, "No available device sample-rates are divisible by the requested rate. Default rate will be used. Resampling may fail.", "rateRequested", ac.rate) - _, err = ac.dev.NegotiateRate(defaultRate) + log.Log(logger.Warning, "Unable to sample at requested rate, default used.", "rateRequested", ac.rate) + _, err = ac.dev.NegotiateRate(defaultSampRate) if err != nil { return err } - log.Log(logger.Debug, "Sample rate set", "rate", defaultRate) + log.Log(logger.Debug, "Sample rate set", "rate", defaultSampRate) } var fmt alsa.FormatType @@ -294,131 +255,10 @@ func (ac *audioInput) input() { } } -// output continously reads audio from the ringbuffer and sends it to NetReceiver via poll requests. -// When "B0" is configured as one of the NetReceiver inputs, audio data is posted as "B0". -// When "B0" is not an input, the poll request happens without any audio data -// (although other inputs may still be present via URL parameters). -// When paused, polling continues but without sending audio (B0) data. -// Sending is throttled so as to complete one pass of this loop approximately every audio period, -// since cycling more frequently is pointless. -// Finally while audio data is sent every audio period, other data is reported only every monitor period. -// This function also handles NetReceiver configuration requests and updating of NetReceiver vars. -func (ac *audioInput) output() { - // Calculate the size of the output data based on wanted channels and rate. - outLen := (((len(ac.ab.Data) / ac.ab.Format.Channels) * ac.channels) / ac.ab.Format.Rate) * ac.rate - buf := make([]byte, outLen) - - mime := "audio/x-wav;codec=pcm;rate=" + strconv.Itoa(ac.rate) + ";channels=" + strconv.Itoa(ac.channels) + ";bits=" + strconv.Itoa(ac.bits) - ip := ac.ns.Param("ip") - mp, err := strconv.Atoi(ac.ns.Param("mp")) - if err != nil { - log.Log(logger.Fatal, "mp not an integer") - } - - report := true // Report non-audio data. - reported := time.Now() // When we last did so. - - for { - var reconfig bool - start := time.Now() - audio := false - var pins []netsender.Pin - - if ac.mode == "Paused" { - - // Only send X data when paused (if any). - if report { - pins = netsender.MakePins(ip, "X") - } - } else { - n, err := read(ac.rb, buf) - if err != nil { - return - } - if n == 0 { - goto sleep - } - if n != len(buf) { - log.Log(logger.Error, "Unexpected length from read", "length", n) - return - } - if report { - pins = netsender.MakePins(ip, "") - } else { - pins = netsender.MakePins(ip, "B") - } - for i, pin := range pins { - if pin.Name == "B0" { - audio = true - pins[i].Value = n - pins[i].Data = buf - pins[i].MimeType = mime - } - } - } - - if !(report || audio) { - goto sleep // nothing to do - } - - // Populate X pins, if any. - for i, pin := range pins { - if pin.Name[0] == 'X' { - err := sds.ReadSystem(&pins[i]) - if err != nil { - log.Log(logger.Warning, "sds.ReadSystem failed", "error", err.Error()) - // Pin.Value defaults to -1 upon error, so OK to continue. - } - } - } - _, reconfig, err = ac.ns.Send(netsender.RequestPoll, pins) - if err != nil { - log.Log(logger.Debug, "netsender.Send failed", "error", err.Error()) - goto sleep - } - if report { - reported = start - report = false - } - if reconfig { - err = ac.ns.Config() - if err != nil { - log.Log(logger.Warning, "netsender.Config failed", "error", err.Error()) - goto sleep - } - ip = ac.ns.Param("ip") - mp, err = strconv.Atoi(ac.ns.Param("mp")) - if err != nil { - log.Log(logger.Fatal, "mp not an integer") - } - } - - if ac.vs != ac.ns.VarSum() { - vars, err := ac.ns.Vars() - if err != nil { - log.Log(logger.Error, "netsender.Vars failed", "error", err.Error()) - goto sleep - } - ac.params(vars) // ToDo: re-open device if audio params have changed. - ac.vs = ac.ns.VarSum() - } - - sleep: - pause := ac.period*1000 - int(time.Since(start).Seconds()*1000) - if pause > 0 { - time.Sleep(time.Duration(pause) * time.Millisecond) - } - if time.Since(reported).Seconds() >= float64(mp) { - report = true - } - - } -} - // read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success. // Any errors returned are unexpected and should be considered fatal. -func read(rb *ring.Buffer, buf []byte) (int, error) { - chunk, err := rb.Next(rbNextTimeout) +func (ac audioInput) Read(p []byte) (n int, err error) { + chunk, err := ac.rb.Next(rbNextTimeout) switch err { case nil: // Do nothing. @@ -432,7 +272,7 @@ func read(rb *ring.Buffer, buf []byte) (int, error) { return 0, err } - n, err := io.ReadFull(rb, buf[:chunk.Len()]) + n, err = io.ReadFull(ac.rb, p[:chunk.Len()]) if err != nil { log.Log(logger.Error, "Unexpected error from ring.Read", "error", err.Error()) return n, err diff --git a/revid/config.go b/revid/config.go index 6f5a0c8a..a38d9956 100644 --- a/revid/config.go +++ b/revid/config.go @@ -62,6 +62,7 @@ type Config struct { Height uint Width uint FrameRate uint + Rate uint HttpAddress string Quantization uint IntraRefreshPeriod uint @@ -138,6 +139,7 @@ const ( defaultOutput = Http defaultPacketization = Flv defaultFrameRate = 25 + defaultRate = 25 defaultWidth = 1280 defaultHeight = 720 defaultIntraRefreshPeriod = 100 diff --git a/revid/revid.go b/revid/revid.go index 64fad64b..75424779 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -668,10 +668,12 @@ func (r *Revid) setupInputForFile() error { return nil } -// startMic is used to start capturing audio from an audio device and processing it. -func startMic() { +// startAudioInput is used to start capturing audio from an audio device and processing it. +func (r *Revid) startAudioInput() error { - go processFrom(stream, r.config.Rate) + ai := NewAudioInput() + + go r.processFrom(ai, time.Second/time.Duration(r.config.Rate)) return nil }