run gofmt

This commit is contained in:
Saxon Nelson-Milton 2022-11-06 17:25:28 +10:30
parent ea900398a3
commit fe2f091272
13 changed files with 107 additions and 108 deletions

View File

@ -89,7 +89,7 @@ func main() {
// Create netsender client. // Create netsender client.
ns, err := netsender.New(l, nil, readPin(), nil) ns, err := netsender.New(l, nil, readPin(), nil)
if err != nil { if err != nil {
l.Fatal( "could not initialise netsender client", "error", err) l.Fatal("could not initialise netsender client", "error", err)
} }
// This routine will deal with things that need to happen with the netsender client. // This routine will deal with things that need to happen with the netsender client.
@ -104,28 +104,28 @@ func main() {
// stdout and stderr. // stdout and stderr.
outPipe, err := cmd.StdoutPipe() outPipe, err := cmd.StdoutPipe()
if err != nil { if err != nil {
l.Error( "failed to pipe stdout", "error", err) l.Error("failed to pipe stdout", "error", err)
} }
errPipe, err := cmd.StderrPipe() errPipe, err := cmd.StderrPipe()
if err != nil { if err != nil {
l.Error( "failed to pipe stderr", "error", err) l.Error("failed to pipe stderr", "error", err)
} }
// Start playback of the audio file. // Start playback of the audio file.
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
l.Error( "start failed", "error", err.Error()) l.Error("start failed", "error", err.Error())
continue continue
} }
numPlays++ numPlays++
l.Debug( "playing audio", "numPlays", numPlays) l.Debug("playing audio", "numPlays", numPlays)
// Copy any std out to a buffer for logging. // Copy any std out to a buffer for logging.
var outBuff bytes.Buffer var outBuff bytes.Buffer
go func() { go func() {
_, err = io.Copy(&outBuff, outPipe) _, err = io.Copy(&outBuff, outPipe)
if err != nil { if err != nil {
l.Error( "failed to copy out pipe", "error", err) l.Error("failed to copy out pipe", "error", err)
} }
}() }()
@ -134,20 +134,20 @@ func main() {
go func() { go func() {
_, err = io.Copy(&errBuff, errPipe) _, err = io.Copy(&errBuff, errPipe)
if err != nil { if err != nil {
l.Error( "failed to copy error pipe", "error", err) l.Error("failed to copy error pipe", "error", err)
} }
}() }()
// Wait for playback to complete. // Wait for playback to complete.
err = cmd.Wait() err = cmd.Wait()
if err != nil { if err != nil {
l.Error( "failed to wait for execution finish", "error", err.Error()) l.Error("failed to wait for execution finish", "error", err.Error())
} }
l.Debug( "stdout received", "stdout", string(outBuff.Bytes())) l.Debug("stdout received", "stdout", string(outBuff.Bytes()))
// If there was any errors on stderr, l them. // If there was any errors on stderr, l them.
if errBuff.Len() != 0 { if errBuff.Len() != 0 {
l.Error( "errors from stderr", "stderr", string(errBuff.Bytes())) l.Error("errors from stderr", "stderr", string(errBuff.Bytes()))
} }
} }
} }
@ -158,14 +158,14 @@ func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger) {
for { for {
err := ns.Run() err := ns.Run()
if err != nil { if err != nil {
l.Warning( "Run Failed. Retrying...", "error", err) l.Warning("Run Failed. Retrying...", "error", err)
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
err = nl.Send(ns) err = nl.Send(ns)
if err != nil { if err != nil {
l.Warning( "Logs could not be sent", "error", err.Error()) l.Warning("Logs could not be sent", "error", err.Error())
} }
// If var sum hasn't changed we skip rest of loop. // If var sum hasn't changed we skip rest of loop.
@ -178,7 +178,7 @@ func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger) {
vars, err := ns.Vars() vars, err := ns.Vars()
if err != nil { if err != nil {
l.Error( "netSender failed to get vars", "error", err) l.Error("netSender failed to get vars", "error", err)
time.Sleep(netSendRetryTime) time.Sleep(netSendRetryTime)
continue continue
} }
@ -186,7 +186,7 @@ func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger) {
// Configure looper based on vars. // Configure looper based on vars.
err = update(vars) err = update(vars)
if err != nil { if err != nil {
l.Warning( "couldn't update with new vars", "error", err) l.Warning("couldn't update with new vars", "error", err)
sleep(ns, l) sleep(ns, l)
continue continue
} }
@ -205,9 +205,9 @@ func run(ns *netsender.Sender, l logging.Logger, nl *netlogger.Logger) {
func checkPath(cmd string, l logging.Logger) { func checkPath(cmd string, l logging.Logger) {
path, err := exec.LookPath(cmd) path, err := exec.LookPath(cmd)
if err != nil { if err != nil {
l.Fatal( fmt.Sprintf("couldn't find %s", cmd), "error", err) l.Fatal(fmt.Sprintf("couldn't find %s", cmd), "error", err)
} }
l.Debug( fmt.Sprintf("found %s", cmd), "path", path) l.Debug(fmt.Sprintf("found %s", cmd), "path", path)
} }
// sleep uses a delay to halt the program based on the monitoring period // sleep uses a delay to halt the program based on the monitoring period
@ -215,7 +215,7 @@ func checkPath(cmd string, l logging.Logger) {
func sleep(ns *netsender.Sender, l logging.Logger) { func sleep(ns *netsender.Sender, l logging.Logger) {
t, err := strconv.Atoi(ns.Param("mp")) t, err := strconv.Atoi(ns.Param("mp"))
if err != nil { if err != nil {
l.Error( "could not get sleep time, using default", "error", err) l.Error("could not get sleep time, using default", "error", err)
t = defaultSleepTime t = defaultSleepTime
} }
time.Sleep(time.Duration(t) * time.Second) time.Sleep(time.Duration(t) * time.Second)

View File

@ -52,7 +52,7 @@ func initCommand(l logging.Logger) {
cmdInit := exec.Command(alsactl, "-f", cardPath, "restore") cmdInit := exec.Command(alsactl, "-f", cardPath, "restore")
err := cmdInit.Run() err := cmdInit.Run()
for err != nil { for err != nil {
l.Warning( "alsactl run failed, retrying...", "error", err) l.Warning("alsactl run failed, retrying...", "error", err)
time.Sleep(retryDur) time.Sleep(retryDur)
err = cmdInit.Run() err = cmdInit.Run()
} }

View File

@ -107,7 +107,7 @@ func (tp *turbidityProbe) Write(p []byte) (int, error) {
} }
return len(p), nil return len(p), nil
} else { } else {
tp.log.Debug( "trim successful", "keyframe error counter", tp.trimCounter) tp.log.Debug("trim successful", "keyframe error counter", tp.trimCounter)
tp.trimCounter = 0 tp.trimCounter = 0
} }
@ -116,7 +116,7 @@ func (tp *turbidityProbe) Write(p []byte) (int, error) {
tp.buffer.Reset() tp.buffer.Reset()
return 0, fmt.Errorf("could not write trimmed video to buffer: %w", err) return 0, fmt.Errorf("could not write trimmed video to buffer: %w", err)
} }
tp.log.Debug( "video trimmed, write keyframe complete", "size(bytes)", n) tp.log.Debug("video trimmed, write keyframe complete", "size(bytes)", n)
} else if tp.buffer.Len() < bufferLimit { } else if tp.buffer.Len() < bufferLimit {
// Buffer size is limited to speed up decoding. // Buffer size is limited to speed up decoding.
_, err := tp.buffer.Write(p) _, err := tp.buffer.Write(p)
@ -128,13 +128,13 @@ func (tp *turbidityProbe) Write(p []byte) (int, error) {
// Buffer is large enough to begin turbidity calculation. // Buffer is large enough to begin turbidity calculation.
select { select {
case <-tp.ticker.C: case <-tp.ticker.C:
tp.log.Debug( "beginning turbidity calculation") tp.log.Debug("beginning turbidity calculation")
startTime := time.Now() startTime := time.Now()
err := tp.turbidityCalculation() err := tp.turbidityCalculation()
if err != nil { if err != nil {
return 0, fmt.Errorf("could not calculate turbidity: %w", err) return 0, fmt.Errorf("could not calculate turbidity: %w", err)
} }
tp.log.Debug( "finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds()) tp.log.Debug("finished turbidity calculation", "total duration (sec)", time.Since(startTime).Seconds())
default: default:
} }
} }
@ -155,13 +155,13 @@ func (tp *turbidityProbe) Update(transformMatrix []float64) error {
continue continue
} }
// Update the turbidity sensor with new transformation. // Update the turbidity sensor with new transformation.
tp.log.Debug( "updating the transformation matrix") tp.log.Debug("updating the transformation matrix")
tp.transform = transformMatrix tp.transform = transformMatrix
newTransform := floatToMat(tp.transform) newTransform := floatToMat(tp.transform)
tp.ts.TransformMatrix = newTransform tp.ts.TransformMatrix = newTransform
return nil return nil
} }
tp.log.Debug( "no change to the transformation matrix") tp.log.Debug("no change to the transformation matrix")
return nil return nil
} }
@ -174,13 +174,13 @@ func (tp *turbidityProbe) turbidityCalculation() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to create temp file: %w", err) return fmt.Errorf("failed to create temp file: %w", err)
} }
tp.log.Debug( "writing to file", "buffer size(bytes)", tp.buffer.Len()) tp.log.Debug("writing to file", "buffer size(bytes)", tp.buffer.Len())
_, err = file.Write(tp.buffer.Bytes()) _, err = file.Write(tp.buffer.Bytes())
if err != nil { if err != nil {
return fmt.Errorf("failed to write to temporary file: %w", err) return fmt.Errorf("failed to write to temporary file: %w", err)
} }
tp.log.Debug( "write to file success", "buffer size(bytes)", tp.buffer.Len()) tp.log.Debug("write to file success", "buffer size(bytes)", tp.buffer.Len())
tp.buffer.Reset() tp.buffer.Reset()
// Open the video file. // Open the video file.
@ -189,7 +189,7 @@ func (tp *turbidityProbe) turbidityCalculation() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to open video file: %w", err) return fmt.Errorf("failed to open video file: %w", err)
} }
tp.log.Debug( "video capture open", "total duration (sec)", time.Since(startTime).Seconds()) tp.log.Debug("video capture open", "total duration (sec)", time.Since(startTime).Seconds())
// Store each frame until maximum amount is reached. // Store each frame until maximum amount is reached.
startTime = time.Now() startTime = time.Now()
@ -199,7 +199,7 @@ func (tp *turbidityProbe) turbidityCalculation() error {
if len(imgs) <= 0 { if len(imgs) <= 0 {
return errors.New("no frames found") return errors.New("no frames found")
} }
tp.log.Debug( "read time", "total duration (sec)", time.Since(startTime).Seconds()) tp.log.Debug("read time", "total duration (sec)", time.Since(startTime).Seconds())
// Process video data to get saturation and contrast scores. // Process video data to get saturation and contrast scores.
res, err := tp.ts.Evaluate(imgs) res, err := tp.ts.Evaluate(imgs)

View File

@ -191,7 +191,7 @@ func (e *Encoder) Write(data []byte) (int, error) {
e.pktCount = 0 e.pktCount = 0
err := e.writePSI() err := e.writePSI()
if err != nil { if err != nil {
return 0, fmt.Errorf("could not write psi (psiMethodPacket): %w",err) return 0, fmt.Errorf("could not write psi (psiMethodPacket): %w", err)
} }
} }
case psiMethodNAL: case psiMethodNAL:
@ -203,7 +203,7 @@ func (e *Encoder) Write(data []byte) (int, error) {
if nalType == h264dec.NALTypeSPS { if nalType == h264dec.NALTypeSPS {
err := e.writePSI() err := e.writePSI()
if err != nil { if err != nil {
return 0, fmt.Errorf("could not write psi (psiMethodNAL): %w",err) return 0, fmt.Errorf("could not write psi (psiMethodNAL): %w", err)
} }
} }
case psiMethodTime: case psiMethodTime:
@ -214,7 +214,7 @@ func (e *Encoder) Write(data []byte) (int, error) {
e.startTime = time.Now() e.startTime = time.Now()
err := e.writePSI() err := e.writePSI()
if err != nil { if err != nil {
return 0, fmt.Errorf("could not write psi (psiMethodTime): %w",err) return 0, fmt.Errorf("could not write psi (psiMethodTime): %w", err)
} }
} }
default: default:
@ -259,7 +259,7 @@ func (e *Encoder) Write(data []byte) (int, error) {
e.log.Debug("writing MTS packet to destination", "size", len(b), "pusi", pusi, "PID", pkt.PID, "PTS", pts, "PCR", pkt.PCR) e.log.Debug("writing MTS packet to destination", "size", len(b), "pusi", pusi, "PID", pkt.PID, "PTS", pts, "PCR", pkt.PCR)
_, err := e.dst.Write(b) _, err := e.dst.Write(b)
if err != nil { if err != nil {
return len(data), fmt.Errorf("could not write MTS packet to destination: %w",err) return len(data), fmt.Errorf("could not write MTS packet to destination: %w", err)
} }
e.pktCount++ e.pktCount++
} }
@ -282,13 +282,13 @@ func (e *Encoder) writePSI() error {
} }
_, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize])) _, err := e.dst.Write(patPkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil { if err != nil {
return fmt.Errorf("could not write pat packet: %w",err) return fmt.Errorf("could not write pat packet: %w", err)
} }
e.pktCount++ e.pktCount++
e.pmtBytes, err = updateMeta(e.pmtBytes, e.log) e.pmtBytes, err = updateMeta(e.pmtBytes, e.log)
if err != nil { if err != nil {
return fmt.Errorf("could not update pmt metadata: %w",err) return fmt.Errorf("could not update pmt metadata: %w", err)
} }
// Create mts packet from pmt table. // Create mts packet from pmt table.
@ -301,7 +301,7 @@ func (e *Encoder) writePSI() error {
} }
_, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize])) _, err = e.dst.Write(pmtPkt.Bytes(e.tsSpace[:PacketSize]))
if err != nil { if err != nil {
return fmt.Errorf("could not write pmt packet: %w",err) return fmt.Errorf("could not write pmt packet: %w", err)
} }
e.pktCount++ e.pktCount++

