2019-09-14 12:53:44 +03:00
/*
NAME
  audio-netsender - NetSender client for sending audio to NetReceiver

AUTHORS
  Alan Noble <alan@ausocean.org>
  Trek Hopton <trek@ausocean.org>

ACKNOWLEDGEMENTS
  A special thanks to Joel Jensen for his Go ALSA package.

LICENSE
  audio-netsender is Copyright (C) 2018 the Australian Ocean Lab (AusOcean).

  It is free software: you can redistribute it and/or modify it under
  the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
  for more details.

  You should have received a copy of the GNU General Public License
  along with https://bitbucket.org/ausocean/iot/src/master/gpl.txt.
  If not, see http://www.gnu.org/licenses.
*/
// audio-netsender is a NetSender client for sending audio to
// NetReceiver. Audio is captured by means of an ALSA recording
// device, specified by the NetReceiver "source" variable. It is sent via
// HTTP to NetReceiver in raw audio form, i.e., as PCM data, where it
// is stored as BinaryData objects. Other NetReceiver variables are
// "rate", "period", "channels" and "bits", for specifying the frame
// rate (Hz), audio period (seconds), number of channels and sample
// bit size respectively. For a description of NetReceiver see
// http://netreceiver.appspot.com/help.
package main
import (
"errors"
"flag"
"io"
"strconv"
"sync"
"time"
2019-11-12 13:54:08 +03:00
yalsa "github.com/yobert/alsa"
2019-09-14 12:53:44 +03:00
"bitbucket.org/ausocean/av/codec/pcm"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/iot/pi/sds"
"bitbucket.org/ausocean/iot/pi/smartlogger"
"bitbucket.org/ausocean/utils/logger"
"bitbucket.org/ausocean/utils/ring"
)
// Program-wide constants: logging configuration, the defaults that params
// falls back to when a NetReceiver var is missing or invalid, and the ring
// buffer sizing/timeouts.
const (
	progName         = "audio-netsender"
	logPath          = "/var/log/netsender" // passed to smartlogger.New in main
	retryPeriod      = 5 * time.Second
	defaultFrameRate = 48000 // Hz; also the fallback rate negotiated by open
	defaultPeriod    = 5     // seconds
	defaultChannels  = 2     // also the channel count open negotiates with the device
	defaultBits      = 16
	rbDuration       = 300                    // seconds; main sizes the ring buffer as rbDuration / period chunks
	rbTimeout        = 100 * time.Millisecond // timeout passed to ring.NewBuffer
	rbNextTimeout    = 100 * time.Millisecond // timeout passed to ring.Buffer.Next by read
)
// audioClient holds everything we need to know about the client.
// NB: At 44100 Hz frame rate, 2 channels and 16-bit samples, a period of 5 seconds
// results in PCM data chunks of 882000 bytes! A longer period exceeds datastore's 1MB blob limit.
type audioClient struct {
mu sync . Mutex // mu protects the audioClient.
parameters
// internals
2019-11-13 09:24:41 +03:00
dev * yalsa . Device // audio input device
2019-11-13 09:41:35 +03:00
pb pcm . Buffer // Buffer to contain the direct audio from ALSA.
rb * ring . Buffer // Ring buffer to contain processed audio ready to be read.
2019-11-13 09:24:41 +03:00
ns * netsender . Sender // our NetSender
vs int // our "var sum" to track var changes
2019-09-14 12:53:44 +03:00
}
// parameters holds the audio settings that params extracts from the
// NetReceiver vars of the same names.
type parameters struct {
	mode     string // operating mode, either "Normal" or "Paused"
	source   string // name of audio source, or empty for the default source
	rate     int    // frame rate in Hz, 48000Hz (defaultFrameRate) by default
	period   int    // audio period in seconds, 5s by default; params only accepts 1 to 5
	channels int    // number of audio channels, 1 for mono, 2 for stereo
	bits     int    // sample bit size, 16 by default; params only accepts 16 or 32
}
// log is the package-level logger used by every function in this file.
// It is nil until main assigns it.
var log *logger.Logger
func main ( ) {
var logLevel int
flag . IntVar ( & logLevel , "LogLevel" , int ( logger . Debug ) , "Specifies log level" )
flag . Parse ( )
validLogLevel := true
if logLevel < int ( logger . Debug ) || logLevel > int ( logger . Fatal ) {
logLevel = int ( logger . Info )
validLogLevel = false
}
logSender := smartlogger . New ( logPath )
2019-10-23 05:29:12 +03:00
log = logger . New ( int8 ( logLevel ) , & logSender . LogRoller , true )
2019-09-14 12:53:44 +03:00
log . Log ( logger . Info , "log-netsender: Logger Initialized" )
if ! validLogLevel {
log . Log ( logger . Error , "invalid log level was defaulted to Info" )
}
var ac audioClient
var err error
ac . ns , err = netsender . New ( log , nil , sds . ReadSystem , nil )
if err != nil {
log . Log ( logger . Fatal , "netsender.Init failed" , "error" , err . Error ( ) )
}
// Get audio params and store the current var sum.
vars , err := ac . ns . Vars ( )
if err != nil {
log . Log ( logger . Warning , "netsender.Vars failed; using defaults" , "error" , err . Error ( ) )
}
ac . params ( vars )
ac . vs = ac . ns . VarSum ( )
// Open the requested audio device.
err = ac . open ( )
if err != nil {
2019-11-12 13:54:08 +03:00
log . Log ( logger . Fatal , "yalsa.open failed" , "error" , err . Error ( ) )
2019-09-14 12:53:44 +03:00
}
// Capture audio in periods of ac.period seconds, and buffer rbDuration seconds in total.
2019-11-12 13:54:08 +03:00
ab := ac . dev . NewBufferDuration ( time . Second * time . Duration ( ac . period ) )
sf , err := pcm . SFFromString ( ab . Format . SampleFormat . String ( ) )
if err != nil {
log . Log ( logger . Error , err . Error ( ) )
}
2019-11-13 09:24:41 +03:00
cf := pcm . BufferFormat {
2019-11-12 13:54:08 +03:00
SFormat : sf ,
Channels : ab . Format . Channels ,
Rate : ab . Format . Rate ,
}
2019-11-13 09:41:35 +03:00
ac . pb = pcm . Buffer {
2019-11-12 13:54:08 +03:00
Format : cf ,
Data : ab . Data ,
}
2019-11-13 12:26:13 +03:00
cs := pcm . DataSize ( ac . parameters . rate , ac . parameters . channels , ac . parameters . bits , float64 ( ac . parameters . period ) , 0 )
2019-09-14 12:53:44 +03:00
rbLen := rbDuration / ac . period
2019-11-13 12:26:13 +03:00
ac . rb = ring . NewBuffer ( rbLen , cs , rbTimeout )
2019-09-14 12:53:44 +03:00
go ac . input ( )
ac . output ( )
}
// params extracts audio params from corresponding NetReceiver vars and returns true if anything has changed.
// See audioClient for a description of the params and their limits.
//
// Missing or invalid values fall back to defaults: "rate" must be a positive
// integer, "period" must be 1 to 5, "channels" must be 1 or 2 and "bits" must
// be 16 or 32. "mode" and "source" are taken verbatim.
func (ac *audioClient) params(vars map[string]string) bool {
	// We are the only writers to this field
	// so we don't need to lock here.
	p := ac.parameters
	changed := false

	if mode := vars["mode"]; p.mode != mode {
		p.mode = mode
		changed = true
	}

	if source := vars["source"]; p.source != source {
		p.source = source
		changed = true
	}

	// A non-positive rate is meaningless and is later used as a divisor
	// (open computes r % ac.rate), so treat it like an unparsable value.
	val, err := strconv.Atoi(vars["rate"])
	if err != nil || val <= 0 {
		val = defaultFrameRate
	}
	if p.rate != val {
		p.rate = val
		changed = true
	}

	val, err = strconv.Atoi(vars["period"])
	if err != nil || val < 1 || 5 < val {
		val = defaultPeriod
	}
	if p.period != val {
		p.period = val
		changed = true
	}

	val, err = strconv.Atoi(vars["channels"])
	if err != nil || (val != 1 && val != 2) {
		val = defaultChannels
	}
	if p.channels != val {
		p.channels = val
		changed = true
	}

	val, err = strconv.Atoi(vars["bits"])
	if err != nil || (val != 16 && val != 32) {
		val = defaultBits
	}
	if p.bits != val {
		p.bits = val
		changed = true
	}

	// Publish the new values under the lock so readers in other goroutines
	// that take ac.mu see a consistent set.
	if changed {
		ac.mu.Lock()
		ac.parameters = p
		ac.mu.Unlock()
		log.Log(logger.Debug, "params changed")
	}
	log.Log(logger.Debug, "parameters", "mode", p.mode, "source", p.source, "rate", p.rate, "period", p.period, "channels", p.channels, "bits", p.bits)
	return changed
}
// open or re-open the recording device with the given name and prepare it to record.
// If name is empty, the first recording device is used.
func ( ac * audioClient ) open ( ) error {
if ac . dev != nil {
log . Log ( logger . Debug , "closing" , "source" , ac . source )
ac . dev . Close ( )
ac . dev = nil
}
log . Log ( logger . Debug , "opening" , "source" , ac . source )
2019-11-12 13:54:08 +03:00
cards , err := yalsa . OpenCards ( )
2019-09-14 12:53:44 +03:00
if err != nil {
return err
}
2019-11-12 13:54:08 +03:00
defer yalsa . CloseCards ( cards )
2019-09-14 12:53:44 +03:00
for _ , card := range cards {
devices , err := card . Devices ( )
if err != nil {
return err
}
for _ , dev := range devices {
2019-11-12 13:54:08 +03:00
if dev . Type != yalsa . PCM || ! dev . Record {
2019-09-14 12:53:44 +03:00
continue
}
if dev . Title == ac . source || ac . source == "" {
ac . dev = dev
break
}
}
}
if ac . dev == nil {
return errors . New ( "No audio source found" )
}
log . Log ( logger . Debug , "found audio source" , "source" , ac . dev . Title )
// ToDo: time out if Open takes too long.
err = ac . dev . Open ( )
if err != nil {
return err
}
log . Log ( logger . Debug , "opened audio source" )
_ , err = ac . dev . NegotiateChannels ( defaultChannels )
if err != nil {
return err
}
// Try to negotiate a rate to record in that is divisible by the wanted rate
// so that it can be easily downsampled to the wanted rate.
// Note: if a card thinks it can record at a rate but can't actually, this can cause a failure. Eg.
// the audioinjector is supposed to record at 8000Hz and 16000Hz but it can't due to a firmware issue,
// to fix this 8000 and 16000 must be removed from this slice.
rates := [ 8 ] int { 8000 , 16000 , 32000 , 44100 , 48000 , 88200 , 96000 , 192000 }
foundRate := false
for r := range rates {
if r < ac . rate {
continue
}
if r % ac . rate == 0 {
_ , err = ac . dev . NegotiateRate ( r )
if err == nil {
foundRate = true
log . Log ( logger . Debug , "sample rate set" , "rate" , r )
break
}
}
}
// If no easily divisible rate is found, then use the default rate.
if ! foundRate {
log . Log ( logger . Warning , "no available device sample-rates are divisible by the requested rate. Default rate will be used. Resampling may fail." , "rateRequested" , ac . rate )
_ , err = ac . dev . NegotiateRate ( defaultFrameRate )
if err != nil {
return err
}
log . Log ( logger . Debug , "sample rate set" , "rate" , defaultFrameRate )
}
2019-11-12 13:54:08 +03:00
var fmt yalsa . FormatType
2019-09-14 12:53:44 +03:00
switch ac . bits {
case 16 :
2019-11-12 13:54:08 +03:00
fmt = yalsa . S16_LE
2019-09-14 12:53:44 +03:00
case 32 :
2019-11-12 13:54:08 +03:00
fmt = yalsa . S32_LE
2019-09-14 12:53:44 +03:00
default :
return errors . New ( "unsupported sample bits" )
}
_ , err = ac . dev . NegotiateFormat ( fmt )
if err != nil {
return err
}
// Either 8192 or 16384 bytes is a reasonable ALSA buffer size.
_ , err = ac . dev . NegotiateBufferSize ( 8192 , 16384 )
if err != nil {
return err
}
if err = ac . dev . Prepare ( ) ; err != nil {
return err
}
log . Log ( logger . Debug , "successfully negotiated ALSA params" )
return nil
}
// input continously records audio and writes it to the ringbuffer.
// Re-opens the device and tries again if ASLA returns an error.
// Spends a lot of time sleeping in Paused mode.
// ToDo: Currently, reading audio and writing to the ringbuffer are synchronous.
2019-11-13 09:24:41 +03:00
// Need a way to asynchronously read from the buf, i.e., _while_ it is recording to avoid any gaps.
2019-09-14 12:53:44 +03:00
func ( ac * audioClient ) input ( ) {
for {
ac . mu . Lock ( )
mode := ac . mode
ac . mu . Unlock ( )
if mode == "Paused" {
time . Sleep ( time . Duration ( ac . period ) * time . Second )
continue
}
log . Log ( logger . Debug , "recording audio for period" , "seconds" , ac . period )
ac . mu . Lock ( )
2019-11-13 09:41:35 +03:00
err := ac . dev . Read ( ac . pb . Data )
2019-09-14 12:53:44 +03:00
ac . mu . Unlock ( )
if err != nil {
log . Log ( logger . Debug , "device.Read failed" , "error" , err . Error ( ) )
ac . mu . Lock ( )
err = ac . open ( ) // re-open
if err != nil {
2019-11-12 13:54:08 +03:00
log . Log ( logger . Fatal , "yalsa.open failed" , "error" , err . Error ( ) )
2019-09-14 12:53:44 +03:00
}
ac . mu . Unlock ( )
continue
}
toWrite := ac . formatBuffer ( )
log . Log ( logger . Debug , "audio format conversion has been performed where needed" )
var n int
n , err = ac . rb . Write ( toWrite . Data )
switch err {
case nil :
log . Log ( logger . Debug , "wrote audio to ringbuffer" , "length" , n )
case ring . ErrDropped :
log . Log ( logger . Warning , "dropped audio" )
default :
log . Log ( logger . Error , "unexpected ringbuffer error" , "error" , err . Error ( ) )
return
}
}
}
// output continously reads audio from the ringbuffer and sends it to NetReceiver via poll requests.
// When "B0" is configured as one of the NetReceiver inputs, audio data is posted as "B0".
// When "B0" is not an input, the poll request happens without any audio data
// (although other inputs may still be present via URL parameters).
// When paused, polling continues but without sending audio (B0) data.
// Sending is throttled so as to complete one pass of this loop approximately every audio period,
// since cycling more frequently is pointless.
// Finally while audio data is sent every audio period, other data is reported only every monitor period.
// This function also handles NetReceiver configuration requests and updating of NetReceiver vars.
func ( ac * audioClient ) output ( ) {
// Calculate the size of the output data based on wanted channels and rate.
2019-11-13 09:41:35 +03:00
outLen := ( ( ( len ( ac . pb . Data ) / ac . pb . Format . Channels ) * ac . channels ) / ac . pb . Format . Rate ) * ac . rate
2019-09-14 12:53:44 +03:00
buf := make ( [ ] byte , outLen )
mime := "audio/x-wav;codec=pcm;rate=" + strconv . Itoa ( ac . rate ) + ";channels=" + strconv . Itoa ( ac . channels ) + ";bits=" + strconv . Itoa ( ac . bits )
ip := ac . ns . Param ( "ip" )
mp , err := strconv . Atoi ( ac . ns . Param ( "mp" ) )
if err != nil {
log . Log ( logger . Fatal , "mp not an integer" )
}
report := true // Report non-audio data.
reported := time . Now ( ) // When we last did so.
for {
2019-09-14 15:35:51 +03:00
var rc int
2019-09-14 12:53:44 +03:00
start := time . Now ( )
audio := false
var pins [ ] netsender . Pin
if ac . mode == "Paused" {
// Only send X data when paused (if any).
if report {
pins = netsender . MakePins ( ip , "X" )
}
} else {
n , err := read ( ac . rb , buf )
if err != nil {
return
}
if n == 0 {
goto sleep
}
if n != len ( buf ) {
log . Log ( logger . Error , "unexpected length from read" , "length" , n )
return
}
if report {
pins = netsender . MakePins ( ip , "" )
} else {
pins = netsender . MakePins ( ip , "B" )
}
for i , pin := range pins {
if pin . Name == "B0" {
audio = true
pins [ i ] . Value = n
pins [ i ] . Data = buf
pins [ i ] . MimeType = mime
}
}
}
if ! ( report || audio ) {
goto sleep // nothing to do
}
// Populate X pins, if any.
for i , pin := range pins {
if pin . Name [ 0 ] == 'X' {
err := sds . ReadSystem ( & pins [ i ] )
if err != nil {
log . Log ( logger . Warning , "sds.ReadSystem failed" , "error" , err . Error ( ) )
// Pin.Value defaults to -1 upon error, so OK to continue.
}
}
}
_ , rc , err = ac . ns . Send ( netsender . RequestPoll , pins )
if err != nil {
log . Log ( logger . Debug , "netsender.Send failed" , "error" , err . Error ( ) )
goto sleep
}
if report {
reported = start
report = false
}
2019-09-14 15:35:51 +03:00
if rc == netsender . ResponseUpdate {
_ , err = ac . ns . Config ( )
2019-09-14 12:53:44 +03:00
if err != nil {
log . Log ( logger . Warning , "netsender.Config failed" , "error" , err . Error ( ) )
goto sleep
}
ip = ac . ns . Param ( "ip" )
mp , err = strconv . Atoi ( ac . ns . Param ( "mp" ) )
if err != nil {
log . Log ( logger . Fatal , "mp not an integer" )
}
}
if ac . vs != ac . ns . VarSum ( ) {
vars , err := ac . ns . Vars ( )
if err != nil {
log . Log ( logger . Error , "netsender.Vars failed" , "error" , err . Error ( ) )
goto sleep
}
ac . params ( vars ) // ToDo: re-open device if audio params have changed.
ac . vs = ac . ns . VarSum ( )
}
sleep :
pause := ac . period * 1000 - int ( time . Since ( start ) . Seconds ( ) * 1000 )
if pause > 0 {
time . Sleep ( time . Duration ( pause ) * time . Millisecond )
}
if time . Since ( reported ) . Seconds ( ) >= float64 ( mp ) {
report = true
}
}
}
// read reads a full PCM chunk from the ringbuffer into buf, returning the number of bytes read upon success.
// A timeout waiting for the next chunk is not an error; it returns 0, nil.
// Any errors returned are unexpected and should be considered fatal.
// A chunk that does not fit in buf is reported as an error.
func read(rb *ring.Buffer, buf []byte) (int, error) {
	chunk, err := rb.Next(rbNextTimeout)
	switch err {
	case nil:
		// Do nothing.
	case ring.ErrTimeout:
		return 0, nil
	case io.EOF:
		log.Log(logger.Error, "unexpected EOF from ring.Next")
		return 0, io.ErrUnexpectedEOF
	default:
		log.Log(logger.Error, "unexpected error from ring.Next", "error", err.Error())
		return 0, err
	}

	// Slicing buf beyond its length would panic, so reject oversized chunks.
	if chunk.Len() > len(buf) {
		log.Log(logger.Error, "ring chunk larger than read buffer", "chunk", chunk.Len(), "buffer", len(buf))
		return 0, errors.New("ring chunk larger than read buffer")
	}

	n, err := io.ReadFull(rb, buf[:chunk.Len()])
	if err != nil {
		log.Log(logger.Error, "unexpected error from ring.Read", "error", err.Error())
		return n, err
	}
	log.Log(logger.Debug, "read audio from ringbuffer", "length", n)
	return n, nil
}
2019-11-13 09:24:41 +03:00
// formatBuffer returns a Buffer that has the recording data from the ac's original Buffer but stored
2019-09-14 12:53:44 +03:00
// in the desired format specified by the ac's parameters.
2019-11-13 09:24:41 +03:00
func ( ac * audioClient ) formatBuffer ( ) pcm . Buffer {
2019-09-14 12:53:44 +03:00
var err error
ac . mu . Lock ( )
wantChannels := ac . channels
wantRate := ac . rate
ac . mu . Unlock ( )
// If nothing needs to be changed, return the original.
2019-11-13 09:41:35 +03:00
if ac . pb . Format . Channels == wantChannels && ac . pb . Format . Rate == wantRate {
return ac . pb
2019-09-14 12:53:44 +03:00
}
2019-11-13 09:41:35 +03:00
formatted := pcm . Buffer { Format : ac . pb . Format }
2019-09-14 12:53:44 +03:00
bufCopied := false
2019-11-13 09:41:35 +03:00
if ac . pb . Format . Channels != wantChannels {
2019-09-14 12:53:44 +03:00
// Convert channels.
2019-11-13 09:41:35 +03:00
if ac . pb . Format . Channels == 2 && wantChannels == 1 {
if formatted , err = pcm . StereoToMono ( ac . pb ) ; err != nil {
2019-09-14 12:53:44 +03:00
log . Log ( logger . Warning , "channel conversion failed, audio has remained stereo" , "error" , err . Error ( ) )
} else {
formatted . Format . Channels = 1
}
bufCopied = true
}
}
2019-11-13 09:41:35 +03:00
if ac . pb . Format . Rate != wantRate {
2019-09-14 12:53:44 +03:00
// Convert rate.
if bufCopied {
2019-09-14 12:54:47 +03:00
formatted , err = pcm . Resample ( formatted , wantRate )
2019-09-14 12:53:44 +03:00
} else {
2019-11-13 09:41:35 +03:00
formatted , err = pcm . Resample ( ac . pb , wantRate )
2019-09-14 12:53:44 +03:00
}
if err != nil {
log . Log ( logger . Warning , "rate conversion failed, audio has remained original rate" , "error" , err . Error ( ) )
} else {
formatted . Format . Rate = wantRate
}
}
return formatted
}