device/file/file.go: simplify AVFile.Read() method

This commit is contained in:
Saxon Nelson-Milton 2020-12-21 11:52:35 +10:30
commit 6993f136c8
18 changed files with 246 additions and 92 deletions

View File

@ -3,7 +3,7 @@ jobs:
build: build:
docker: docker:
# CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/ # CircleCI Go images available at: https://hub.docker.com/r/circleci/golang/
- image: circleci/golang:1.13 - image: circleci/golang:1.15.2
environment: environment:
GO111MODULE: "on" GO111MODULE: "on"

View File

@ -1,52 +1,84 @@
# Install rc.local file # Install files and directories required by NetSender clients (such as gpio-netsender, rv, etc.)
# and create a dhcpcd.enter-hook for setting the MAC address.
# MA and DK can be optionally passed to Make, e.g, for a hard (first-time) installation:
# sudo MA=mac DK=dk install_hard
# NB: The default (soft) install does not override conf files.
USER := $(shell whoami) USER := $(shell whoami)
PATH := /usr/local/go/bin:$(PATH) PATH := /usr/local/go/bin:$(PATH)
ifeq ($(MA),)
MA := "00:E0:4C:00:00:01"
endif
ifeq ($(DK),)
DK := 0
endif
.SILENT:copy_files .SILENT:make_dirs
.SILENT:soft_copy_files
.SILENT:hard_copy_files .SILENT:hard_copy_files
.SILENT:build .SILENT:set_mac
.SILENT:syncreboot
.SILENT:clean .SILENT:clean
install: as_root copy_files build install: as_root make_dirs soft_copy_files
@echo "Install complete" @echo "Install complete"
install_hard: as_root hard_copy_files build install_hard: as_root make_dirs hard_copy_files set_mac syncreboot
@echo "Install complete" @echo "Hard install complete"
as_root: as_root:
ifneq ($(USER),root) ifneq ($(USER),root)
$(error Must run as superuser!) $(error Must run as superuser!)
endif endif
copy_files: make_dirs:
if [ -f /etc/rc.local ] ; then \ if [ ! -d /var/netsender ] ; then \
echo "/etc/rc.local left unmodified" ; \ mkdir /var/netsender; \
else \ chmod guo+rwx /var/netsender; \
cp rc.local /etc; \
fi fi
if [ ! -d /var/log/netsender ] ; then \
mkdir /var/log/netsender; \
chmod guo+rwx /var/log/netsender; \
fi
soft_copy_files:
if [ -f /etc/systemd/system/looper.service ] ; then \
echo "/etc/systemd/system/looper.service left unmodified" ; \
else \
cp looper.service /etc/systemd/system; \
fi
systemctl enable looper.service
chmod +x pi_run.sh
if [ -f /etc/netsender.conf ] ; then \ if [ -f /etc/netsender.conf ] ; then \
echo "/etc/netsender.conf left unmodified" ; \ echo "/etc/netsender.conf left unmodified" ; \
else \ else \
cp netsender.conf /etc; \ printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf; \
chown pi /etc/netsender.conf; \
fi fi
hard_copy_files: hard_copy_files:
if [ -f /etc/rc.local ] ; then \ if [ -f /etc/systemd/system/looper.service ] ; then \
echo "Backed up rc.local to /etc/rc.local.bak" ; \ echo "/etc/systemd/system/looper.service overwritten" ; \
cp /etc/rc.local /etc/rc.local.bak ; \
fi fi
cp -f rc.local /etc cp -f looper.service /etc/systemd/system
systemctl enable looper.service
chmod +x pi_run.sh
if [ -f /etc/netsender.conf ] ; then \ if [ -f /etc/netsender.conf ] ; then \
echo "Backed up netsender.conf to /etc/netsender.conf.bak"; \ echo "Backed up netsender.conf to /etc/netsender.conf.bak"; \
cp /etc/netsender.conf /etc/netsender.conf.bak ; \ cp /etc/netsender.conf /etc/netsender.conf.bak ; \
fi fi
cp -f netsender.conf /etc printf "ma $(MA)\ndk $(DK)\n" > /etc/netsender.conf
chown pi /etc/netsender.conf
build: set_mac:
if grep -q 'Raspberry Pi 3' '/proc/device-tree/model'; then \ printf "ip link set eth0 address $(MA)\n" > /etc/dhcpcd.enter-hook
echo "Compiling for Raspberry pi 3";\ chmod guo+x /etc/dhcpcd.enter-hook
go build -tags pi3;\
else \ syncreboot:
echo "Compiling for Raspberry pi 0";\ cd ../../utils/cmd/syncreboot; make; make install
go build -tags pi0;\
fi clean: as_root
rm -rf /var/netsender
rm -rf /var/log/netsender
rm -rf /etc/systemd/system/looper.service
rm -rf /etc/netsender.conf
@echo "Clean complete"