View File

@ -80,14 +80,14 @@ var (
// An ALSA device holds everything we need to know about the audio input stream and implements io.Reader and device.AVDevice. // An ALSA device holds everything we need to know about the audio input stream and implements io.Reader and device.AVDevice.
type ALSA struct { type ALSA struct {
l logging.Logger // Logger for device's routines to log to. l logging.Logger // Logger for device's routines to log to.
mode uint8 // Operating mode, either running, paused, or stopped. mode uint8 // Operating mode, either running, paused, or stopped.
mu sync.Mutex // Provides synchronisation when changing modes concurrently. mu sync.Mutex // Provides synchronisation when changing modes concurrently.
title string // Name of audio title, or empty for the default title. title string // Name of audio title, or empty for the default title.
dev *yalsa.Device // ALSA device's Audio input device. dev *yalsa.Device // ALSA device's Audio input device.
pb pcm.Buffer // Buffer to contain the direct audio from ALSA. pb pcm.Buffer // Buffer to contain the direct audio from ALSA.
buf *pool.Buffer // Ring buffer to contain processed audio ready to be read. buf *pool.Buffer // Ring buffer to contain processed audio ready to be read.
Config // Configuration parameters for this device. Config // Configuration parameters for this device.
} }
// Config provides parameters used by the ALSA device. // Config provides parameters used by the ALSA device.
@ -221,20 +221,20 @@ func (d *ALSA) Stop() error {
func (d *ALSA) open() error { func (d *ALSA) open() error {
// Close any existing device. // Close any existing device.
if d.dev != nil { if d.dev != nil {
d.l.Debug( "closing device", "title", d.title) d.l.Debug("closing device", "title", d.title)
d.dev.Close() d.dev.Close()
d.dev = nil d.dev = nil
} }
// Open sound card and open recording device. // Open sound card and open recording device.
d.l.Debug( "opening sound card") d.l.Debug("opening sound card")
cards, err := yalsa.OpenCards() cards, err := yalsa.OpenCards()
if err != nil { if err != nil {
return err return err
} }
defer yalsa.CloseCards(cards) defer yalsa.CloseCards(cards)
d.l.Debug( "finding audio device") d.l.Debug("finding audio device")
for _, card := range cards { for _, card := range cards {
devices, err := card.Devices() devices, err := card.Devices()
if err != nil { if err != nil {
@ -254,7 +254,7 @@ func (d *ALSA) open() error {
return errors.New("no ALSA device found") return errors.New("no ALSA device found")
} }
d.l.Debug( "opening ALSA device", "title", d.dev.Title) d.l.Debug("opening ALSA device", "title", d.dev.Title)
err = d.dev.Open() err = d.dev.Open()
if err != nil { if err != nil {
return err return err
@ -263,13 +263,13 @@ func (d *ALSA) open() error {
// Try to configure device with chosen channels. // Try to configure device with chosen channels.
channels, err := d.dev.NegotiateChannels(int(d.Channels)) channels, err := d.dev.NegotiateChannels(int(d.Channels))
if err != nil && d.Channels == 1 { if err != nil && d.Channels == 1 {
d.l.Info( "device is unable to record in mono, trying stereo", "error", err) d.l.Info("device is unable to record in mono, trying stereo", "error", err)
channels, err = d.dev.NegotiateChannels(2) channels, err = d.dev.NegotiateChannels(2)
} }
if err != nil { if err != nil {
return fmt.Errorf("device is unable to record with requested number of channels: %w", err) return fmt.Errorf("device is unable to record with requested number of channels: %w", err)
} }
d.l.Debug( "alsa device channels set", "channels", channels) d.l.Debug("alsa device channels set", "channels", channels)
// Try to negotiate a rate to record in that is divisible by the wanted rate // Try to negotiate a rate to record in that is divisible by the wanted rate
// so that it can be easily downsampled to the wanted rate. // so that it can be easily downsampled to the wanted rate.
@ -289,7 +289,7 @@ func (d *ALSA) open() error {
rate, err = d.dev.NegotiateRate(r) rate, err = d.dev.NegotiateRate(r)
if err == nil { if err == nil {
foundRate = true foundRate = true
d.l.Debug( "alsa device sample rate set", "rate", rate) d.l.Debug("alsa device sample rate set", "rate", rate)
break break
} }
} }
@ -297,12 +297,12 @@ func (d *ALSA) open() error {
// If no easily divisible rate is found, then use the default rate. // If no easily divisible rate is found, then use the default rate.
if !foundRate { if !foundRate {
d.l.Warning( "unable to sample at requested rate, default used.", "rateRequested", d.SampleRate) d.l.Warning("unable to sample at requested rate, default used.", "rateRequested", d.SampleRate)
rate, err = d.dev.NegotiateRate(defaultSampleRate) rate, err = d.dev.NegotiateRate(defaultSampleRate)
if err != nil { if err != nil {
return err return err
} }
d.l.Debug( "alsa device sample rate set", "rate", rate) d.l.Debug("alsa device sample rate set", "rate", rate)
} }
var aFmt yalsa.FormatType var aFmt yalsa.FormatType
@ -327,7 +327,7 @@ func (d *ALSA) open() error {
default: default:
return fmt.Errorf("unsupported sample bits %v", d.BitDepth) return fmt.Errorf("unsupported sample bits %v", d.BitDepth)
} }
d.l.Debug( "alsa device bit depth set", "bitdepth", bitdepth) d.l.Debug("alsa device bit depth set", "bitdepth", bitdepth)
// A 50ms period is a sensible value for low-ish latency. (this could be made configurable if needed) // A 50ms period is a sensible value for low-ish latency. (this could be made configurable if needed)
// Some devices only accept even period sizes while others want powers of 2. // Some devices only accept even period sizes while others want powers of 2.
@ -342,13 +342,13 @@ func (d *ALSA) open() error {
if err != nil { if err != nil {
return err return err
} }
d.l.Debug( "alsa device buffer size set", "buffersize", bufSize) d.l.Debug("alsa device buffer size set", "buffersize", bufSize)
if err = d.dev.Prepare(); err != nil { if err = d.dev.Prepare(); err != nil {
return err return err
} }
d.l.Debug( "successfully negotiated device params") d.l.Debug("successfully negotiated device params")
return nil return nil
} }
@ -366,43 +366,43 @@ func (d *ALSA) input() {
continue continue
case stopped: case stopped:
if d.dev != nil { if d.dev != nil {
d.l.Debug( "closing ALSA device", "title", d.title) d.l.Debug("closing ALSA device", "title", d.title)
d.dev.Close() d.dev.Close()
d.dev = nil d.dev = nil
} }
err := d.buf.Close() err := d.buf.Close()
if err != nil { if err != nil {
d.l.Error( "unable to close pool buffer", "error", err) d.l.Error("unable to close pool buffer", "error", err)
} }
return return
} }
// Read from audio device. // Read from audio device.
d.l.Debug( "recording audio for period", "seconds", d.RecPeriod) d.l.Debug("recording audio for period", "seconds", d.RecPeriod)
err := d.dev.Read(d.pb.Data) err := d.dev.Read(d.pb.Data)
if err != nil { if err != nil {
d.l.Debug( "read failed", "error", err.Error()) d.l.Debug("read failed", "error", err.Error())
err = d.open() // re-open err = d.open() // re-open
if err != nil { if err != nil {
d.l.Fatal( "reopening device failed", "error", err.Error()) d.l.Fatal("reopening device failed", "error", err.Error())
return return
} }
continue continue
} }
// Process audio. // Process audio.
d.l.Debug( "processing audio") d.l.Debug("processing audio")
toWrite := d.formatBuffer() toWrite := d.formatBuffer()
// Write audio to ringbuffer. // Write audio to ringbuffer.
n, err := d.buf.Write(toWrite.Data) n, err := d.buf.Write(toWrite.Data)
switch err { switch err {
case nil: case nil:
d.l.Debug( "wrote audio to ringbuffer", "length", n) d.l.Debug("wrote audio to ringbuffer", "length", n)
case pool.ErrDropped: case pool.ErrDropped:
d.l.Warning( "old audio data overwritten") d.l.Warning("old audio data overwritten")
default: default:
d.l.Error( "unexpected ringbuffer error", "error", err.Error()) d.l.Error("unexpected ringbuffer error", "error", err.Error())
} }
} }
} }
@ -410,36 +410,36 @@ func (d *ALSA) input() {
// Read reads from the ringbuffer, returning the number of bytes read upon success. // Read reads from the ringbuffer, returning the number of bytes read upon success.
func (d *ALSA) Read(p []byte) (int, error) { func (d *ALSA) Read(p []byte) (int, error) {
// Ready ringbuffer for read. // Ready ringbuffer for read.
d.l.Debug( pkg+"getting next chunk ready") d.l.Debug(pkg + "getting next chunk ready")
_, err := d.buf.Next(rbNextTimeout) _, err := d.buf.Next(rbNextTimeout)
if err != nil { if err != nil {
switch err { switch err {
case io.EOF: case io.EOF:
d.l.Debug( pkg+"EOF from Next") d.l.Debug(pkg + "EOF from Next")
return 0, err return 0, err
case pool.ErrTimeout: case pool.ErrTimeout:
d.l.Debug( pkg+"pool buffer timeout") d.l.Debug(pkg + "pool buffer timeout")
return 0, err return 0, err
default: default:
d.l.Error( pkg+"unexpected error from Next", "error", err.Error()) d.l.Error(pkg+"unexpected error from Next", "error", err.Error())
return 0, err return 0, err
} }
} }
// Read from pool buffer. // Read from pool buffer.
d.l.Debug( pkg+"reading from buffer") d.l.Debug(pkg + "reading from buffer")
n, err := d.buf.Read(p) n, err := d.buf.Read(p)
if err != nil { if err != nil {
switch err { switch err {
case io.EOF: case io.EOF:
d.l.Debug( pkg+"EOF from Read") d.l.Debug(pkg + "EOF from Read")
return n, err return n, err
default: default:
d.l.Error( pkg+"unexpected error from Read", "error", err.Error()) d.l.Error(pkg+"unexpected error from Read", "error", err.Error())
return n, err return n, err
} }
} }
d.l.Debug( fmt.Sprintf("%v read %v bytes", pkg, n)) d.l.Debug(fmt.Sprintf("%v read %v bytes", pkg, n))
return n, nil return n, nil
} }
@ -458,7 +458,7 @@ func (d *ALSA) formatBuffer() pcm.Buffer {
if d.pb.Format.Channels == 2 && d.Channels == 1 { if d.pb.Format.Channels == 2 && d.Channels == 1 {
formatted, err = pcm.StereoToMono(d.pb) formatted, err = pcm.StereoToMono(d.pb)
if err != nil { if err != nil {
d.l.Fatal( "channel conversion failed", "error", err.Error()) d.l.Fatal("channel conversion failed", "error", err.Error())
} }
} }
} }
@ -467,7 +467,7 @@ func (d *ALSA) formatBuffer() pcm.Buffer {
// Convert rate. // Convert rate.
formatted, err = pcm.Resample(formatted, d.SampleRate) formatted, err = pcm.Resample(formatted, d.SampleRate)
if err != nil { if err != nil {
d.l.Fatal( "rate conversion failed", "error", err.Error()) d.l.Fatal("rate conversion failed", "error", err.Error())
} }
} }
@ -478,11 +478,11 @@ func (d *ALSA) formatBuffer() pcm.Buffer {
enc := adpcm.NewEncoder(b) enc := adpcm.NewEncoder(b)
_, err = enc.Write(formatted.Data) _, err = enc.Write(formatted.Data)
if err != nil { if err != nil {
d.l.Fatal( "unable to encode", "error", err.Error()) d.l.Fatal("unable to encode", "error", err.Error())
} }
formatted.Data = b.Bytes() formatted.Data = b.Bytes()
default: default:
d.l.Error( "unhandled audio codec") d.l.Error("unhandled audio codec")
} }
return formatted return formatted

View File

@ -277,7 +277,7 @@ func (r *Raspivid) Start() error {
return fmt.Errorf("could not create raspivid args: %w", err) return fmt.Errorf("could not create raspivid args: %w", err)
} }
r.cfg.Logger.Info( pkg+"raspivid args", "raspividArgs", strings.Join(args, " ")) r.cfg.Logger.Info(pkg+"raspivid args", "raspividArgs", strings.Join(args, " "))
r.cmd = exec.Command("raspivid", args...) r.cmd = exec.Command("raspivid", args...)
r.out, err = r.cmd.StdoutPipe() r.out, err = r.cmd.StdoutPipe()
@ -294,17 +294,17 @@ func (r *Raspivid) Start() error {
for { for {
select { select {
case <-r.done: case <-r.done:
r.cfg.Logger.Info( "raspivid.Stop() called, finished checking stderr") r.cfg.Logger.Info("raspivid.Stop() called, finished checking stderr")
return return
default: default:
buf, err := ioutil.ReadAll(stderr) buf, err := ioutil.ReadAll(stderr)
if err != nil { if err != nil {
r.cfg.Logger.Error( "could not read stderr", "error", err) r.cfg.Logger.Error("could not read stderr", "error", err)
return return
} }
if len(buf) != 0 { if len(buf) != 0 {
r.cfg.Logger.Error( "error from raspivid stderr", "error", string(buf)) r.cfg.Logger.Error("error from raspivid stderr", "error", string(buf))
return return
} }
} }

View File

@ -152,7 +152,7 @@ func (w *Webcam) Start() error {
"-", "-",
) )
w.log.Info( pkg+"ffmpeg args", "args", strings.Join(args, " ")) w.log.Info(pkg+"ffmpeg args", "args", strings.Join(args, " "))
w.cmd = exec.Command("ffmpeg", args...) w.cmd = exec.Command("ffmpeg", args...)
var err error var err error
@ -172,29 +172,29 @@ func (w *Webcam) Start() error {
for { for {
select { select {
case <-w.done: case <-w.done:
w.cfg.Logger.Info( "webcam.Stop() called, finished checking stderr") w.cfg.Logger.Info("webcam.Stop() called, finished checking stderr")
return return
default: default:
buf, err := ioutil.ReadAll(stderr) buf, err := ioutil.ReadAll(stderr)
if err != nil { if err != nil {
w.cfg.Logger.Error( "could not read stderr", "error", err) w.cfg.Logger.Error("could not read stderr", "error", err)
return return
} }
if len(buf) != 0 { if len(buf) != 0 {
w.cfg.Logger.Error( "error from webcam stderr", "error", string(buf)) w.cfg.Logger.Error("error from webcam stderr", "error", string(buf))
return return
} }
} }
} }
}() }()
w.cfg.Logger.Info( "starting webcam") w.cfg.Logger.Info("starting webcam")
err = w.cmd.Start() err = w.cmd.Start()
if err != nil { if err != nil {
return fmt.Errorf("failed to start ffmpeg: %w", err) return fmt.Errorf("failed to start ffmpeg: %w", err)
} }
w.cfg.Logger.Info( "webcam started") w.cfg.Logger.Info("webcam started")
return nil return nil
} }

