diff --git a/Makefile b/Makefile index 81126522..47877dd1 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,7 @@ -all: +all: revid-cli audio-netsender + +revid-cli: cd cmd/revid-cli; go build + +audio-netsender: + cd cmd/audio-netsender; go build \ No newline at end of file diff --git a/cmd/audio-netsender/main.go b/cmd/audio-netsender/main.go new file mode 100644 index 00000000..b4cbfc12 --- /dev/null +++ b/cmd/audio-netsender/main.go @@ -0,0 +1,556 @@ +/* +NAME + audio-netsender - NetSender client for sending audio to NetReceiver + +AUTHORS + Alan Noble + Trek Hopton + +ACKNOWLEDGEMENTS + A special thanks to Joel Jensen for his Go ALSA package. + +LICENSE + audio-netsender is Copyright (C) 2018 the Australian Ocean Lab (AusOcean). + + It is free software: you can redistribute it and/or modify them under + the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + It is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + for more details. + + You should have received a copy of the GNU General Public License + along with https://bitbucket.org/ausocean/iot/src/master/gpl.txt. + If not, see http://www.gnu.org/licenses. +*/ + +// audio-netsender is a NetSender client for sending audio to +// NetReceiver. Audio is captured by means of an ALSA recording +// device, specified by the NetReceiver "source" variable. It sent via +// HTTP to NetReceiver in raw audio form, i.e., as PCM data, where it +// is stored as BinaryData objects. Other NetReceiver variables are +// "rate", "period", "channels" and "bits", for specifiying the frame +// rate (Hz), audio period (seconds), number of channels and sample +// bit size respectively. For a description of NetReceiver see +// http://netreceiver.appspot.com/help. +package main + +import ( + "errors" + "flag" + "io" + "strconv" + "sync" + "time" + + "github.com/yobert/alsa" + + "bitbucket.org/ausocean/av/codec/pcm" + "bitbucket.org/ausocean/iot/pi/netsender" + "bitbucket.org/ausocean/iot/pi/sds" + "bitbucket.org/ausocean/iot/pi/smartlogger" + "bitbucket.org/ausocean/utils/logger" + "bitbucket.org/ausocean/utils/ring" +) + +const ( + progName = "audio-netsender" + logPath = "/var/log/netsender" + retryPeriod = 5 * time.Second + defaultFrameRate = 48000 + defaultPeriod = 5 // seconds + defaultChannels = 2 + defaultBits = 16 + rbDuration = 300 // seconds + rbTimeout = 100 * time.Millisecond + rbNextTimeout = 100 * time.Millisecond +) + +// audioClient holds everything we need to know about the client. +// NB: At 44100 Hz frame rate, 2 channels and 16-bit samples, a period of 5 seconds +// results in PCM data chunks of 882000 bytes! A longer period exceeds datastore's 1MB blob limit. +type audioClient struct { + mu sync.Mutex // mu protects the audioClient. + + parameters + + // internals + dev *alsa.Device // audio input device + ab alsa.Buffer // ALSA's buffer + rb *ring.Buffer // our buffer + ns *netsender.Sender // our NetSender + vs int // our "var sum" to track var changes +} + +type parameters struct { + mode string // operating mode, either "Normal" or "Paused" + source string // name of audio source, or empty for the default source + rate int // frame rate in Hz, 44100Hz by default + period int // audio period in seconds, 5s by default + channels int // number of audio channels, 1 for mono, 2 for stereo + bits int // sample bit size, 16 by default +} + +var log *logger.Logger + +func main() { + var logLevel int + flag.IntVar(&logLevel, "LogLevel", int(logger.Debug), "Specifies log level") + flag.Parse() + + validLogLevel := true + if logLevel < int(logger.Debug) || logLevel > int(logger.Fatal) { + logLevel = int(logger.Info) + validLogLevel = false + } + + logSender := smartlogger.New(logPath) + log = logger.New(int8(logLevel), &logSender.LogRoller) + log.Log(logger.Info, "log-netsender: Logger Initialized") + if !validLogLevel { + log.Log(logger.Error, "invalid log level was defaulted to Info") + } + + var ac audioClient + var err error + ac.ns, err = netsender.New(log, nil, sds.ReadSystem, nil) + if err != nil { + log.Log(logger.Fatal, "netsender.Init failed", "error", err.Error()) + } + + // Get audio params and store the current var sum. + vars, err := ac.ns.Vars() + if err != nil { + log.Log(logger.Warning, "netsender.Vars failed; using defaults", "error", err.Error()) + } + ac.params(vars) + ac.vs = ac.ns.VarSum() + + // Open the requested audio device. + err = ac.open() + if err != nil { + log.Log(logger.Fatal, "alsa.open failed", "error", err.Error()) + } + + // Capture audio in periods of ac.period seconds, and buffer rbDuration seconds in total. + ac.ab = ac.dev.NewBufferDuration(time.Second * time.Duration(ac.period)) + recSize := (((len(ac.ab.Data) / ac.dev.BufferFormat().Channels) * ac.channels) / ac.dev.BufferFormat().Rate) * ac.rate + rbLen := rbDuration / ac.period + ac.rb = ring.NewBuffer(rbLen, recSize, rbTimeout) + + go ac.input() + + ac.output() +} + +// params extracts audio params from corresponding NetReceiver vars and returns true if anything has changed. +// See audioClient for a description of the params and their limits. +func (ac *audioClient) params(vars map[string]string) bool { + // We are the only writers to this field + // so we don't need to lock here. + p := ac.parameters + changed := false + + mode := vars["mode"] + if p.mode != mode { + p.mode = mode + changed = true + } + source := vars["source"] + if p.source != source { + p.source = source + changed = true + } + val, err := strconv.Atoi(vars["rate"]) + if err != nil { + val = defaultFrameRate + } + if p.rate != val { + p.rate = val + changed = true + } + val, err = strconv.Atoi(vars["period"]) + if err != nil || val < 1 || 5 < val { + val = defaultPeriod + } + if p.period != val { + p.period = val + changed = true + } + val, err = strconv.Atoi(vars["channels"]) + if err != nil || (val != 1 && val != 2) { + val = defaultChannels + } + if p.channels != val { + p.channels = val + changed = true + } + val, err = strconv.Atoi(vars["bits"]) + if err != nil || (val != 16 && val != 32) { + val = defaultBits + } + if p.bits != val { + p.bits = val + changed = true + } + + if changed { + ac.mu.Lock() + ac.parameters = p + ac.mu.Unlock() + log.Log(logger.Debug, "params changed") + } + log.Log(logger.Debug, "parameters", "mode", p.mode, "source", p.source, "rate", p.rate, "period", p.period, "channels", p.channels, "bits", p.bits) + return changed +} + +// open or re-open the recording device with the given name and prepare it to record. +// If name is empty, the first recording device is used. +func (ac *audioClient) open() error { + if ac.dev != nil { + log.Log(logger.Debug, "closing", "source", ac.source) + ac.dev.Close() + ac.dev = nil + } + log.Log(logger.Debug, "opening", "source", ac.source) + + cards, err := alsa.OpenCards() + if err != nil { + return err + } + defer alsa.CloseCards(cards) + + for _, card := range cards { + devices, err := card.Devices() + if err != nil { + return err + } + for _, dev := range devices { + if dev.Type != alsa.PCM || !dev.Record { + continue + } + if dev.Title == ac.source || ac.source == "" { + ac.dev = dev + break + } + } + } + + if ac.dev == nil { + return errors.New("No audio source found") + } + log.Log(logger.Debug, "found audio source", "source", ac.dev.Title) + + // ToDo: time out if Open takes too long. + err = ac.dev.Open() + if err != nil { + return err + } + log.Log(logger.Debug, "opened audio source") + + _, err = ac.dev.NegotiateChannels(defaultChannels) + if err != nil { + return err + } + + // Try to negotiate a rate to record in that is divisible by the wanted rate + // so that it can be easily downsampled to the wanted rate. + // Note: if a card thinks it can record at a rate but can't actually, this can cause a failure. Eg. + // the audioinjector is supposed to record at 8000Hz and 16000Hz but it can't due to a firmware issue, + // to fix this 8000 and 16000 must be removed from this slice. + rates := [8]int{8000, 16000, 32000, 44100, 48000, 88200, 96000, 192000} + foundRate := false + for r := range rates { + if r < ac.rate { + continue + } + if r%ac.rate == 0 { + _, err = ac.dev.NegotiateRate(r) + if err == nil { + foundRate = true + log.Log(logger.Debug, "sample rate set", "rate", r) + break + } + } + } + + // If no easily divisible rate is found, then use the default rate. + if !foundRate { + log.Log(logger.Warning, "no available device sample-rates are divisible by the requested rate. Default rate will be used. Resampling may fail.", "rateRequested", ac.rate) + _, err = ac.dev.NegotiateRate(defaultFrameRate) + if err != nil { + return err + } + log.Log(logger.Debug, "sample rate set", "rate", defaultFrameRate) + } + + var fmt alsa.FormatType + switch ac.bits { + case 16: + fmt = alsa.S16_LE + case 32: + fmt = alsa.S32_LE + default: + return errors.New("unsupported sample bits") + } + _, err = ac.dev.NegotiateFormat(fmt) + if err != nil { + return err + } + + // Either 8192 or 16384 bytes is a reasonable ALSA buffer size. + _, err = ac.dev.NegotiateBufferSize(8192, 16384) + if err != nil { + return err + } + + if err = ac.dev.Prepare(); err != nil { + return err + } + log.Log(logger.Debug, "successfully negotiated ALSA params") + return nil +} + +// input continously records audio and writes it to the ringbuffer. +// Re-opens the device and tries again if ASLA returns an error. +// Spends a lot of time sleeping in Paused mode. +// ToDo: Currently, reading audio and writing to the ringbuffer are synchronous. +// Need a way to asynchronously read from the ALSA buffer, i.e., _while_ it is recording to avoid any gaps. +func (ac *audioClient) input() { + for { + ac.mu.Lock() + mode := ac.mode + ac.mu.Unlock() + if mode == "Paused" { + time.Sleep(time.Duration(ac.period) * time.Second) + continue + } + log.Log(logger.Debug, "recording audio for period", "seconds", ac.period) + ac.mu.Lock() + err := ac.dev.Read(ac.ab.Data) + ac.mu.Unlock() + if err != nil { + log.Log(logger.Debug, "device.Read failed", "error", err.Error()) + ac.mu.Lock() + err = ac.open() // re-open + if err != nil { + log.Log(logger.Fatal, "alsa.open failed", "error", err.Error()) + } + ac.mu.Unlock() + continue + } + + toWrite := ac.formatBuffer() + + log.Log(logger.Debug, "audio format conversion has been performed where needed") + + var n int + n, err = ac.rb.Write(toWrite.Data) + switch err { + case nil: + log.Log(logger.Debug, "wrote audio to ringbuffer", "length", n) + case ring.ErrDropped: + log.Log(logger.Warning, "dropped audio") + default: + log.Log(logger.Error, "unexpected ringbuffer error", "error", err.Error()) + return + } + } +} + +// output continously reads audio from the ringbuffer and sends it to NetReceiver via poll requests. +// When "B0" is configured as one of the NetReceiver inputs, audio data is posted as "B0". +// When "B0" is not an input, the poll request happens without any audio data +// (although other inputs may still be present via URL parameters). +// When paused, polling continues but without sending audio (B0) data. +// Sending is throttled so as to complete one pass of this loop approximately every audio period, +// since cycling more frequently is pointless. +// Finally while audio data is sent every audio period, other data is reported only every monitor period. +// This function also handles NetReceiver configuration requests and updating of NetReceiver vars. +func (ac *audioClient) output() { + // Calculate the size of the output data based on wanted channels and rate. + outLen := (((len(ac.ab.Data) / ac.ab.Format.Channels) * ac.channels) / ac.ab.Format.Rate) * ac.rate + buf := make([]byte, outLen) + + mime := "audio/x-wav;codec=pcm;rate=" + strconv.Itoa(ac.rate) + ";channels=" + strconv.Itoa(ac.channels) + ";bits=" + strconv.Itoa(ac.bits) + ip := ac.ns.Param("ip") + mp, err := strconv.Atoi(ac.ns.Param("mp")) + if err != nil { + log.Log(logger.Fatal, "mp not an integer") + } + + report := true // Report non-audio data. + reported := time.Now() // When we last did so. + + for { + var rc int + start := time.Now() + audio := false + var pins []netsender.Pin + + if ac.mode == "Paused" { + + // Only send X data when paused (if any). + if report { + pins = netsender.MakePins(ip, "X") + } + } else { + n, err := read(ac.rb, buf) + if err != nil { + return + } + if n == 0 { + goto sleep + } + if n != len(buf) { + log.Log(logger.Error, "unexpected length from read", "length", n) + return + } + if report { + pins = netsender.MakePins(ip, "") + } else { + pins = netsender.MakePins(ip, "B") + } + for i, pin := range pins { + if pin.Name == "B0" { + audio = true + pins[i].Value = n + pins[i].Data = buf + pins[i].MimeType = mime + } + } + } + + if !(report || audio) { + goto sleep // nothing to do + } + + // Populate X pins, if any. + for i, pin := range pins { + if pin.Name[0] == 'X' { + err := sds.ReadSystem(&pins[i]) + if err != nil { + log.Log(logger.Warning, "sds.ReadSystem failed", "error", err.Error()) + // Pin.Value defaults to -1 upon error, so OK to continue. + } + } + } + _, rc, err = ac.ns.Send(netsender.RequestPoll, pins) + if err != nil { + log.Log(logger.Debug, "netsender.Send failed", "error", err.Error()) + goto sleep + } + if report { + reported = start + report = false + } + if rc == netsender.ResponseUpdate { + _, err = ac.ns.Config() + if err != nil { + log.Log(logger.Warning, "netsender.Config failed", "error", err.Error()) + goto sleep + } + ip = ac.ns.Param("ip") + mp, err = strconv.Atoi(ac.ns.Param("mp")) + if err != nil { + log.Log(logger.Fatal, "mp not an integer") + } + } + + if ac.vs != ac.ns.VarSum() { + vars, err := ac.ns.Vars() + if err != nil { + log.Log(logger.Error, "netsender.Vars failed", "error", err.Error()) + goto sleep + } + ac.params(vars) // ToDo: re-open device if audio params have changed. + ac.vs = ac.ns.VarSum() + } + + sleep: + pause := ac.period*1000 - int(time.Since(start).Seconds()*1000) + if pause > 0 { + time.Sleep(time.Duration(pause) * time.Millisecond) + } + if time.Since(reported).Seconds() >= float64(mp) { + report = true + } + + } +} + +// read reads a full PCM chunk from the ringbuffer, returning the number of bytes read upon success. +// Any errors returned are unexpected and should be considered fatal. +func read(rb *ring.Buffer, buf []byte) (int, error) { + chunk, err := rb.Next(rbNextTimeout) + switch err { + case nil: + // Do nothing. + case ring.ErrTimeout: + return 0, nil + case io.EOF: + log.Log(logger.Error, "unexpected EOF from ring.Next") + return 0, io.ErrUnexpectedEOF + default: + log.Log(logger.Error, "unexpected error from ring.Next", "error", err.Error()) + return 0, err + } + + n, err := io.ReadFull(rb, buf[:chunk.Len()]) + if err != nil { + log.Log(logger.Error, "unexpected error from ring.Read", "error", err.Error()) + return n, err + } + + log.Log(logger.Debug, "read audio from ringbuffer", "length", n) + return n, nil +} + +// formatBuffer returns an ALSA buffer that has the recording data from the ac's original ALSA buffer but stored +// in the desired format specified by the ac's parameters. +func (ac *audioClient) formatBuffer() alsa.Buffer { + var err error + ac.mu.Lock() + wantChannels := ac.channels + wantRate := ac.rate + ac.mu.Unlock() + + // If nothing needs to be changed, return the original. + if ac.ab.Format.Channels == wantChannels && ac.ab.Format.Rate == wantRate { + return ac.ab + } + + formatted := alsa.Buffer{Format: ac.ab.Format} + bufCopied := false + if ac.ab.Format.Channels != wantChannels { + + // Convert channels. + if ac.ab.Format.Channels == 2 && wantChannels == 1 { + if formatted, err = pcm.StereoToMono(ac.ab); err != nil { + log.Log(logger.Warning, "channel conversion failed, audio has remained stereo", "error", err.Error()) + } else { + formatted.Format.Channels = 1 + } + bufCopied = true + } + } + + if ac.ab.Format.Rate != wantRate { + + // Convert rate. + if bufCopied { + formatted, err = pcm.Resample(formatted, wantRate) + } else { + formatted, err = pcm.Resample(ac.ab, wantRate) + } + if err != nil { + log.Log(logger.Warning, "rate conversion failed, audio has remained original rate", "error", err.Error()) + } else { + formatted.Format.Rate = wantRate + } + } + return formatted +} diff --git a/cmd/audio-netsender/upgrade.sh b/cmd/audio-netsender/upgrade.sh new file mode 100755 index 00000000..76eb413b --- /dev/null +++ b/cmd/audio-netsender/upgrade.sh @@ -0,0 +1,56 @@ +#!/bin/bash +# All-purpose upgrade script. +# Upgrades source(s) to given Git tag, runs make in each directory, +# and write tags to tags.conf upon success, exiting 0. +# NB: Customize SrcDirs as needed to reflect dependencies. +Usage="Usage: upgrade.sh [-d] tag" +BaseDir=$GOPATH/src/bitbucket.org/ausocean +VarDir=/var/netsender +LogFile=/var/log/netsender/stream.log +SrcDirs=($BaseDir/utils $BaseDir/iot) +if [ "$1" == "-d" ]; then + set -x + GitFlags="" + NewTag="$2" +else + # capture stdout and stderr + exec 2> $LogFile + exec 1>&2 + GitFlags="--quiet" + NewTag="$1" +fi +if [ -z "$GOPATH" ]; then + echo "Error: GOPATH not defined" + exit 1 +fi +if [ -z "$NewTag" ]; then + echo "$Usage" + exit 1 +fi +for dir in ${SrcDirs[@]}; do + pushd $dir + if [ ! "$?" == 0 ]; then + exit 1 + fi + git fetch $GitFlags --depth=1 origin refs/tags/$NewTag:refs/tags/$NewTag + if [ ! "$?" == 0 ]; then + exit 1 + fi + git checkout $GitFlags --force tags/$NewTag + if [ ! "$?" == 0 ]; then + exit 1 + fi + if [ -e Makefile ]; then + make + if [ ! "$?" == 0 ]; then + exit 1 + fi + fi + popd +done +if [ ! -d "$VarDir" ]; then + echo "Error: $VarDir does not exit." + exit 1 +fi +git tag > "$VarDir/tags.conf" +exit $? diff --git a/container/mts/encoder.go b/container/mts/encoder.go index 693b0be3..a903500b 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -136,19 +136,19 @@ type Encoder struct { tsSpace [PacketSize]byte pesSpace [pes.MaxPesSize]byte - continuity map[int]byte + continuity map[uint16]byte nalBasedPSI bool pktCount int psiSendCount int - mediaPid int + mediaPid uint16 streamID byte } // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { - var mPid int + var mPid uint16 var sid byte nbp := true switch mediaType { @@ -170,7 +170,7 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { Pil: 0, Essd: &psi.ESSD{ St: byte(sid), - Epid: uint16(mPid), + Epid: mPid, Esil: 0x00, }, } @@ -189,7 +189,7 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { mediaPid: mPid, streamID: sid, - continuity: map[int]byte{ + continuity: map[uint16]byte{ PatPid: 0, PmtPid: 0, mPid: 0, @@ -331,7 +331,7 @@ func (e *Encoder) pcr() uint64 { } // ccFor returns the next continuity counter for pid. -func (e *Encoder) ccFor(pid int) byte { +func (e *Encoder) ccFor(pid uint16) byte { cc := e.continuity[pid] const continuityCounterMask = 0xf e.continuity[pid] = (cc + 1) & continuityCounterMask