10
cmd/looper/looper.service Normal file
View File

@ -0,0 +1,10 @@
[Unit]
Description=looper is a utility to robustly loop a sound file from a raspberry pi.
[Service]
Type=simple
ExecStart=/home/pi/go/src/bitbucket.org/ausocean/av/cmd/looper/pi_run.sh
Restart=on-failure
[Install]
WantedBy=multi-user.target

49
cmd/looper/pi_run.sh Normal file
View File

@ -0,0 +1,49 @@
#!/bin/sh -e
# This script launches looper on a pi, intended to run at boot time.
LOOPER_PATH=/home/pi/go/src/bitbucket.org/ausocean/av/cmd/looper
echo Set kernel parameters:
# kernel settings to improve performance on Raspberry Pi
# tell Linux to fork optimistically
sudo sysctl -w vm.overcommit_memory=1
# minimize swapping, without disabling it completely
sudo sysctl -w vm.swappiness=1
# the following required directories _should_ already exist
if [ ! -d /var/log/netsender ]; then
sudo mkdir /var/log/netsender
chmod guo+rwx /var/log/netsender
fi
if [ ! -d /var/netsender ]; then
sudo mkdir /var/netsender
chmod guo+rwx /var/netsender
fi
# show IP addresses
echo Our IP addresses:
sudo ip addr show | grep inet
# Start gpio stuff.
sudo systemctl start pigpiod
# capture stdout and stderr to a secondary log file (just in case)
exec 2> /var/log/netsender/stream.log
exec 1>&2
# set env, working dir and run looper as pi user
HOME=/home/pi
GOPATH=$HOME/go
LOOPER_PATH=$GOPATH/src/bitbucket.org/ausocean/av/cmd/looper
PATH=$PATH:/usr/local/go/bin:$LOOPER_PATH
cd $LOOPER_PATH
sudo HOME=$HOME GOPATH=$GOPATH PATH=$PATH ./looper
if [ $? -eq 0 ]
then
echo "Successfully exited looper"
exit 0
else
echo "looper exited with code: $?" >&2
exit 1
fi

View File

@ -127,12 +127,12 @@ func TestIsRunning(t *testing.T) {
InputCodec: codecutil.ADPCM, InputCodec: codecutil.ADPCM,
}) })
if err != nil { if err != nil {
t.Skipf("could not set device: %w", err) t.Skipf("could not set device: %v", err)
} }
err = d.Start() err = d.Start()
if err != nil { if err != nil {
t.Fatalf("could not start device %w", err) t.Fatalf("could not start device %v", err)
} }
time.Sleep(dur) time.Sleep(dur)

View File

@ -37,7 +37,7 @@ import (
// AVFile is an implementation of the AVDevice interface for a file containg // AVFile is an implementation of the AVDevice interface for a file containg
// audio or video data. // audio or video data.
type AVFile struct { type AVFile struct {
f io.ReadCloser f *os.File
cfg config.Config cfg config.Config
isRunning bool isRunning bool
} }
@ -82,7 +82,7 @@ func (m *AVFile) Stop() error {
// called and Stop has since been called, an error is returned. // called and Stop has since been called, an error is returned.
func (m *AVFile) Read(p []byte) (int, error) { func (m *AVFile) Read(p []byte) (int, error) {
if m.f != nil { if m.f != nil {
n, err := m.f.Read() n, err := m.f.Read(p)
if err != nil { if err != nil {
// In the case that we reach end of file but loop is true, we want to // In the case that we reach end of file but loop is true, we want to
// seek to start and keep reading from there. // seek to start and keep reading from there.
@ -93,16 +93,14 @@ func (m *AVFile) Read(p []byte) (int, error) {
} }
// Now that we've seeked to start, let's try reading again. // Now that we've seeked to start, let's try reading again.
n, err = m.f.Read() n, err = m.f.Read(p)
if err != nil { if err != nil {
return n, fmt.Errorf("could not read after start seek: %w", err) return n, fmt.Errorf("could not read after start seek: %w", err)
} }
return n, nil }
} }
return n, err return n, err
} }
return n, nil
}
return 0, errors.New("AV file is closed") return 0, errors.New("AV file is closed")
} }

