revid: general bug fixing

Use http instead of https in gvctrl getLogin request. Gave RTP client a close method so that the conn can
be closed after we're done with the client. Put timeout on reading from the RTP client PacketConn so that
we don't hang on the ReadFrom call if the conn is closed. Closing the RTSP and RTP clients when
startRTSPCamera is returned.
This commit is contained in:
Saxon 2019-10-22 23:04:33 +10:30
parent 504179c03b
commit f336a03d7a
6 changed files with 41 additions and 19 deletions

View File

@ -31,6 +31,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io" "io"
"net"
"time" "time"
"bitbucket.org/ausocean/av/codec/h264/h264dec" "bitbucket.org/ausocean/av/codec/h264/h264dec"
@ -95,9 +96,9 @@ func (e *Extracter) Extract(dst io.Writer, src io.Reader, delay time.Duration) e
buf := make([]byte, maxRTPSize) buf := make([]byte, maxRTPSize)
for { for {
n, err := src.Read(buf) n, err := src.Read(buf)
switch err { switch {
case nil: // Do nothing. case err == nil: // Do nothing.
case io.EOF: case err == io.EOF || err.(net.Error).Timeout():
return nil return nil
default: default:
return fmt.Errorf("source read error: %v\n", err) return fmt.Errorf("source read error: %v\n", err)

View File

@ -52,7 +52,7 @@ const (
// from which (as well as username and password) two hashes are generated. // from which (as well as username and password) two hashes are generated.
// The generated hex is encoded into a url encoded form and returned as a string. // The generated hex is encoded into a url encoded form and returned as a string.
func getLogin(c *http.Client, id, host string) (string, error) { func getLogin(c *http.Client, id, host string) (string, error) {
req, err := http.NewRequest("GET", "https://"+host+loginSubDir, nil) req, err := http.NewRequest("GET", "http://"+host+loginSubDir, nil)
if err != nil { if err != nil {
return "", fmt.Errorf("can't create GET request for log-in page: %v", err) return "", fmt.Errorf("can't create GET request for log-in page: %v", err)
} }

View File

@ -28,8 +28,10 @@ LICENSE
package rtp package rtp
import ( import (
"fmt"
"net" "net"
"sync" "sync"
"time"
) )
// Client describes an RTP client that can receive an RTP stream and implements // Client describes an RTP client that can receive an RTP stream and implements
@ -58,6 +60,7 @@ func NewClient(addr string) (*Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return c, nil return c, nil
} }
@ -81,6 +84,11 @@ func (c *Client) Read(p []byte) (int, error) {
return n, err return n, err
} }
// Close will close the RTP client's connection.
func (c *Client) Close() error {
return c.r.PacketConn.Close()
}
// setSequence sets the most recently received sequence number, and updates the // setSequence sets the most recently received sequence number, and updates the
// cycles count if the sequence number has rolled over. // cycles count if the sequence number has rolled over.
func (c *Client) setSequence(s uint16) { func (c *Client) setSequence(s uint16) {
@ -113,6 +121,11 @@ type PacketReader struct {
// Read implements io.Reader. // Read implements io.Reader.
func (r PacketReader) Read(b []byte) (int, error) { func (r PacketReader) Read(b []byte) (int, error) {
const readTimeout = time.Second
err := r.PacketConn.SetReadDeadline(time.Now().Add(readTimeout))
if err != nil {
return 0, fmt.Errorf("could not set read deadline for PacketConn: %w", err)
}
n, _, err := r.PacketConn.ReadFrom(b) n, _, err := r.PacketConn.ReadFrom(b)
return n, err return n, err
} }

View File

@ -317,13 +317,13 @@ func (c *Config) Validate() error {
c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", "LogLevel", defaultVerbosity) c.Logger.Log(logger.Info, pkg+"bad LogLevel mode defined, defaulting", "LogLevel", defaultVerbosity)
} }
switch c.Input {
case Raspivid, V4L, File, Audio:
case RTSP:
if c.CameraIP == "" { if c.CameraIP == "" {
c.Logger.Log(logger.Info, pkg+"no CameraIP defined, defaulting", "CameraIP", defaultCameraIP) c.Logger.Log(logger.Info, pkg+"no CameraIP defined, defaulting", "CameraIP", defaultCameraIP)
c.CameraIP = defaultCameraIP c.CameraIP = defaultCameraIP
} }
switch c.Input {
case Raspivid, V4L, File, Audio, RTSP:
case NothingDefined: case NothingDefined:
c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput) c.Logger.Log(logger.Info, pkg+"no input type defined, defaulting", "input", defaultInput)
c.Input = defaultInput c.Input = defaultInput

View File

@ -109,7 +109,7 @@ func (r *Revid) startRaspivid() (func() error, error) {
} }
err = r.cmd.Start() err = r.cmd.Start()
if err != nil { if err != nil {
r.config.Logger.Log(logger.Fatal, pkg+"cannot start raspivid", "error", err.Error()) return nil, fmt.Errorf("could not start raspivid command: %w", err)
} }
r.wg.Add(1) r.wg.Add(1)
@ -272,7 +272,16 @@ func (r *Revid) startRTSPCamera() (func() error, error) {
go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate)) go r.processFrom(rtpClt, time.Second/time.Duration(r.config.FrameRate))
return func() error { return func() error {
rtspClt.Close() err := rtpClt.Close()
if err != nil {
return fmt.Errorf("could not close RTP client: %w", err)
}
err = rtspClt.Close()
if err != nil {
return fmt.Errorf("could not close RTSP client: %w", err)
}
rtcpClt.Stop() rtcpClt.Stop()
return nil return nil
}, nil }, nil

View File

@ -134,17 +134,12 @@ func (r *Revid) Config() Config {
return r.config return r.config
} }
// TODO(Saxon): put more thought into error severity. // TODO(Saxon): put more thought into error severity and how to handle these.
func (r *Revid) handleErrors() { func (r *Revid) handleErrors() {
for { for {
err := <-r.err err := <-r.err
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"async error", "error", err.Error())
r.Stop()
err = r.Start()
if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to restart revid", "error", err.Error())
}
} }
} }
} }
@ -362,7 +357,6 @@ func (r *Revid) Start() error {
} }
r.closeInput, err = r.setupInput() r.closeInput, err = r.setupInput()
if err != nil { if err != nil {
r.Stop()
return err return err
} }
r.running = true r.running = true
@ -394,12 +388,17 @@ func (r *Revid) Stop() {
if err != nil { if err != nil {
r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error()) r.config.Logger.Log(logger.Error, pkg+"failed to close pipeline", "error", err.Error())
} }
r.config.Logger.Log(logger.Info, pkg+"closed pipeline")
if r.cmd != nil && r.cmd.Process != nil { if r.cmd != nil && r.cmd.Process != nil {
r.config.Logger.Log(logger.Info, pkg+"killing input proccess") r.config.Logger.Log(logger.Info, pkg+"killing input proccess")
r.cmd.Process.Kill() r.cmd.Process.Kill()
} }
r.config.Logger.Log(logger.Info, pkg+"waiting for routines to close")
r.wg.Wait() r.wg.Wait()
r.config.Logger.Log(logger.Info, pkg+"revid stopped")
r.running = false r.running = false
} }