revid: added concurrency support to start and stop

This commit is contained in:
Trek H 2019-05-22 14:56:58 +09:30
parent c58c573cd7
commit 17d59014c6
3 changed files with 84 additions and 49 deletions

View File

@ -41,9 +41,9 @@ import (
const ( const (
logPath = "/var/log/netsender" logPath = "/var/log/netsender"
rbDuration = 300 // seconds
rbTimeout = 100 * time.Millisecond rbTimeout = 100 * time.Millisecond
rbNextTimeout = 100 * time.Millisecond rbNextTimeout = 100 * time.Millisecond
rbLen = 200
) )
const ( const (
@ -77,7 +77,7 @@ type AudioConfig struct {
SampleRate int SampleRate int
Channels int Channels int
BitDepth int BitDepth int
RecPeriod int RecPeriod float64
Codec uint8 Codec uint8
} }
@ -109,7 +109,6 @@ func NewAudioDevice(cfg *AudioConfig) *audioDevice {
// Setup ring buffer to capture audio in periods of a.RecPeriod seconds, and buffer rbDuration seconds in total. // Setup ring buffer to capture audio in periods of a.RecPeriod seconds, and buffer rbDuration seconds in total.
a.ab = a.dev.NewBufferDuration(time.Second * time.Duration(a.RecPeriod)) a.ab = a.dev.NewBufferDuration(time.Second * time.Duration(a.RecPeriod))
a.chunkSize = (((len(a.ab.Data) / a.dev.BufferFormat().Channels) * a.Channels) / a.dev.BufferFormat().Rate) * a.SampleRate a.chunkSize = (((len(a.ab.Data) / a.dev.BufferFormat().Channels) * a.Channels) / a.dev.BufferFormat().Rate) * a.SampleRate
rbLen := rbDuration / a.RecPeriod
a.rb = ring.NewBuffer(rbLen, a.chunkSize, rbTimeout) a.rb = ring.NewBuffer(rbLen, a.chunkSize, rbTimeout)
a.mode = paused a.mode = paused
@ -120,13 +119,11 @@ func NewAudioDevice(cfg *AudioConfig) *audioDevice {
// Start will start recording audio and writing to the output. // Start will start recording audio and writing to the output.
func (a *audioDevice) Start() { func (a *audioDevice) Start() {
a.mu.Lock() a.mu.Lock()
mode := a.mode switch a.mode {
a.mu.Unlock()
switch mode {
case paused: case paused:
// Start Recording // Start Recording
go a.input() go a.input()
mode = running a.mode = running
case stopped: case stopped:
// Open the audio device and start recording. // Open the audio device and start recording.
err := a.open() err := a.open()
@ -134,25 +131,23 @@ func (a *audioDevice) Start() {
log.Log(logger.Fatal, "alsa.open failed", "error", err.Error()) log.Log(logger.Fatal, "alsa.open failed", "error", err.Error())
} }
go a.input() go a.input()
mode = running a.mode = running
case running: case running:
return return
} }
a.mu.Lock()
a.mode = mode
a.mu.Unlock() a.mu.Unlock()
} }
// Stop will stop recording audio and close the device // Stop will stop recording audio and close the device
func (a *audioDevice) Stop() { func (a *audioDevice) Stop() {
a.mu.Lock() a.mu.Lock()
a.mode = stopped
a.mu.Unlock()
if a.dev != nil { if a.dev != nil {
log.Log(logger.Debug, "Closing", "source", a.source) log.Log(logger.Debug, "Closing", "source", a.source)
a.dev.Close() a.dev.Close()
a.dev = nil a.dev = nil
} }
a.mode = stopped
a.mu.Unlock()
} }
@ -273,17 +268,16 @@ func (a *audioDevice) open() error {
func (a *audioDevice) input() { func (a *audioDevice) input() {
for { for {
a.mu.Lock() a.mu.Lock()
mode := a.mode switch a.mode {
a.mu.Unlock()
switch mode {
case paused: case paused:
a.mu.Unlock()
time.Sleep(time.Duration(a.RecPeriod) * time.Second) time.Sleep(time.Duration(a.RecPeriod) * time.Second)
continue continue
case stopped: case stopped:
break a.mu.Unlock()
return
} }
log.Log(logger.Debug, "Recording audio for period", "seconds", a.RecPeriod) log.Log(logger.Debug, "Recording audio for period", "seconds", a.RecPeriod)
a.mu.Lock()
err := a.dev.Read(a.ab.Data) err := a.dev.Read(a.ab.Data)
a.mu.Unlock() a.mu.Unlock()
if err != nil { if err != nil {
@ -318,6 +312,16 @@ func (a *audioDevice) input() {
// Read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success. // Read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success.
// Any errors returned are unexpected and should be considered fatal. // Any errors returned are unexpected and should be considered fatal.
func (a *audioDevice) Read(p []byte) (n int, err error) { func (a *audioDevice) Read(p []byte) (n int, err error) {
a.mu.Lock()
if a.rb == nil {
fmt.Println("READ: RB IS NIL")
}
switch a.mode {
case paused:
return 0, nil
case stopped:
return 0, nil
}
chunk, err := a.rb.Next(rbNextTimeout) chunk, err := a.rb.Next(rbNextTimeout)
switch err { switch err {
case nil: case nil:
@ -336,8 +340,8 @@ func (a *audioDevice) Read(p []byte) (n int, err error) {
log.Log(logger.Error, "Unexpected error from ring.Read", "error", err.Error()) log.Log(logger.Error, "Unexpected error from ring.Read", "error", err.Error())
return n, err return n, err
} }
log.Log(logger.Debug, "Read audio from ringbuffer", "length", n) log.Log(logger.Debug, "Read audio from ringbuffer", "length", n)
a.mu.Unlock()
return n, nil return n, nil
} }

View File

@ -1,25 +1,20 @@
package revid package revid
import ( import (
"bytes"
"errors"
"testing" "testing"
"time"
"bitbucket.org/ausocean/av/codec/lex"
"github.com/yobert/alsa" "github.com/yobert/alsa"
) )
func TestAudioDevice(t *testing.T) { // Check that a device exists with the given config parameters.
// We want to open a device with a standard configuration. func checkDevice(ac *AudioConfig) error {
ac := &AudioConfig{
SampleRate: 8000,
Channels: 1,
RecPeriod: 1,
BitDepth: 16,
Codec: ADPCM,
}
// Check that a device exists with the desired parameters.
cards, err := alsa.OpenCards() cards, err := alsa.OpenCards()
if err != nil { if err != nil {
t.Skip("skipping, no audio card found") return errors.New("no audio cards found")
} }
defer alsa.CloseCards(cards) defer alsa.CloseCards(cards)
var testDev *alsa.Device var testDev *alsa.Device
@ -37,11 +32,15 @@ func TestAudioDevice(t *testing.T) {
} }
} }
if testDev == nil { if testDev == nil {
t.Skip("skipping, no suitable audio device found") return errors.New("no suitable device found")
}
err = testDev.Open()
if err != nil {
return err
} }
_, err = testDev.NegotiateChannels(2) _, err = testDev.NegotiateChannels(2)
if err != nil { if err != nil {
t.Skip("skipping, no suitable audio device found") return err
} }
foundRate := false foundRate := false
for i := 0; i < len(rates) && !foundRate; i++ { for i := 0; i < len(rates) && !foundRate; i++ {
@ -58,25 +57,57 @@ func TestAudioDevice(t *testing.T) {
if !foundRate { if !foundRate {
_, err = testDev.NegotiateRate(defaultSampleRate) _, err = testDev.NegotiateRate(defaultSampleRate)
if err != nil { if err != nil {
t.Skip("skipping, no suitable audio device found") return err
} }
} }
_, err = testDev.NegotiateFormat(alsa.S16_LE) var aFmt alsa.FormatType
switch ac.BitDepth {
case 16:
aFmt = alsa.S16_LE
case 32:
aFmt = alsa.S32_LE
default:
return errors.New("unsupported bitdepth")
}
_, err = testDev.NegotiateFormat(aFmt)
if err != nil { if err != nil {
t.Skip("skipping, no suitable audio device found") return err
} }
_, err = testDev.NegotiateBufferSize(8192, 16384) _, err = testDev.NegotiateBufferSize(8192, 16384)
if err != nil { if err != nil {
t.Skip("skipping, no suitable audio device found") return err
} }
if err = testDev.Prepare(); err != nil { if err = testDev.Prepare(); err != nil {
t.Skip("skipping, no suitable audio device found") return err
} }
testDev.Close() if testDev != nil {
testDev.Close()
ai := NewAudioDevice(ac) }
return nil
ai.Start() }
ai.Stop() func TestAudio(t *testing.T) {
// We want to open a device with a standard configuration.
ac := &AudioConfig{
SampleRate: 8000,
Channels: 1,
RecPeriod: 0.01,
BitDepth: 16,
Codec: ADPCM,
}
// Skip if there are no suitable devices to test with.
err := checkDevice(ac)
if err != nil {
t.Error(err)
}
// Create a new audioDevice, start, read/lex, and then stop it.
ai := NewAudioDevice(ac)
dst := bytes.NewBuffer(make([]byte, 0, ai.ChunkSize()*4))
ai.Start()
go lex.ADPCM(dst, ai, time.Duration(ac.RecPeriod), ai.ChunkSize())
time.Sleep(time.Millisecond * 10)
ai.Stop()
} }

View File

@ -74,10 +74,10 @@ type Config struct {
AutoWhiteBalance string AutoWhiteBalance string
// Audio // Audio
SampleRate int // Samples a second (Hz). SampleRate int // Samples a second (Hz).
RecPeriod int // How many seconds to record at a time. RecPeriod float64 // How many seconds to record at a time.
Channels int // Number of audio channels, 1 for mono, 2 for stereo. Channels int // Number of audio channels, 1 for mono, 2 for stereo.
BitDepth int // Sample bit depth. BitDepth int // Sample bit depth.
} }
// Possible modes for raspivid --exposure parameter. // Possible modes for raspivid --exposure parameter.
@ -168,7 +168,7 @@ const (
defaultSampleRate = 48000 defaultSampleRate = 48000
defaultBitDepth = 16 defaultBitDepth = 16
defaultChannels = 1 defaultChannels = 1
defaultRecPeriod = 1 defaultRecPeriod = 1.0
) )
// Validate checks for any errors in the config fields and defaults settings // Validate checks for any errors in the config fields and defaults settings