revid: created an audioInput struct to record audio that acts as a reader for revid

This commit is contained in:
Trek H 2019-04-19 01:20:48 +09:30
parent 8a1f35c0a5
commit 46ca3e2611
4 changed files with 50 additions and 203 deletions

3
go.mod
View File

@ -6,9 +6,12 @@ require (
bitbucket.org/ausocean/iot v1.2.4
bitbucket.org/ausocean/utils v0.0.0-20190408050157-66d3b4d4041e
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/Shopify/toxiproxy v2.1.4+incompatible // indirect
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 // indirect
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884
github.com/mewkiz/flac v1.0.5
github.com/sergi/go-diff v1.0.0 // indirect
github.com/yobert/alsa v0.0.0-20180630182551-d38d89fa843e
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)

View File

@ -3,21 +3,20 @@ package revid
import (
"errors"
"io"
"strconv"
"sync"
"time"
"github.com/yobert/alsa"
"bitbucket.org/ausocean/av/codec/pcm"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/iot/pi/sds"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
const (
defaultRate = 48000
logPath = "/var/log/netsender"
defaultSampRate = 48000
defaultPeriod = 5 // seconds
defaultChannels = 2
defaultBits = 16
@ -26,6 +25,8 @@ const (
rbNextTimeout = 100 * time.Millisecond
)
var log *logger.Logger
// audioInput holds everything we need to know about the audio input stream.
// NB: At 44100 Hz frame rate, 2 channels and 16-bit samples, a period of 5 seconds
// results in PCM data chunks of 882000 bytes! A longer period exceeds datastore's 1MB blob limit.
@ -37,7 +38,7 @@ type audioInput struct {
// internals
dev *alsa.Device // audio input device
ab alsa.Buffer // ALSA's buffer
rb *ring.Buffer // our buffer //TODO: change this to output stream, doesn't have to be ring buffer
rb *ring.Buffer // our buffer
vs int // our "var sum" to track var changes
}
@ -50,19 +51,29 @@ type parameters struct {
bits int // sample bit size, 16 by default
}
func NewAudioInput() {
// NewAudioInput starts recording audio and returns an io.Reader from which the recorded audio can be read.
func NewAudioInput() io.Reader {
logLevel := int(logger.Debug)
validLogLevel := true
if logLevel < int(logger.Debug) || logLevel > int(logger.Fatal) {
logLevel = int(logger.Info)
validLogLevel = false
}
logSender := smartlogger.New(logPath)
log = logger.New(int8(logLevel), &logSender.LogRoller)
log.Log(logger.Info, "log-netsender: Logger Initialized")
if !validLogLevel {
log.Log(logger.Error, "Invalid log level was defaulted to Info")
}
var ac audioInput
// Get audio params and store the current var sum.
vars, err := ac.ns.Vars()
if err != nil {
log.Log(logger.Warning, "netsender.Vars failed; using defaults", "error", err.Error())
}
ac.params(vars)
ac.vs = ac.ns.VarSum()
ac.setParams()
// Open the requested audio device.
err = ac.open()
err := ac.open()
if err != nil {
log.Log(logger.Fatal, "alsa.open failed", "error", err.Error())
}
@ -75,70 +86,20 @@ func NewAudioInput() {
go ac.input()
ac.output()
return stream
return ac
}
// params extracts audio params from corresponding NetReceiver vars and returns true if anything has changed.
// See audioInput for a description of the params and their limits.
func (ac *audioInput) params(vars map[string]string) bool {
// We are the only writers to this field
// so we don't need to lock here.
func (ac *audioInput) setParams() {
p := ac.parameters
changed := false
mode := vars["mode"]
if p.mode != mode {
p.mode = mode
changed = true
}
source := vars["source"]
if p.source != source {
p.source = source
changed = true
}
val, err := strconv.Atoi(vars["rate"])
if err != nil {
val = defaultRate
}
if p.rate != val {
p.rate = val
changed = true
}
val, err = strconv.Atoi(vars["period"])
if err != nil || val < 1 || 5 < val {
val = defaultPeriod
}
if p.period != val {
p.period = val
changed = true
}
val, err = strconv.Atoi(vars["channels"])
if err != nil || (val != 1 && val != 2) {
val = defaultChannels
}
if p.channels != val {
p.channels = val
changed = true
}
val, err = strconv.Atoi(vars["bits"])
if err != nil || (val != 16 && val != 32) {
val = defaultBits
}
if p.bits != val {
p.bits = val
changed = true
}
p.rate = defaultSampRate
p.period = defaultPeriod
p.channels = defaultChannels
p.bits = defaultBits
if changed {
ac.mu.Lock()
ac.parameters = p
ac.mu.Unlock()
log.Log(logger.Debug, "Params changed")
}
log.Log(logger.Debug, "Parameters", "mode", p.mode, "source", p.source, "rate", p.rate, "period", p.period, "channels", p.channels, "bits", p.bits)
return changed
}
// open or re-open the recording device with the given name and prepare it to record.
@ -212,12 +173,12 @@ func (ac *audioInput) open() error {
// If no easily divisible rate is found, then use the default rate.
if !foundRate {
log.Log(logger.Warning, "No available device sample-rates are divisible by the requested rate. Default rate will be used. Resampling may fail.", "rateRequested", ac.rate)
_, err = ac.dev.NegotiateRate(defaultRate)
log.Log(logger.Warning, "Unable to sample at requested rate, default used.", "rateRequested", ac.rate)
_, err = ac.dev.NegotiateRate(defaultSampRate)
if err != nil {
return err
}
log.Log(logger.Debug, "Sample rate set", "rate", defaultRate)
log.Log(logger.Debug, "Sample rate set", "rate", defaultSampRate)
}
var fmt alsa.FormatType
@ -294,131 +255,10 @@ func (ac *audioInput) input() {
}
}
// output continuously reads audio from the ring buffer and sends it to NetReceiver via poll requests.
// When "B0" is configured as one of the NetReceiver inputs, audio data is posted as "B0".
// When "B0" is not an input, the poll request happens without any audio data
// (although other inputs may still be present via URL parameters).
// When paused, polling continues but without sending audio (B0) data.
// Sending is throttled so as to complete one pass of this loop approximately every audio period,
// since cycling more frequently is pointless.
// Finally, while audio data is sent every audio period, other data is reported only every monitor period.
// This function also handles NetReceiver configuration requests and updating of NetReceiver vars.
//
// The loop has no break: output returns only when reading from the ring buffer
// fails or yields a chunk whose length differs from the expected output length.
// NOTE(review): the logger.Fatal calls below presumably terminate the process - confirm.
func (ac *audioInput) output() {
// Calculate the size of the output data based on wanted channels and rate.
// NB: this is integer arithmetic and the division by the device rate happens
// before the multiplication by the wanted rate, so the quotient is truncated first.
outLen := (((len(ac.ab.Data) / ac.ab.Format.Channels) * ac.channels) / ac.ab.Format.Rate) * ac.rate
// buf is allocated once and reused for every pass of the loop.
buf := make([]byte, outLen)
// MIME type describing the PCM data attached to the B0 pin.
mime := "audio/x-wav;codec=pcm;rate=" + strconv.Itoa(ac.rate) + ";channels=" + strconv.Itoa(ac.channels) + ";bits=" + strconv.Itoa(ac.bits)
// ip is passed to netsender.MakePins; mp is compared below against elapsed seconds
// to decide when non-audio data is due to be reported again.
ip := ac.ns.Param("ip")
mp, err := strconv.Atoi(ac.ns.Param("mp"))
if err != nil {
log.Log(logger.Fatal, "mp not an integer")
}
report := true // Report non-audio data.
reported := time.Now() // When we last did so.
for {
var reconfig bool
start := time.Now() // Start of this pass; used for throttling and as the report timestamp.
audio := false // Set once audio has been attached to a B0 pin.
var pins []netsender.Pin
if ac.mode == "Paused" {
// Only send X data when paused (if any).
if report {
pins = netsender.MakePins(ip, "X")
}
} else {
n, err := read(ac.rb, buf)
if err != nil {
return
}
// Nothing was read this pass, so skip sending and go straight to the throttle.
if n == 0 {
goto sleep
}
if n != len(buf) {
log.Log(logger.Error, "Unexpected length from read", "length", n)
return
}
// NOTE(review): the second argument to MakePins presumably selects pins by
// name prefix ("" for all, "B" for binary pins only) - confirm against netsender.
if report {
pins = netsender.MakePins(ip, "")
} else {
pins = netsender.MakePins(ip, "B")
}
// Attach the audio chunk to the B0 pin, if it is configured.
for i, pin := range pins {
if pin.Name == "B0" {
audio = true
pins[i].Value = n
pins[i].Data = buf
pins[i].MimeType = mime
}
}
}
if !(report || audio) {
goto sleep // nothing to do
}
// Populate X pins, if any.
for i, pin := range pins {
if pin.Name[0] == 'X' {
err := sds.ReadSystem(&pins[i])
if err != nil {
log.Log(logger.Warning, "sds.ReadSystem failed", "error", err.Error())
// Pin.Value defaults to -1 upon error, so OK to continue.
}
}
}
// Send the poll request; reconfig reports whether a reconfiguration is required.
_, reconfig, err = ac.ns.Send(netsender.RequestPoll, pins)
if err != nil {
log.Log(logger.Debug, "netsender.Send failed", "error", err.Error())
goto sleep
}
// A successful send while reporting resets the report timer to the start of this pass.
if report {
reported = start
report = false
}
// Reconfigure and re-read the parameters that were cached before the loop.
if reconfig {
err = ac.ns.Config()
if err != nil {
log.Log(logger.Warning, "netsender.Config failed", "error", err.Error())
goto sleep
}
ip = ac.ns.Param("ip")
mp, err = strconv.Atoi(ac.ns.Param("mp"))
if err != nil {
log.Log(logger.Fatal, "mp not an integer")
}
}
// A differing var sum means the vars have changed since we last read them.
if ac.vs != ac.ns.VarSum() {
vars, err := ac.ns.Vars()
if err != nil {
log.Log(logger.Error, "netsender.Vars failed", "error", err.Error())
goto sleep
}
ac.params(vars) // ToDo: re-open device if audio params have changed.
ac.vs = ac.ns.VarSum()
}
sleep:
// Throttle: sleep for whatever remains of the audio period (seconds, converted to ms).
pause := ac.period*1000 - int(time.Since(start).Seconds()*1000)
if pause > 0 {
time.Sleep(time.Duration(pause) * time.Millisecond)
}
// Re-enable reporting of non-audio data once mp seconds have elapsed since the last report.
if time.Since(reported).Seconds() >= float64(mp) {
report = true
}
}
}
// read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success.
// Any errors returned are unexpected and should be considered fatal.
func read(rb *ring.Buffer, buf []byte) (int, error) {
chunk, err := rb.Next(rbNextTimeout)
func (ac audioInput) Read(p []byte) (n int, err error) {
chunk, err := ac.rb.Next(rbNextTimeout)
switch err {
case nil:
// Do nothing.
@ -432,7 +272,7 @@ func read(rb *ring.Buffer, buf []byte) (int, error) {
return 0, err
}
n, err := io.ReadFull(rb, buf[:chunk.Len()])
n, err = io.ReadFull(ac.rb, p[:chunk.Len()])
if err != nil {
log.Log(logger.Error, "Unexpected error from ring.Read", "error", err.Error())
return n, err

View File

@ -62,6 +62,7 @@ type Config struct {
Height uint
Width uint
FrameRate uint
Rate uint
HttpAddress string
Quantization uint
IntraRefreshPeriod uint
@ -138,6 +139,7 @@ const (
defaultOutput = Http
defaultPacketization = Flv
defaultFrameRate = 25
defaultRate = 25
defaultWidth = 1280
defaultHeight = 720
defaultIntraRefreshPeriod = 100

View File

@ -668,10 +668,12 @@ func (r *Revid) setupInputForFile() error {
return nil
}
// startMic is used to start capturing audio from an audio device and processing it.
func startMic() {
// startAudioInput is used to start capturing audio from an audio device and processing it.
func (r *Revid) startAudioInput() error {
go processFrom(stream, r.config.Rate)
ai := NewAudioInput()
go r.processFrom(ai, time.Second/time.Duration(r.config.Rate))
return nil
}