View File

@ -41,12 +41,12 @@ func TestIsRunning(t *testing.T) {
InputPath: path, InputPath: path,
}) })
if err != nil { if err != nil {
t.Skipf("could not set device: %w", err) t.Skipf("could not set device: %v", err)
} }
err = d.Start() err = d.Start()
if err != nil { if err != nil {
t.Fatalf("could not start device %w", err) t.Fatalf("could not start device %v", err)
} }
time.Sleep(dur) time.Sleep(dur)

View File

@ -153,13 +153,13 @@ func TestCodecOut(t *testing.T) {
}, },
{ {
s: settings{ch: 1}, s: settings{ch: 1},
c: Codec(500), c: Codec("500"),
want: settings{ch: 1}, want: settings{ch: 1},
err: true, err: true,
}, },
{ {
s: settings{ch: 2}, s: settings{ch: 2},
c: Codec(500), c: Codec("500"),
want: settings{ch: 2}, want: settings{ch: 2},
err: true, err: true,
}, },

View File

@ -47,12 +47,12 @@ func TestIsRunning(t *testing.T) {
CameraIP: ip, CameraIP: ip,
}) })
if err != nil { if err != nil {
t.Skipf("could not set device: %w", err) t.Skipf("could not set device: %v", err)
} }
err = d.Start() err = d.Start()
if err != nil { if err != nil {
t.Fatalf("could not start device %w", err) t.Fatalf("could not start device %v", err)
} }
time.Sleep(dur) time.Sleep(dur)

View File

@ -45,12 +45,12 @@ func TestIsRunning(t *testing.T) {
InputCodec: codecutil.H264, InputCodec: codecutil.H264,
}) })
if err != nil { if err != nil {
t.Skipf("could not set device: %w", err) t.Skipf("could not set device: %v", err)
} }
err = d.Start() err = d.Start()
if err != nil { if err != nil {
t.Fatalf("could not start device %w", err) t.Fatalf("could not start device %v", err)
} }
time.Sleep(dur) time.Sleep(dur)

View File

@ -45,12 +45,12 @@ func TestIsRunning(t *testing.T) {
InputCodec: codecutil.H264, InputCodec: codecutil.H264,
}) })
if err != nil { if err != nil {
t.Skipf("could not set device: %w", err) t.Skipf("could not set device: %v", err)
} }
err = d.Start() err = d.Start()
if err != nil { if err != nil {
t.Fatalf("could not start device %w", err) t.Fatalf("could not start device %v", err)
} }
time.Sleep(dur) time.Sleep(dur)

View File

