package revid import ( "errors" "io" "strconv" "sync" "time" "github.com/yobert/alsa" "bitbucket.org/ausocean/av/codec/pcm" "bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/sds" "bitbucket.org/ausocean/utils/logger" "bitbucket.org/ausocean/utils/ring" ) const ( defaultRate = 48000 defaultPeriod = 5 // seconds defaultChannels = 2 defaultBits = 16 rbDuration = 300 // seconds rbTimeout = 100 * time.Millisecond rbNextTimeout = 100 * time.Millisecond ) // audioInput holds everything we need to know about the audio input stream. // NB: At 44100 Hz frame rate, 2 channels and 16-bit samples, a period of 5 seconds // results in PCM data chunks of 882000 bytes! A longer period exceeds datastore's 1MB blob limit. type audioInput struct { mu sync.Mutex // mu protects the audioInput. parameters // internals dev *alsa.Device // audio input device ab alsa.Buffer // ALSA's buffer rb *ring.Buffer // our buffer //TODO: change this to output stream, doesn't have to be ring buffer vs int // our "var sum" to track var changes } type parameters struct { mode string // operating mode, either "Normal" or "Paused" source string // name of audio source, or empty for the default source rate int // frame rate in Hz, 44100Hz by default period int // audio period in seconds, 5s by default channels int // number of audio channels, 1 for mono, 2 for stereo bits int // sample bit size, 16 by default } func NewAudioInput() { var ac audioInput // Get audio params and store the current var sum. vars, err := ac.ns.Vars() if err != nil { log.Log(logger.Warning, "netsender.Vars failed; using defaults", "error", err.Error()) } ac.params(vars) ac.vs = ac.ns.VarSum() // Open the requested audio device. err = ac.open() if err != nil { log.Log(logger.Fatal, "alsa.open failed", "error", err.Error()) } // Capture audio in periods of ac.period seconds, and buffer rbDuration seconds in total. ac.ab = ac.dev.NewBufferDuration(time.Second * time.Duration(ac.period)) recSize := (((len(ac.ab.Data) / ac.dev.BufferFormat().Channels) * ac.channels) / ac.dev.BufferFormat().Rate) * ac.rate rbLen := rbDuration / ac.period ac.rb = ring.NewBuffer(rbLen, recSize, rbTimeout) go ac.input() ac.output() return stream } // params extracts audio params from corresponding NetReceiver vars and returns true if anything has changed. // See audioInput for a description of the params and their limits. func (ac *audioInput) params(vars map[string]string) bool { // We are the only writers to this field // so we don't need to lock here. p := ac.parameters changed := false mode := vars["mode"] if p.mode != mode { p.mode = mode changed = true } source := vars["source"] if p.source != source { p.source = source changed = true } val, err := strconv.Atoi(vars["rate"]) if err != nil { val = defaultRate } if p.rate != val { p.rate = val changed = true } val, err = strconv.Atoi(vars["period"]) if err != nil || val < 1 || 5 < val { val = defaultPeriod } if p.period != val { p.period = val changed = true } val, err = strconv.Atoi(vars["channels"]) if err != nil || (val != 1 && val != 2) { val = defaultChannels } if p.channels != val { p.channels = val changed = true } val, err = strconv.Atoi(vars["bits"]) if err != nil || (val != 16 && val != 32) { val = defaultBits } if p.bits != val { p.bits = val changed = true } if changed { ac.mu.Lock() ac.parameters = p ac.mu.Unlock() log.Log(logger.Debug, "Params changed") } log.Log(logger.Debug, "Parameters", "mode", p.mode, "source", p.source, "rate", p.rate, "period", p.period, "channels", p.channels, "bits", p.bits) return changed } // open or re-open the recording device with the given name and prepare it to record. // If name is empty, the first recording device is used. func (ac *audioInput) open() error { if ac.dev != nil { log.Log(logger.Debug, "Closing", "source", ac.source) ac.dev.Close() ac.dev = nil } log.Log(logger.Debug, "Opening", "source", ac.source) cards, err := alsa.OpenCards() if err != nil { return err } defer alsa.CloseCards(cards) for _, card := range cards { devices, err := card.Devices() if err != nil { return err } for _, dev := range devices { if dev.Type != alsa.PCM || !dev.Record { continue } if dev.Title == ac.source || ac.source == "" { ac.dev = dev break } } } if ac.dev == nil { return errors.New("No audio source found") } log.Log(logger.Debug, "Found audio source", "source", ac.dev.Title) // ToDo: time out if Open takes too long. err = ac.dev.Open() if err != nil { return err } log.Log(logger.Debug, "Opened audio source") _, err = ac.dev.NegotiateChannels(defaultChannels) if err != nil { return err } // Try to negotiate a rate to record in that is divisible by the wanted rate // so that it can be easily downsampled to the wanted rate. // Note: if a card thinks it can record at a rate but can't actually, this can cause a failure. Eg. // the audioinjector is supposed to record at 8000Hz and 16000Hz but it can't due to a firmware issue, // to fix this 8000 and 16000 must be removed from this slice. rates := [8]int{8000, 16000, 32000, 44100, 48000, 88200, 96000, 192000} foundRate := false for i := 0; i < len(rates) && !foundRate; i++ { if rates[i] < ac.rate { continue } if rates[i]%ac.rate == 0 { _, err = ac.dev.NegotiateRate(rates[i]) if err == nil { foundRate = true log.Log(logger.Debug, "Sample rate set", "rate", rates[i]) } } } // If no easily divisible rate is found, then use the default rate. if !foundRate { log.Log(logger.Warning, "No available device sample-rates are divisible by the requested rate. Default rate will be used. Resampling may fail.", "rateRequested", ac.rate) _, err = ac.dev.NegotiateRate(defaultRate) if err != nil { return err } log.Log(logger.Debug, "Sample rate set", "rate", defaultRate) } var fmt alsa.FormatType switch ac.bits { case 16: fmt = alsa.S16_LE case 32: fmt = alsa.S32_LE default: return errors.New("Unsupported sample bits") } _, err = ac.dev.NegotiateFormat(fmt) if err != nil { return err } // Either 8192 or 16384 bytes is a reasonable ALSA buffer size. _, err = ac.dev.NegotiateBufferSize(8192, 16384) if err != nil { return err } if err = ac.dev.Prepare(); err != nil { return err } log.Log(logger.Debug, "Successfully negotiated ALSA params") return nil } // input continously records audio and writes it to the ringbuffer. // Re-opens the device and tries again if ASLA returns an error. // Spends a lot of time sleeping in Paused mode. // ToDo: Currently, reading audio and writing to the ringbuffer are synchronous. // Need a way to asynchronously read from the ALSA buffer, i.e., _while_ it is recording to avoid any gaps. func (ac *audioInput) input() { for { ac.mu.Lock() mode := ac.mode ac.mu.Unlock() if mode == "Paused" { time.Sleep(time.Duration(ac.period) * time.Second) continue } log.Log(logger.Debug, "Recording audio for period", "seconds", ac.period) ac.mu.Lock() err := ac.dev.Read(ac.ab.Data) ac.mu.Unlock() if err != nil { log.Log(logger.Debug, "Device.Read failed", "error", err.Error()) ac.mu.Lock() err = ac.open() // re-open if err != nil { log.Log(logger.Fatal, "alsa.open failed", "error", err.Error()) } ac.mu.Unlock() continue } toWrite := ac.formatBuffer() log.Log(logger.Debug, "Audio format conversion has been performed where needed") var n int n, err = ac.rb.Write(toWrite.Data) switch err { case nil: log.Log(logger.Debug, "Wrote audio to ringbuffer", "length", n) case ring.ErrDropped: log.Log(logger.Warning, "Dropped audio") default: log.Log(logger.Error, "Unexpected ringbuffer error", "error", err.Error()) return } } } // output continously reads audio from the ringbuffer and sends it to NetReceiver via poll requests. // When "B0" is configured as one of the NetReceiver inputs, audio data is posted as "B0". // When "B0" is not an input, the poll request happens without any audio data // (although other inputs may still be present via URL parameters). // When paused, polling continues but without sending audio (B0) data. // Sending is throttled so as to complete one pass of this loop approximately every audio period, // since cycling more frequently is pointless. // Finally while audio data is sent every audio period, other data is reported only every monitor period. // This function also handles NetReceiver configuration requests and updating of NetReceiver vars. func (ac *audioInput) output() { // Calculate the size of the output data based on wanted channels and rate. outLen := (((len(ac.ab.Data) / ac.ab.Format.Channels) * ac.channels) / ac.ab.Format.Rate) * ac.rate buf := make([]byte, outLen) mime := "audio/x-wav;codec=pcm;rate=" + strconv.Itoa(ac.rate) + ";channels=" + strconv.Itoa(ac.channels) + ";bits=" + strconv.Itoa(ac.bits) ip := ac.ns.Param("ip") mp, err := strconv.Atoi(ac.ns.Param("mp")) if err != nil { log.Log(logger.Fatal, "mp not an integer") } report := true // Report non-audio data. reported := time.Now() // When we last did so. for { var reconfig bool start := time.Now() audio := false var pins []netsender.Pin if ac.mode == "Paused" { // Only send X data when paused (if any). if report { pins = netsender.MakePins(ip, "X") } } else { n, err := read(ac.rb, buf) if err != nil { return } if n == 0 { goto sleep } if n != len(buf) { log.Log(logger.Error, "Unexpected length from read", "length", n) return } if report { pins = netsender.MakePins(ip, "") } else { pins = netsender.MakePins(ip, "B") } for i, pin := range pins { if pin.Name == "B0" { audio = true pins[i].Value = n pins[i].Data = buf pins[i].MimeType = mime } } } if !(report || audio) { goto sleep // nothing to do } // Populate X pins, if any. for i, pin := range pins { if pin.Name[0] == 'X' { err := sds.ReadSystem(&pins[i]) if err != nil { log.Log(logger.Warning, "sds.ReadSystem failed", "error", err.Error()) // Pin.Value defaults to -1 upon error, so OK to continue. } } } _, reconfig, err = ac.ns.Send(netsender.RequestPoll, pins) if err != nil { log.Log(logger.Debug, "netsender.Send failed", "error", err.Error()) goto sleep } if report { reported = start report = false } if reconfig { err = ac.ns.Config() if err != nil { log.Log(logger.Warning, "netsender.Config failed", "error", err.Error()) goto sleep } ip = ac.ns.Param("ip") mp, err = strconv.Atoi(ac.ns.Param("mp")) if err != nil { log.Log(logger.Fatal, "mp not an integer") } } if ac.vs != ac.ns.VarSum() { vars, err := ac.ns.Vars() if err != nil { log.Log(logger.Error, "netsender.Vars failed", "error", err.Error()) goto sleep } ac.params(vars) // ToDo: re-open device if audio params have changed. ac.vs = ac.ns.VarSum() } sleep: pause := ac.period*1000 - int(time.Since(start).Seconds()*1000) if pause > 0 { time.Sleep(time.Duration(pause) * time.Millisecond) } if time.Since(reported).Seconds() >= float64(mp) { report = true } } } // read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success. // Any errors returned are unexpected and should be considered fatal. func read(rb *ring.Buffer, buf []byte) (int, error) { chunk, err := rb.Next(rbNextTimeout) switch err { case nil: // Do nothing. case ring.ErrTimeout: return 0, nil case io.EOF: log.Log(logger.Error, "Unexpected EOF from ring.Next") return 0, io.ErrUnexpectedEOF default: log.Log(logger.Error, "Unexpected error from ring.Next", "error", err.Error()) return 0, err } n, err := io.ReadFull(rb, buf[:chunk.Len()]) if err != nil { log.Log(logger.Error, "Unexpected error from ring.Read", "error", err.Error()) return n, err } log.Log(logger.Debug, "Read audio from ringbuffer", "length", n) return n, nil } // formatBuffer returns an ALSA buffer that has the recording data from the ac's original ALSA buffer but stored // in the desired format specified by the ac's parameters. func (ac *audioInput) formatBuffer() alsa.Buffer { var err error ac.mu.Lock() wantChannels := ac.channels wantRate := ac.rate ac.mu.Unlock() // If nothing needs to be changed, return the original. if ac.ab.Format.Channels == wantChannels && ac.ab.Format.Rate == wantRate { return ac.ab } formatted := alsa.Buffer{Format: ac.ab.Format} bufCopied := false if ac.ab.Format.Channels != wantChannels { // Convert channels. if ac.ab.Format.Channels == 2 && wantChannels == 1 { if formatted.Data, err = pcm.StereoToMono(ac.ab); err != nil { log.Log(logger.Warning, "Channel conversion failed, audio has remained stereo", "error", err.Error()) } else { formatted.Format.Channels = 1 } bufCopied = true } } if ac.ab.Format.Rate != wantRate { // Convert rate. if bufCopied { formatted.Data, err = pcm.Resample(formatted, wantRate) } else { formatted.Data, err = pcm.Resample(ac.ab, wantRate) } if err != nil { log.Log(logger.Warning, "Rate conversion failed, audio has remained original rate", "error", err.Error()) } else { formatted.Format.Rate = wantRate } } return formatted }