/* NAME audio-netsender - NetSender client for sending audio to NetReceiver AUTHORS Alan Noble Trek Hopton ACKNOWLEDGEMENTS A special thanks to Joel Jensen for his Go ALSA package. LICENSE audio-netsender is Copyright (C) 2018 the Australian Ocean Lab (AusOcean). It is free software: you can redistribute it and/or modify them under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. It is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with https://bitbucket.org/ausocean/iot/src/master/gpl.txt. If not, see http://www.gnu.org/licenses. */ // Package audio-netsender is a NetSender client for sending audio to // NetReceiver. Audio is captured by means of an ALSA recording // device, specified by the NetReceiver "source" variable. It sent via // HTTP to NetReceiver in raw audio form, i.e., as PCM data, where it // is stored as BinaryData objects. Other NetReceiver variables are // "rate", "period", "channels" and "bits", for specifiying the frame // rate (Hz), audio period (seconds), number of channels and sample // bit size respectively. For a description of NetReceiver see // http://netreceiver.appspot.com/help. package main import ( "errors" "flag" "io" "strconv" "sync" "time" yalsa "github.com/yobert/alsa" "bitbucket.org/ausocean/av/codec/pcm" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/sds" "bitbucket.org/ausocean/iot/pi/smartlogger" "bitbucket.org/ausocean/utils/logging" "bitbucket.org/ausocean/utils/pool" ) const ( progName = "audio-netsender" logPath = "/var/log/netsender" retryPeriod = 5 * time.Second defaultFrameRate = 48000 defaultPeriod = 5 // seconds defaultChannels = 2 defaultBits = 16 rbDuration = 300 // seconds rbTimeout = 100 * time.Millisecond rbNextTimeout = 100 * time.Millisecond ) // audioClient holds everything we need to know about the client. // NB: At 44100 Hz frame rate, 2 channels and 16-bit samples, a period of 5 seconds // results in PCM data chunks of 882000 bytes! A longer period exceeds datastore's 1MB blob limit. type audioClient struct { mu sync.Mutex // mu protects the audioClient. parameters // internals dev *yalsa.Device // audio input device pb pcm.Buffer // Buffer to contain the direct audio from ALSA. buf *pool.Buffer // Ring buffer to contain processed audio ready to be read. ns *netsender.Sender // our NetSender vs int // our "var sum" to track var changes } type parameters struct { mode string // operating mode, either "Normal" or "Paused" source string // name of audio source, or empty for the default source rate int // frame rate in Hz, 44100Hz by default period int // audio period in seconds, 5s by default channels int // number of audio channels, 1 for mono, 2 for stereo bits int // sample bit size, 16 by default } var log logging.Logger func main() { var logLevel int flag.IntVar(&logLevel, "LogLevel", int(logging.Debug), "Specifies log level") flag.Parse() validLogLevel := true if logLevel < int(logging.Debug) || logLevel > int(logging.Fatal) { logLevel = int(logging.Info) validLogLevel = false } logSender := smartlogger.New(logPath) log = logging.New(int8(logLevel), &logSender.LogRoller, true) log.Info("log-netsender: Logger Initialized") if !validLogLevel { log.Error("invalid log level was defaulted to Info") } var ac audioClient var err error ac.ns, err = netsender.New(log, nil, sds.ReadSystem, nil) if err != nil { log.Fatal("netsender.Init failed", "error", err.Error()) } // Get audio params and store the current var sum. vars, err := ac.ns.Vars() if err != nil { log.Warning("netsender.Vars failed; using defaults", "error", err.Error()) } ac.params(vars) ac.vs = ac.ns.VarSum() // Open the requested audio device. err = ac.open() if err != nil { log.Fatal("yalsa.open failed", "error", err.Error()) } // Capture audio in periods of ac.period seconds, and buffer rbDuration seconds in total. ab := ac.dev.NewBufferDuration(time.Second * time.Duration(ac.period)) sf, err := pcm.SFFromString(ab.Format.SampleFormat.String()) if err != nil { log.Error(err.Error()) } cf := pcm.BufferFormat{ SFormat: sf, Channels: uint(ab.Format.Channels), Rate: uint(ab.Format.Rate), } ac.pb = pcm.Buffer{ Format: cf, Data: ab.Data, } cs := pcm.DataSize( uint(ac.parameters.rate), uint(ac.parameters.channels), uint(ac.parameters.bits), float64(ac.parameters.period), ) rbLen := rbDuration / ac.period ac.buf = pool.NewBuffer(int(rbLen), cs, rbTimeout) go ac.input() ac.output() } // params extracts audio params from corresponding NetReceiver vars and returns true if anything has changed. // See audioClient for a description of the params and their limits. func (ac *audioClient) params(vars map[string]string) bool { // We are the only writers to this field // so we don't need to lock here. p := ac.parameters changed := false mode := vars["mode"] if p.mode != mode { p.mode = mode changed = true } source := vars["source"] if p.source != source { p.source = source changed = true } val, err := strconv.Atoi(vars["rate"]) if err != nil { val = defaultFrameRate } if p.rate != val { p.rate = val changed = true } val, err = strconv.Atoi(vars["period"]) if err != nil || val < 1 || 5 < val { val = defaultPeriod } if p.period != val { p.period = val changed = true } val, err = strconv.Atoi(vars["channels"]) if err != nil || (val != 1 && val != 2) { val = defaultChannels } if p.channels != val { p.channels = val changed = true } val, err = strconv.Atoi(vars["bits"]) if err != nil || (val != 16 && val != 32) { val = defaultBits } if p.bits != val { p.bits = val changed = true } if changed { ac.mu.Lock() ac.parameters = p ac.mu.Unlock() log.Debug("params changed") } log.Debug("parameters", "mode", p.mode, "source", p.source, "rate", p.rate, "period", p.period, "channels", p.channels, "bits", p.bits) return changed } // open or re-open the recording device with the given name and prepare it to record. // If name is empty, the first recording device is used. func (ac *audioClient) open() error { if ac.dev != nil { log.Debug("closing", "source", ac.source) ac.dev.Close() ac.dev = nil } log.Debug("opening", "source", ac.source) cards, err := yalsa.OpenCards() if err != nil { return err } defer yalsa.CloseCards(cards) for _, card := range cards { devices, err := card.Devices() if err != nil { return err } for _, dev := range devices { if dev.Type != yalsa.PCM || !dev.Record { continue } if dev.Title == ac.source || ac.source == "" { ac.dev = dev break } } } if ac.dev == nil { return errors.New("No audio source found") } log.Debug("found audio source", "source", ac.dev.Title) // ToDo: time out if Open takes too long. err = ac.dev.Open() if err != nil { return err } log.Debug("opened audio source") _, err = ac.dev.NegotiateChannels(defaultChannels) if err != nil { return err } // Try to negotiate a rate to record in that is divisible by the wanted rate // so that it can be easily downsampled to the wanted rate. // Note: if a card thinks it can record at a rate but can't actually, this can cause a failure. Eg. // the audioinjector is supposed to record at 8000Hz and 16000Hz but it can't due to a firmware issue, // to fix this 8000 and 16000 must be removed from this slice. rates := [8]int{8000, 16000, 32000, 44100, 48000, 88200, 96000, 192000} foundRate := false for r := range rates { if r < ac.rate { continue } if r%ac.rate == 0 { _, err = ac.dev.NegotiateRate(r) if err == nil { foundRate = true log.Debug("sample rate set", "rate", r) break } } } // If no easily divisible rate is found, then use the default rate. if !foundRate { log.Warning("no available device sample-rates are divisible by the requested rate. Default rate will be used. Resampling may fail.", "rateRequested", ac.rate) _, err = ac.dev.NegotiateRate(defaultFrameRate) if err != nil { return err } log.Debug("sample rate set", "rate", defaultFrameRate) } var fmt yalsa.FormatType switch ac.bits { case 16: fmt = yalsa.S16_LE case 32: fmt = yalsa.S32_LE default: return errors.New("unsupported sample bits") } _, err = ac.dev.NegotiateFormat(fmt) if err != nil { return err } // Either 8192 or 16384 bytes is a reasonable ALSA buffer size. _, err = ac.dev.NegotiateBufferSize(8192, 16384) if err != nil { return err } if err = ac.dev.Prepare(); err != nil { return err } log.Debug("successfully negotiated ALSA params") return nil } // input continously records audio and writes it to the ringbuffer. // Re-opens the device and tries again if ASLA returns an error. // Spends a lot of time sleeping in Paused mode. // ToDo: Currently, reading audio and writing to the ringbuffer are synchronous. // Need a way to asynchronously read from the buf, i.e., _while_ it is recording to avoid any gaps. func (ac *audioClient) input() { for { ac.mu.Lock() mode := ac.mode ac.mu.Unlock() if mode == "Paused" { time.Sleep(time.Duration(ac.period) * time.Second) continue } log.Debug("recording audio for period", "seconds", ac.period) ac.mu.Lock() err := ac.dev.Read(ac.pb.Data) ac.mu.Unlock() if err != nil { log.Debug("device.Read failed", "error", err.Error()) ac.mu.Lock() err = ac.open() // re-open if err != nil { log.Fatal("yalsa.open failed", "error", err.Error()) } ac.mu.Unlock() continue } toWrite := ac.formatBuffer() log.Debug("audio format conversion has been performed where needed") var n int n, err = ac.buf.Write(toWrite.Data) switch err { case nil: log.Debug("wrote audio to ringbuffer", "length", n) case pool.ErrDropped: log.Warning("dropped audio") default: log.Error("unexpected ringbuffer error", "error", err.Error()) return } } } // output continously reads audio from the ringbuffer and sends it to NetReceiver via poll requests. // When "B0" is configured as one of the NetReceiver inputs, audio data is posted as "B0". // When "B0" is not an input, the poll request happens without any audio data // (although other inputs may still be present via URL parameters). // When paused, polling continues but without sending audio (B0) data. // Sending is throttled so as to complete one pass of this loop approximately every audio period, // since cycling more frequently is pointless. // Finally while audio data is sent every audio period, other data is reported only every monitor period. // This function also handles NetReceiver configuration requests and updating of NetReceiver vars. func (ac *audioClient) output() { // Calculate the size of the output data based on wanted channels and rate. outLen := (((len(ac.pb.Data) / int(ac.pb.Format.Channels)) * ac.channels) / int(ac.pb.Format.Rate)) * ac.rate buf := make([]byte, outLen) mime := "audio/x-wav;codec=pcm;rate=" + strconv.Itoa(ac.rate) + ";channels=" + strconv.Itoa(ac.channels) + ";bits=" + strconv.Itoa(ac.bits) ip := ac.ns.Param("ip") mp, err := strconv.Atoi(ac.ns.Param("mp")) if err != nil { log.Fatal("mp not an integer") } report := true // Report non-audio data. reported := time.Now() // When we last did so. for { var rc int start := time.Now() audio := false var pins []netsender.Pin if ac.mode == "Paused" { // Only send X data when paused (if any). if report { pins = netsender.MakePins(ip, "X") } } else { n, err := read(ac.buf, buf) if err != nil { return } if n == 0 { goto sleep } if n != len(buf) { log.Error("unexpected length from read", "length", n) return } if report { pins = netsender.MakePins(ip, "") } else { pins = netsender.MakePins(ip, "B") } for i, pin := range pins { if pin.Name == "B0" { audio = true pins[i].Value = n pins[i].Data = buf pins[i].MimeType = mime } } } if !(report || audio) { goto sleep // nothing to do } // Populate X pins, if any. for i, pin := range pins { if pin.Name[0] == 'X' { err := sds.ReadSystem(&pins[i]) if err != nil { log.Warning("sds.ReadSystem failed", "error", err.Error()) // Pin.Value defaults to -1 upon error, so OK to continue. } } } _, rc, err = ac.ns.Send(netsender.RequestPoll, pins) if err != nil { log.Debug("netsender.Send failed", "error", err.Error()) goto sleep } if report { reported = start report = false } if rc == netsender.ResponseUpdate { _, err = ac.ns.Config() if err != nil { log.Warning("netsender.Config failed", "error", err.Error()) goto sleep } ip = ac.ns.Param("ip") mp, err = strconv.Atoi(ac.ns.Param("mp")) if err != nil { log.Fatal("mp not an integer") } } if ac.vs != ac.ns.VarSum() { vars, err := ac.ns.Vars() if err != nil { log.Error("netsender.Vars failed", "error", err.Error()) goto sleep } ac.params(vars) // ToDo: re-open device if audio params have changed. ac.vs = ac.ns.VarSum() } sleep: pause := ac.period*1000 - int(time.Since(start).Seconds()*1000) if pause > 0 { time.Sleep(time.Duration(pause) * time.Millisecond) } if time.Since(reported).Seconds() >= float64(mp) { report = true } } } // read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success. // Any errors returned are unexpected and should be considered fatal. func read(rb *pool.Buffer, buf []byte) (int, error) { chunk, err := rb.Next(rbNextTimeout) switch err { case nil: // Do nothing. case pool.ErrTimeout: return 0, nil case io.EOF: log.Error("unexpected EOF from pool.Next") return 0, io.ErrUnexpectedEOF default: log.Error("unexpected error from pool.Next", "error", err.Error()) return 0, err } n, err := io.ReadFull(rb, buf[:chunk.Len()]) if err != nil { log.Error("unexpected error from pool.Read", "error", err.Error()) return n, err } log.Debug("read audio from ringbuffer", "length", n) return n, nil } // formatBuffer returns a Buffer that has the recording data from the ac's original Buffer but stored // in the desired format specified by the ac's parameters. func (ac *audioClient) formatBuffer() pcm.Buffer { var err error ac.mu.Lock() wantChannels := ac.channels wantRate := ac.rate ac.mu.Unlock() // If nothing needs to be changed, return the original. if int(ac.pb.Format.Channels) == wantChannels && int(ac.pb.Format.Rate) == wantRate { return ac.pb } formatted := pcm.Buffer{Format: ac.pb.Format} bufCopied := false if int(ac.pb.Format.Channels) != wantChannels { // Convert channels. if ac.pb.Format.Channels == 2 && wantChannels == 1 { if formatted, err = pcm.StereoToMono(ac.pb); err != nil { log.Warning("channel conversion failed, audio has remained stereo", "error", err.Error()) } else { formatted.Format.Channels = 1 } bufCopied = true } } if int(ac.pb.Format.Rate) != wantRate { // Convert rate. if bufCopied { formatted, err = pcm.Resample(formatted, uint(wantRate)) } else { formatted, err = pcm.Resample(ac.pb, uint(wantRate)) } if err != nil { log.Warning("rate conversion failed, audio has remained original rate", "error", err.Error()) } else { formatted.Format.Rate = uint(wantRate) } } return formatted }