@ -60,12 +60,10 @@ package main
import ( import (
"encoding/json" "encoding/json"
"flag" "flag"
"fmt"
"os" "os"
"os/exec"
"runtime/pprof" "runtime/pprof"
"gopkg.in/natefinch/lumberjack.v2"
"bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/container/mts"
"bitbucket.org/ausocean/av/container/mts/meta" "bitbucket.org/ausocean/av/container/mts/meta"
"bitbucket.org/ausocean/av/revid" "bitbucket.org/ausocean/av/revid"
@ -82,12 +80,8 @@ const (
// Logging configuration. // Logging configuration.
const ( const (
logMaxSize = 500 // MB logLevel = logger.Info
logMaxBackups = 10 logSuppress = true
logMaxAge = 28 // days
logLevel = logger.Debug
logPath = "/var/log/netsender/netsender.log"
logSuppress = false
) )
// Misc consts. // Misc consts.
@ -96,15 +90,66 @@ const (
profilePath = "rvcl.prof" profilePath = "rvcl.prof"
) )
// Netsender conf consts.
const (
cfgPath = "/etc/netsender.conf"
fMode = 0777
)
// Default config parameters.
const (
defaultInput = "File"
defaultInputPath = "../../../test/test-data/av/input/betterInput.h264"
defaultFileFPS = "25"
defaultOutput = "RTP"
defaultRTPAddress = "localhost:6970"
defaultLoop = "true"
)
// canProfile is set to false with revid-cli is built with "-tags profile". // canProfile is set to false with revid-cli is built with "-tags profile".
var canProfile = false var canProfile = false
// The logger that will be used throughout. // The logger that will be used throughout.
var log *logger.Logger var log *logger.Logger
// stdoutLogger provides an io.Writer for the purpose of capturing stdout from
// the VLC process and using the logger to capture and print to stdout of
// this process.
type stdoutLogger struct {
l *logger.Logger
t string
}
func (sl *stdoutLogger) Write(d []byte) (int, error) {
sl.l.Info(sl.t + ": " + string(d))
return len(d), nil
}
// stderrLogger provides an io.Writer for the purpose of capturing stderr from
// the VLC process and using the logger to capture and print to stdout of
// this process.
type stderrLogger struct {
l *logger.Logger
t string
}
func (sl *stderrLogger) Write(d []byte) (int, error) {
sl.l.Error(sl.t + ": " + string(d))
return len(d), nil
}
func main() { func main() {
mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}}) mts.Meta = meta.NewWith([][2]string{{metaPreambleKey, metaPreambleData}})
// Create logger that methods will be called on by the netsender client and
// revid to log messages. Logs will go the lumberjack logger to handle file
// writing of messages.
log = logger.New(
logLevel,
os.Stdout,
logSuppress,
)
// If built with profile tag, we will start CPU profiling. // If built with profile tag, we will start CPU profiling.
if canProfile { if canProfile {
profile() profile()
@ -115,6 +160,7 @@ func main() {
var ( var (
configPtr = flag.String("config", "", "Provide configuration JSON to revid (see readme for further information).") configPtr = flag.String("config", "", "Provide configuration JSON to revid (see readme for further information).")
configFilePtr = flag.String("config-file", "", "Location of revid configuration file (see readme for further information).") configFilePtr = flag.String("config-file", "", "Location of revid configuration file (see readme for further information).")
rtpAddrPtr = flag.String("rtp-addr", defaultRTPAddress, "RTP destination address (<ip>:<port>)(common port=6970)")
) )
flag.Parse() flag.Parse()
@ -124,62 +170,79 @@ func main() {
err error err error
) )
switch { switch {
case *configPtr != "" && *configFilePtr != "": // This doesn't make sense so panic. // This doesn't make sense so panic.
case *configPtr != "" && *configFilePtr != "":
panic("cannot define both command-line config and file config") panic("cannot define both command-line config and file config")
case *configPtr != "": // Decode JSON file to map.
// Decode JSON file to map.
case *configPtr != "":
err = json.Unmarshal([]byte(*configPtr), &cfg) err = json.Unmarshal([]byte(*configPtr), &cfg)
if err != nil { if err != nil {
panic(fmt.Sprintf("could not decode JSON config: %w", err)) log.Fatal("could not decode JSON config", "error", err)
} }
case *configFilePtr != "": // Decode JSON string to map from command line flag.
// Decode JSON string to map from command line flag.
case *configFilePtr != "":
f, err := os.Open(*configFilePtr) f, err := os.Open(*configFilePtr)
if err != nil { if err != nil {
panic(fmt.Sprintf("could not open config file: %w", err)) log.Fatal("could not open config file", "error", err)
} }
err = json.NewDecoder(f).Decode(&cfg) err = json.NewDecoder(f).Decode(&cfg)
if err != nil { if err != nil {
panic(fmt.Sprintf("could not decode JSON config: %w", err)) log.Fatal("could not decode JSON config", "error", err)
}
default: // No config information has been provided; give empty map to force defaults.
cfg = map[string]string{}
} }
// Create logger that methods will be called on by the netsender client and // No config information has been provided; provide a default config map.
// revid to log messages. Logs will go the lumberjack logger to handle file default:
// writing of messages. cfg = map[string]string{
log = logger.New( "Input": defaultInput,
logLevel, "InputPath": defaultInputPath,
&lumberjack.Logger{ "FileFPS": defaultFileFPS,
Filename: logPath, "Output": defaultOutput,
MaxSize: logMaxSize, // MB "RTPAddress": *rtpAddrPtr,
MaxBackups: logMaxBackups, "Loop": defaultLoop,
MaxAge: logMaxAge, // days }
}, }
logSuppress, log.Info("got config", "config", cfg)
)
// Create a netsender client. This is used only for HTTP sending of media // Create a netsender client. This is used only for HTTP sending of media
// in this binary. // in this binary.
ns, err := netsender.New(log, nil, nil, nil, nil) ns, err := netsender.New(log, nil, nil, nil, nil)
if err != nil { if err != nil {
log.Log(logger.Fatal, pkg+"could not initialise netsender client: "+err.Error()) log.Fatal("could not initialise netsender client", "error", err)
} }
// Create the revid client, responsible for media collection and processing.
log.Info("got creating revid client")
rv, err := revid.New(config.Config{Logger: log}, ns) rv, err := revid.New(config.Config{Logger: log}, ns)
if err != nil { if err != nil {
panic(fmt.Sprintf("could not create revid: %w", err)) log.Fatal("could not create revid", "error", err)
} }
// Configure revid with configuration map obtained through flags or file. // Configure revid with configuration map obtained through flags or file.
// If config is empty, defaults will be adopted by revid. // If config is empty, defaults will be adopted by revid.
log.Info("updating revid with config")
err = rv.Update(cfg) err = rv.Update(cfg)
if err != nil { if err != nil {
panic(fmt.Sprintf("could not update revid config: %w", err)) log.Fatal("could not update revid config", "error", err)
} }
log.Info("starting revid")
err = rv.Start() err = rv.Start()
if err != nil { if err != nil {
panic(fmt.Sprintf("could not start revid: %w", err)) log.Fatal("could not start revid", "error", err)
}
// If output is RTP, open up a VLC window to see stream.
if v, ok := cfg["Output"]; ok && v == "RTP" {
log.Info("opening vlc window")
cmd := exec.Command("vlc", "rtp://"+*rtpAddrPtr)
cmd.Stdout = &stdoutLogger{log, "VLC STDOUT"}
cmd.Stderr = &stderrLogger{log, "VLC STDERR"}
err = cmd.Start()
if err != nil {
log.Fatal("could not run vlc command", "error", err)
}
} }
// Run indefinitely. // Run indefinitely.

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.13
require ( require (
bitbucket.org/ausocean/iot v1.3.0 bitbucket.org/ausocean/iot v1.3.0
bitbucket.org/ausocean/utils v1.2.14 bitbucket.org/ausocean/utils v1.2.15
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7
github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480 github.com/go-audio/audio v0.0.0-20181013203223-7b2a6ca21480
github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884 github.com/go-audio/wav v0.0.0-20181013172942-de841e69b884

2
go.sum
View File

@ -4,6 +4,8 @@ bitbucket.org/ausocean/utils v1.2.11 h1:zA0FOaPjN960ryp8PKCkV5y50uWBYrIxCVnXjwbv
bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.11/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.14 h1:v5eBYavkEqKOBCppR6P451eT9UT/CQReMsOZZBUPX3Q= bitbucket.org/ausocean/utils v1.2.14 h1:v5eBYavkEqKOBCppR6P451eT9UT/CQReMsOZZBUPX3Q=
bitbucket.org/ausocean/utils v1.2.14/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8= bitbucket.org/ausocean/utils v1.2.14/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
bitbucket.org/ausocean/utils v1.2.15 h1:Pz99ZfobdhACTtU6oj9BTyBcNSQulLvPT7wq4P343Es=
bitbucket.org/ausocean/utils v1.2.15/go.mod h1:uXzX9z3PLemyURTMWRhVI8uLhPX4uuvaaO85v2hcob8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw= github.com/Comcast/gots v0.0.0-20190305015453-8d56e473f0f7 h1:LdOc9B9Bj6LEsKiXShkLA3/kpxXb6LJpH+ekU2krbzw=