View File

@ -253,9 +253,9 @@ func main() {
func profile() { func profile() {
f, err := os.Create(profilePath) f, err := os.Create(profilePath)
if err != nil { if err != nil {
log.Fatal( pkg+"could not create CPU profile", "error", err.Error()) log.Fatal(pkg+"could not create CPU profile", "error", err.Error())
} }
if err := pprof.StartCPUProfile(f); err != nil { if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal( pkg+"could not start CPU profile", "error", err.Error()) log.Fatal(pkg+"could not start CPU profile", "error", err.Error())
} }
} }

View File

@ -138,14 +138,14 @@ func Dial(url string, log Log, options ...func(*Conn) error) (*Conn, error) {
var err error var err error
c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url) c.link.protocol, c.link.host, c.link.port, c.link.app, c.link.playpath, err = parseURL(url)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not parse url: %w",err) return nil, fmt.Errorf("could not parse url: %w", err)
} }
c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app c.link.url = rtmpProtocolStrings[c.link.protocol] + "://" + c.link.host + ":" + strconv.Itoa(int(c.link.port)) + "/" + c.link.app
c.link.protocol |= featureWrite c.link.protocol |= featureWrite
err = connect(&c) err = connect(&c)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not connect: %w",err) return nil, fmt.Errorf("could not connect: %w", err)
} }
return &c, nil return &c, nil
} }
@ -161,17 +161,17 @@ func (c *Conn) Close() error {
if c.link.protocol&featureWrite != 0 { if c.link.protocol&featureWrite != 0 {
err := sendFCUnpublish(c) err := sendFCUnpublish(c)
if err != nil { if err != nil {
return fmt.Errorf("could not send fc unpublish: %w",err) return fmt.Errorf("could not send fc unpublish: %w", err)
} }
} }
err := sendDeleteStream(c, float64(c.streamID)) err := sendDeleteStream(c, float64(c.streamID))
if err != nil { if err != nil {
return fmt.Errorf("could not send delete stream: %w",err) return fmt.Errorf("could not send delete stream: %w", err)
} }
} }
err := c.link.conn.Close() err := c.link.conn.Close()
if err != nil { if err != nil {
return fmt.Errorf("could not close link conn: %w",err) return fmt.Errorf("could not close link conn: %w", err)
} }
*c = Conn{} *c = Conn{}
return nil return nil
@ -201,7 +201,7 @@ func (c *Conn) Write(data []byte) (int, error) {
copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize]) copy(pkt.body, data[flvTagheaderSize:flvTagheaderSize+pkt.bodySize])
err := pkt.writeTo(c, false) err := pkt.writeTo(c, false)
if err != nil { if err != nil {
return 0, fmt.Errorf("could not write packet to connection: %w",err) return 0, fmt.Errorf("could not write packet to connection: %w", err)
} }
return len(data), nil return len(data), nil
} }
@ -214,18 +214,18 @@ func (c *Conn) Write(data []byte) (int, error) {
func (c *Conn) read(buf []byte) (int, error) { func (c *Conn) read(buf []byte) (int, error) {
err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) err := c.link.conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout)))
if err != nil { if err != nil {
return 0, fmt.Errorf("could not set read deadline: %w",err) return 0, fmt.Errorf("could not set read deadline: %w", err)
} }
n, err := io.ReadFull(c.link.conn, buf) n, err := io.ReadFull(c.link.conn, buf)
if err != nil { if err != nil {
c.log(DebugLevel, pkg+"read failed", "error", err.Error()) c.log(DebugLevel, pkg+"read failed", "error", err.Error())
return 0, fmt.Errorf("could not read conn: %w",err) return 0, fmt.Errorf("could not read conn: %w", err)
} }
c.nBytesIn += uint32(n) c.nBytesIn += uint32(n)
if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) { if c.nBytesIn > (c.nBytesInSent + c.clientBW/10) {
err := sendBytesReceived(c) err := sendBytesReceived(c)
if err != nil { if err != nil {
return n, fmt.Errorf("could not send bytes received: %w",err) // NB: we still read n bytes, even though send bytes failed return n, fmt.Errorf("could not send bytes received: %w", err) // NB: we still read n bytes, even though send bytes failed
} }
} }
return n, nil return n, nil
@ -236,12 +236,12 @@ func (c *Conn) write(buf []byte) (int, error) {
//ToDo: consider using a different timeout for writes than for reads //ToDo: consider using a different timeout for writes than for reads
err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout))) err := c.link.conn.SetWriteDeadline(time.Now().Add(time.Second * time.Duration(c.link.timeout)))
if err != nil { if err != nil {
return 0, fmt.Errorf("could not set write deadline: %w",err) return 0, fmt.Errorf("could not set write deadline: %w", err)
} }
n, err := c.link.conn.Write(buf) n, err := c.link.conn.Write(buf)
if err != nil { if err != nil {
c.log(WarnLevel, pkg+"write failed", "error", err.Error()) c.log(WarnLevel, pkg+"write failed", "error", err.Error())
return 0, fmt.Errorf("could not write to conn: %w",err) return 0, fmt.Errorf("could not write to conn: %w", err)
} }
return n, nil return n, nil
} }

