revid: made start and stop change audio device state

This commit is contained in:
Trek H 2019-05-21 00:45:54 +09:30
parent 76edcfe8ed
commit 7ba9d023a3
4 changed files with 54 additions and 30 deletions

View File

@ -46,15 +46,21 @@ const (
rbNextTimeout = 100 * time.Millisecond rbNextTimeout = 100 * time.Millisecond
) )
const (
running = iota
paused
stopped
)
var log *logger.Logger var log *logger.Logger
// AudioInput holds everything we need to know about the audio input stream. // audioDevice holds everything we need to know about the audio input stream.
// Note: At 44100 Hz sample rate, 2 channels and 16-bit samples, a period of 5 seconds // Note: At 44100 Hz sample rate, 2 channels and 16-bit samples, a period of 5 seconds
// results in PCM data chunks of 882000 bytes. A longer period exceeds datastore's 1MB blob limit. // results in PCM data chunks of 882000 bytes. A longer period exceeds datastore's 1MB blob limit.
type AudioInput struct { type audioDevice struct {
mu sync.Mutex mu sync.Mutex
source string // Name of audio source, or empty for the default source. source string // Name of audio source, or empty for the default source.
mode string // Operating mode, either "Running", "Paused", or "Stopped". mode uint8 // Operating mode, either running, paused, or stopped.
dev *alsa.Device // Audio input device. dev *alsa.Device // Audio input device.
ab alsa.Buffer // ALSA's buffer. ab alsa.Buffer // ALSA's buffer.
@ -64,7 +70,7 @@ type AudioInput struct {
*AudioConfig *AudioConfig
} }
// AudioConfig provides parameters used by AudioInput. // AudioConfig provides parameters used by audioDevice.
type AudioConfig struct { type AudioConfig struct {
SampleRate int SampleRate int
Channels int Channels int
@ -73,8 +79,8 @@ type AudioConfig struct {
Codec uint8 Codec uint8
} }
// NewAudioInput initializes and returns an AudioInput struct which can be started, read from, and stopped. // NewAudioDevice initializes and returns an audioDevice struct which can be started, read from, and stopped.
func NewAudioInput(cfg *AudioConfig) *AudioInput { func NewAudioDevice(cfg *AudioConfig) *audioDevice {
// Initialize logger. // Initialize logger.
logLevel := int(logger.Debug) logLevel := int(logger.Debug)
validLogLevel := true validLogLevel := true
@ -89,7 +95,7 @@ func NewAudioInput(cfg *AudioConfig) *AudioInput {
log.Log(logger.Error, "Invalid log level was defaulted to Info") log.Log(logger.Error, "Invalid log level was defaulted to Info")
} }
a := &AudioInput{} a := &audioDevice{}
a.AudioConfig = cfg a.AudioConfig = cfg
// Open the requested audio device. // Open the requested audio device.
@ -104,27 +110,42 @@ func NewAudioInput(cfg *AudioConfig) *AudioInput {
rbLen := rbDuration / a.RecPeriod rbLen := rbDuration / a.RecPeriod
a.rb = ring.NewBuffer(rbLen, a.chunkSize, rbTimeout) a.rb = ring.NewBuffer(rbLen, a.chunkSize, rbTimeout)
a.mode = "Paused" a.mode = paused
return a return a
} }
// Start will start recording audio and writing to the output. // Start will start recording audio and writing to the output.
func (a *AudioInput) Start() { func (a *audioDevice) Start() {
a.mu.Lock() a.mu.Lock()
mode := a.mode mode := a.mode
a.mu.Unlock() a.mu.Unlock()
switch mode { switch mode {
case "Paused": case paused:
// Start Recording
go a.input() go a.input()
case "Stopped": mode = running
case stopped:
// Open the audio device and start recording.
err := a.open()
if err != nil {
log.Log(logger.Fatal, "alsa.open failed", "error", err.Error())
} }
go a.input()
mode = running
case running:
return
}
a.mu.Lock()
a.mode = mode
a.mu.Unlock()
} }
// Stop will stop recording audio and close // Stop will stop recording audio and close
func (a *AudioInput) Stop() { func (a *audioDevice) Stop() {
a.mode = "Stopped" a.mu.Lock()
a.mode = stopped
a.mu.Unlock()
if a.dev != nil { if a.dev != nil {
log.Log(logger.Debug, "Closing", "source", a.source) log.Log(logger.Debug, "Closing", "source", a.source)
a.dev.Close() a.dev.Close()
@ -133,14 +154,14 @@ func (a *AudioInput) Stop() {
} }
// ChunkSize returns the AudioInput's chunkSize, ie. the number of bytes of audio written to output at a time. // ChunkSize returns the audioDevice's chunkSize, ie. the number of bytes of audio written to output at a time.
func (a *AudioInput) ChunkSize() int { func (a *audioDevice) ChunkSize() int {
return a.chunkSize return a.chunkSize
} }
// open or re-open the recording device with the given name and prepare it to record. // open or re-open the recording device with the given name and prepare it to record.
// If name is empty, the first recording device is used. // If name is empty, the first recording device is used.
func (a *AudioInput) open() error { func (a *audioDevice) open() error {
if a.dev != nil { if a.dev != nil {
log.Log(logger.Debug, "Closing", "source", a.source) log.Log(logger.Debug, "Closing", "source", a.source)
a.dev.Close() a.dev.Close()
@ -248,16 +269,16 @@ func (a *AudioInput) open() error {
// input continously records audio and writes it to the ringbuffer. // input continously records audio and writes it to the ringbuffer.
// Re-opens the device and tries again if ASLA returns an error. // Re-opens the device and tries again if ASLA returns an error.
func (a *AudioInput) input() { func (a *audioDevice) input() {
for { for {
a.mu.Lock() a.mu.Lock()
mode := a.mode mode := a.mode
a.mu.Unlock() a.mu.Unlock()
switch mode { switch mode {
case "Paused": case paused:
time.Sleep(time.Duration(a.RecPeriod) * time.Second) time.Sleep(time.Duration(a.RecPeriod) * time.Second)
continue continue
case "Stopped": case stopped:
break break
} }
log.Log(logger.Debug, "Recording audio for period", "seconds", a.RecPeriod) log.Log(logger.Debug, "Recording audio for period", "seconds", a.RecPeriod)
@ -295,7 +316,7 @@ func (a *AudioInput) input() {
// Read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success. // Read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success.
// Any errors returned are unexpected and should be considered fatal. // Any errors returned are unexpected and should be considered fatal.
func (a *AudioInput) Read(p []byte) (n int, err error) { func (a *audioDevice) Read(p []byte) (n int, err error) {
chunk, err := a.rb.Next(rbNextTimeout) chunk, err := a.rb.Next(rbNextTimeout)
switch err { switch err {
case nil: case nil:
@ -321,7 +342,7 @@ func (a *AudioInput) Read(p []byte) (n int, err error) {
// formatBuffer returns an ALSA buffer that has the recording data from the ac's original ALSA buffer but stored // formatBuffer returns an ALSA buffer that has the recording data from the ac's original ALSA buffer but stored
// in the desired format specified by the ac's parameters. // in the desired format specified by the ac's parameters.
func (a *AudioInput) formatBuffer() alsa.Buffer { func (a *audioDevice) formatBuffer() alsa.Buffer {
var err error var err error
a.mu.Lock() a.mu.Lock()
wantChannels := a.Channels wantChannels := a.Channels

View File

@ -8,7 +8,7 @@ import (
"bitbucket.org/ausocean/iot/pi/netsender" "bitbucket.org/ausocean/iot/pi/netsender"
) )
func TestAudioInputNew(t *testing.T) { func TestAudioPipeline(t *testing.T) {
mts.Meta = meta.New() mts.Meta = meta.New()
var logger testLogger var logger testLogger
ns, err := netsender.New(&logger, nil, nil, nil) ns, err := netsender.New(&logger, nil, nil, nil)
@ -18,7 +18,10 @@ func TestAudioInputNew(t *testing.T) {
var c Config var c Config
c.Logger = &logger c.Logger = &logger
c.Input = Audio c.Input = File
c.InputPath = "../../test/test-data/av/input/original_8kHz_adpcm_test.pcm"
c.Outputs = []uint8{File}
c.OutputPath = "./test-temp"
rv, err := New(c, ns) rv, err := New(c, ns)
if err != nil { if err != nil {

View File

@ -272,7 +272,7 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate, mediaType in
case File: case File:
r.setupInput = r.setupInputForFile r.setupInput = r.setupInputForFile
case Audio: case Audio:
r.setupInput = r.startAudioInput r.setupInput = r.startAudioDevice
} }
switch r.config.InputCodec { switch r.config.InputCodec {
@ -613,12 +613,12 @@ func (r *Revid) setupInputForFile() (func() error, error) {
// TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop. // TODO(kortschak): Maybe we want a context.Context-aware parser that we can stop.
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(f, time.Second/time.Duration(r.config.FrameRate), 0) go r.processFrom(f, 0, 0)
return func() error { return f.Close() }, nil return func() error { return f.Close() }, nil
} }
// startAudioInput is used to start capturing audio from an audio device and processing it. // startAudioDevice is used to start capturing audio from an audio device and processing it.
func (r *Revid) startAudioInput() (func() error, error) { func (r *Revid) startAudioDevice() (func() error, error) {
ac := &AudioConfig{ ac := &AudioConfig{
SampleRate: r.config.SampleRate, SampleRate: r.config.SampleRate,
Channels: r.config.Channels, Channels: r.config.Channels,
@ -626,7 +626,7 @@ func (r *Revid) startAudioInput() (func() error, error) {
BitDepth: r.config.BitDepth, BitDepth: r.config.BitDepth,
Codec: r.config.InputCodec, Codec: r.config.InputCodec,
} }
ai := NewAudioInput(ac) ai := NewAudioDevice(ac)
r.wg.Add(1) r.wg.Add(1)
go r.processFrom(ai, time.Second/time.Duration(r.config.WriteRate), ai.ChunkSize()) go r.processFrom(ai, time.Second/time.Duration(r.config.WriteRate), ai.ChunkSize())
return func() error { return func() error {

View File

@ -57,7 +57,7 @@ type httpSender struct {
log func(lvl int8, msg string, args ...interface{}) log func(lvl int8, msg string, args ...interface{})
} }
// newMinimalHttpSender returns a pointer to a new minimalHttpSender. // newHttpSender returns a pointer to a new httpSender.
func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender { func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
return &httpSender{ return &httpSender{
client: ns, client: ns,