av/cmd/treatment/main.go

352 lines
9.4 KiB
Go

/*
DESCRIPTION
treatment is a netsender client intended to provide audio playback control,
and speaker health checking by reversing signal and recording using revid.
AUTHORS
Saxon Nelson-Milton <saxon@ausocean.org>
Trek Hopton <trek@ausocean.org>
LICENSE
Copyright (C) 2020 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
in gpl.txt. If not, see http://www.gnu.org/licenses.
*/
// Package treatment is a program for playing and recording audio through a common
// speaker unit.
package main
import (
"bytes"
"flag"
"fmt"
"io"
"os/exec"
"strconv"
"sync"
"time"
"bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/revid"
"bitbucket.org/ausocean/av/revid/config"
"bitbucket.org/ausocean/iot/pi/gpio"
"bitbucket.org/ausocean/iot/pi/netlogger"
"bitbucket.org/ausocean/iot/pi/netsender"
"bitbucket.org/ausocean/utils/logger"
"github.com/kidoman/embd"
_ "github.com/kidoman/embd/host/rpi"
lumberjack "gopkg.in/natefinch/lumberjack.v2"
)
// Copyright information prefixed to all metadata.
const (
metaPreambleKey = "copyright"
metaPreambleData = "ausocean.org/license/content2020"
)
// Logging configuration.
const (
logPath = "/var/log/netsender/netsender.log"
logMaxSize = 500 // MB
logMaxBackup = 10
logMaxAge = 28 // days
logVerbosity = logger.Info
logSuppress = true
)
// Misc constants.
const (
netSendRetryTime = 5 * time.Second
defaultSleepTime = 60 // Seconds
pkg = "rv: "
minAmpVolume = 0
maxAmpVolume = 63
volAddr = 0x4B
i2cPort = 1
)
// Treatment modes.
const (
modePaused = "Paused"
modeTreatment = "Play"
modeCheck = "Check"
)
// Variable map to send to netreceiver/vidgrind.
var varMap = map[string]string{
"mode": "enum:Paused,Play,Check",
"AudioFilePath": "string",
}
func main() {
mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
// Set up the player command with audio file path.
filePtr := flag.String("path", "/home/pi/audio.wav", "Path to sound file we wish to play.")
flag.Parse()
// Create lumberjack logger to handle logging to file.
fileLog := &lumberjack.Logger{
Filename: logPath,
MaxSize: logMaxSize,
MaxBackups: logMaxBackup,
MaxAge: logMaxAge,
}
// Create netlogger to handle logging to cloud.
netLog := netlogger.New()
// Create logger that we call methods on to log, which in turn writes to the
// lumberjack and netloggers.
log := logger.New(logVerbosity, io.MultiWriter(fileLog, netLog), logSuppress)
if *filePtr == "" {
log.Fatal("no file path provided, check usage")
}
// The netsender client will handle communication with netreceiver and GPIO stuff.
log.Debug("initialising netsender client")
ns, err := netsender.New(log, gpio.InitPin, nil, gpio.WritePin, varMap)
if err != nil {
log.Fatal("could not initialise netsender client", "error", err)
}
// Revid will handle the recording and sending of audio for sound checking.
log.Debug("initialising revid")
rv, err := revid.New(config.Config{Logger: log}, ns)
if err != nil {
log.Fatal("could not initialise revid", "error", err)
}
// Start the control loop.
log.Debug("starting control loop")
run(rv, ns, filePtr, log, netLog)
}
// run starts a control loop that runs netsender, sends logs, checks for var changes, and
// if var changes, changes current mode (paused,audio playback or soundcheck)
func run(rv *revid.Revid, ns *netsender.Sender, file *string, l *logger.Logger, nl *netlogger.Logger) {
var (
wg sync.WaitGroup
audioQuit chan struct{}
treating bool
vs int
)
for {
l.Debug("running netsender")
err := ns.Run()
if err != nil {
l.Warning("run failed. Retrying...", "error", err)
time.Sleep(netSendRetryTime)
continue
}
l.Debug("sending logs")
err = nl.Send(ns)
if err != nil {
l.Warning(pkg+"Logs could not be sent", "error", err)
}
l.Debug("checking varsum")
newVs := ns.VarSum()
if vs == newVs {
sleep(ns, l)
continue
}
vs = newVs
l.Info("varsum changed", "vs", vs)
l.Debug("getting new vars")
vars, err := ns.Vars()
if err != nil {
l.Error(pkg+"netSender failed to get vars", "error", err)
time.Sleep(netSendRetryTime)
continue
}
l.Info("got new vars", "vars", vars)
// Configure revid based on the vars.
l.Debug("updating revid configuration")
err = rv.Update(vars)
if err != nil {
l.Warning(pkg+"couldn't update revid", "error", err)
sleep(ns, l)
continue
}
l.Info("revid successfully reconfigured")
l.Debug("checking amplifier volume")
v := vars["AmpVolume"]
if v != "" {
vol, err := strconv.ParseInt(v, 10, 8)
if err != nil {
l.Error(pkg+"failed to parse amplifier volume", "error", err)
} else if vol < minAmpVolume || vol > maxAmpVolume {
l.Error(fmt.Sprintf("%s invalid amplifier volume, must be between %v and %v", pkg, minAmpVolume, maxAmpVolume), "volume", vol)
} else {
bus := embd.NewI2CBus(i2cPort)
err := bus.WriteByte(volAddr, byte(vol))
if err != nil {
l.Error(pkg+"failed to write amplifier volume", "error", err)
}
}
}
l.Debug("checking mode")
switch ns.Mode() {
case modePaused:
stopAudio(&wg, &treating, audioQuit)
l.Info("mode is Paused, stopping revid")
rv.Stop()
case modeTreatment:
l.Debug("checking audio file path")
f := vars["AudioFilePath"]
if f != "" && *file != f {
file = &f
l.Info("updated audio file path, stopping audio", "AudioFilePath", f)
stopAudio(&wg, &treating, audioQuit)
}
if !treating {
l.Info("starting audio treatment")
rv.Stop()
audioQuit = make(chan struct{})
treating = true
wg.Add(1)
go playAudio(file, audioQuit, &wg, l)
}
case modeCheck:
stopAudio(&wg, &treating, audioQuit)
l.Info("sound checking")
err = rv.Start()
if err != nil {
l.Error("could not start revid", "error", err)
ns.SetMode(modePaused, &vs)
sleep(ns, l)
continue
}
default:
l.Warning("mode is not valid", "mode", ns.Mode())
}
l.Info("revid updated with new mode")
sleep(ns, l)
}
}
// playAudio is intended to be run as a routine. It will repeatedly play an audio file until
// a signal is received to return. The entire audio file is played before the termination
// signal chan is checked.
func playAudio(file *string, quit chan struct{}, wg *sync.WaitGroup, l *logger.Logger) {
var numPlays int
for {
cmd := exec.Command(audioCmd, *file)
// We'd like to see what the playback software is outputting, so pipe
// stdout and stderr.
outPipe, err := cmd.StdoutPipe()
if err != nil {
l.Error("failed to pipe stdout", "error", err)
}
errPipe, err := cmd.StderrPipe()
if err != nil {
l.Error("failed to pipe stderr", "error", err)
}
// Start playback of the audio file.
err = cmd.Start()
if err != nil {
l.Error("start failed", "error", err)
continue
}
numPlays++
l.Debug("playing audio", "numPlays", numPlays)
// Copy any std out to a buffer for logging.
var outBuff bytes.Buffer
go func() {
_, err = io.Copy(&outBuff, outPipe)
if err != nil {
l.Error("failed to copy out pipe", "error", err)
}
}()
// Copy any std error to a buffer for logging.
var errBuff bytes.Buffer
go func() {
_, err = io.Copy(&errBuff, errPipe)
if err != nil {
l.Error("failed to copy error pipe", "error", err)
}
}()
// Wait for playback to complete.
err = cmd.Wait()
if err != nil {
l.Error("failed to wait for execution finish", "error", err)
}
l.Debug("stdout received", "stdout", string(outBuff.Bytes()))
// If there was any errors on stderr, log them.
if errBuff.Len() != 0 {
l.Error("errors from stderr", "stderr", string(errBuff.Bytes()))
}
// Check for audio signal halt.
// TODO: work out better way to do this. Doing it this way means we have to wait for
// the audio file to finish playing.
select {
case <-quit:
wg.Done()
return
default:
}
}
}
// stopAudio signals to the playAudio routine to terminate and then waits for it to
// do so.
func stopAudio(wg *sync.WaitGroup, treating *bool, signal chan struct{}) {
if !*treating {
return
}
close(signal)
wg.Wait()
*treating = false
}
// sleep uses a delay to halt the program based on the monitoring period
// netsender parameter (mp) defined in the netsender.conf config.
func sleep(ns *netsender.Sender, l *logger.Logger) {
l.Debug("sleeping")
t, err := strconv.Atoi(ns.Param("mp"))
if err != nil {
l.Error(pkg+"could not get sleep time, using default", "error", err)
t = defaultSleepTime
}
time.Sleep(time.Duration(t) * time.Second)
l.Debug("finished sleeping")
}
// checkPath wraps the use of lookPath to check the existence of executables
// that will be used by the audio looper.
func checkPath(cmd string, l *logger.Logger) {
path, err := exec.LookPath(cmd)
if err != nil {
l.Fatal(fmt.Sprintf("couldn't find %s", cmd), "error", err)
}
l.Debug(fmt.Sprintf("found %s", cmd), "path", path)
}