View File

@ -221,7 +221,6 @@ func connect(c *Conn) error {
pkt = packet{buf: buf[:]} pkt = packet{buf: buf[:]}
} }
} }
return nil return nil
} }
@ -670,12 +669,12 @@ func handleInvoke(c *Conn, body []byte) error {
case avCreatestream: case avCreatestream:
n, err := obj.NumberProperty("", 3) n, err := obj.NumberProperty("", 3)
if err != nil { if err != nil {
return fmt.Errorf("could not get value for stream id number property: %w",err) return fmt.Errorf("could not get value for stream id number property: %w", err)
} }
c.streamID = uint32(n) c.streamID = uint32(n)
err = sendPublish(c) err = sendPublish(c)
if err != nil { if err != nil {
return fmt.Errorf("could not send publish: %w",err) return fmt.Errorf("could not send publish: %w", err)
} }
default: default:

View File

@ -304,7 +304,7 @@ func (s *mtsSender) Write(d []byte) (int, error) {
s.pool.Flush() s.pool.Flush()
} }
if err != nil { if err != nil {
s.log.Warning("ringBuffer write error", "error", err.Error(), "n", n, "size", len(s.buf), "rb element size", adjustedMTSPoolElementSize) s.log.Warning("ringBuffer write error", "error", err.Error(), "n", n, "writeSize", len(s.buf), "rbElementSize", adjustedMTSPoolElementSize)
if err == pool.ErrTooLong { if err == pool.ErrTooLong {
adjustedMTSPoolElementSize = len(s.buf) * 2 adjustedMTSPoolElementSize = len(s.buf) * 2
numElements := maxBuffLen / adjustedMTSPoolElementSize numElements := maxBuffLen / adjustedMTSPoolElementSize

View File

@ -203,7 +203,7 @@ func TestMtsSenderFailedSend(t *testing.T) {
for i := 0; i < noOfPacketsToWrite; i++ { for i := 0; i < noOfPacketsToWrite; i++ {
_, err := encoder.Write([]byte{byte(i)}) _, err := encoder.Write([]byte{byte(i)})
if err != nil { if err != nil {
t.Errorf("did not expect error from encoder write: %v",err) t.Errorf("did not expect error from encoder write: %v", err)
} }
} }
@ -286,7 +286,7 @@ func TestMtsSenderDiscontinuity(t *testing.T) {
for i := 0; i < noOfPacketsToWrite; i++ { for i := 0; i < noOfPacketsToWrite; i++ {
_, err := encoder.Write([]byte{byte(i)}) _, err := encoder.Write([]byte{byte(i)})
if err != nil { if err != nil {
t.Errorf("did not expect error from encoder write: %v",err) t.Errorf("did not expect error from encoder write: %v", err)
} }
} }

View File

@ -82,18 +82,18 @@ func (ts TurbiditySensor) Evaluate(imgs []gocv.Mat) (*Results, error) {
return nil, fmt.Errorf("could not transform image: %d: %w", i, err) return nil, fmt.Errorf("could not transform image: %d: %w", i, err)
} }
ts.log.Debug( "transform successful", "transform duration (sec)", time.Since(timer).Seconds()) ts.log.Debug("transform successful", "transform duration (sec)", time.Since(timer).Seconds())
timer = time.Now() timer = time.Now()
edge := ts.sobel(marker) edge := ts.sobel(marker)
ts.log.Debug( "sobel filter successful", "sobel duration", time.Since(timer).Seconds()) ts.log.Debug("sobel filter successful", "sobel duration", time.Since(timer).Seconds())
timer = time.Now() timer = time.Now()
sharpScore, contScore, err := ts.EvaluateImage(marker, edge) sharpScore, contScore, err := ts.EvaluateImage(marker, edge)
if err != nil { if err != nil {
return result, err return result, err
} }
ts.log.Debug( "sharpness and contrast evaluation successful", "evaluation duration", time.Since(timer).Seconds()) ts.log.Debug("sharpness and contrast evaluation successful", "evaluation duration", time.Since(timer).Seconds())
result.Update(sharpScore, contScore, float64(i), i) result.Update(sharpScore, contScore, float64(i), i)
} }
return result, nil return result, nil