diff --git a/container/mts/encoder.go b/container/mts/encoder.go index a903500b..90177fde 100644 --- a/container/mts/encoder.go +++ b/container/mts/encoder.go @@ -39,11 +39,11 @@ import ( "bitbucket.org/ausocean/utils/realtime" ) -// Media type values. -// TODO: reference relevant specifications. +// Stream IDs as per ITU-T Rec. H.222.0 / ISO/IEC 13818-1 [1], tables 2-22 and 2-34. const ( H264ID = 27 H265ID = 36 + MJPEGID = 28 audioStreamID = 0xc0 // ADPCM audio stream ID. ) @@ -51,9 +51,16 @@ const ( const ( EncodeH264 = iota EncodeH265 + EncodeMJPEG EncodeAudio ) +// The program IDs we assign to different types of media. +const ( + VideoPid = 256 + AudioPid = 210 +) + // Time-related constants. const ( // ptsOffset is the offset added to the clock to determine @@ -73,6 +80,16 @@ const ( // If we are not using NAL based PSI intervals then we will send PSI every 7 packets. const psiSendCount = 7 +const ( + hasPayload = 0x1 + hasAdaptationField = 0x2 +) + +const ( + hasDTS = 0x1 + hasPTS = 0x2 +) + // Some common manifestations of PSI. var ( // StandardPAT is a minimal PAT. @@ -147,21 +164,24 @@ type Encoder struct { // NewEncoder returns an Encoder with the specified media type and rate eg. if a video stream // calls write for every frame, the rate will be the frame rate of the video. -func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { - var mPid uint16 - var sid byte +func NewEncoder(dst io.WriteCloser, rate float64, mediaType int, options ...func(*Encoder) error) (*Encoder, error) { + var mPID uint16 + var sID byte nbp := true switch mediaType { case EncodeAudio: - mPid = AudioPid - sid = audioStreamID + mPID = AudioPid + sID = audioStreamID nbp = false case EncodeH265: - mPid = VideoPid - sid = H265ID + mPID = VideoPid + sID = H265ID case EncodeH264: - mPid = VideoPid - sid = H264ID + mPID = VideoPid + sID = H264ID + case EncodeMJPEG: + mPID = VideoPid + sID = MJPEGID } pmt := BasePMT @@ -169,14 +189,14 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { Pcrpid: 0x0100, Pil: 0, Essd: &psi.ESSD{ - St: byte(sid), - Epid: mPid, + St: byte(sID), + Epid: mPID, Esil: 0x00, }, } pmtTable = pmt.Bytes() - return &Encoder{ + e := &Encoder{ dst: dst, writePeriod: time.Duration(float64(time.Second) / rate), @@ -186,31 +206,35 @@ func NewEncoder(dst io.WriteCloser, rate float64, mediaType int) *Encoder { pktCount: 8, - mediaPid: mPid, - streamID: sid, + mediaPid: mPID, + streamID: sID, continuity: map[uint16]byte{ PatPid: 0, PmtPid: 0, - mPid: 0, + mPID: 0, }, } + + for _, option := range options { + err := option(e) + if err != nil { + return nil, fmt.Errorf("option failed with error: %v", err) + } + } + return e, nil } -const ( - hasPayload = 0x1 - hasAdaptationField = 0x2 -) - -const ( - hasDTS = 0x1 - hasPTS = 0x2 -) - -func (e *Encoder) NALBasedPSI(b bool, sendCount int) { - e.nalBasedPSI = b - e.psiSendCount = sendCount - e.pktCount = e.psiSendCount +// PacketBasedPSI is an option that can be passed to NewEncoder to select +// packet based PSI writing, i.e. PSI are written to the destination every +// sendCount packets. +func PacketBasedPSI(sendCount int) func(*Encoder) error { + return func(e *Encoder) error { + e.nalBasedPSI = false + e.psiSendCount = sendCount + e.pktCount = e.psiSendCount + return nil + } } // Write implements io.Writer. Write takes raw video or audio data and encodes into MPEG-TS, diff --git a/container/mts/encoder_test.go b/container/mts/encoder_test.go index 6b46703e..e154dcc5 100644 --- a/container/mts/encoder_test.go +++ b/container/mts/encoder_test.go @@ -3,8 +3,8 @@ NAME encoder_test.go AUTHOR - Trek Hopton Saxon A. Nelson-Milton + Trek Hopton LICENSE encoder_test.go is Copyright (C) 2017-2019 the Australian Ocean Lab (AusOcean) @@ -101,12 +101,14 @@ func TestEncodeVideo(t *testing.T) { // Create the dst and write the test data to encoder. dst := &destination{} - e := NewEncoder(nopCloser{dst}, 25, EncodeH264) - e.NALBasedPSI(false, psiSendCount) - - _, err := e.Write(data) + e, err := NewEncoder(nopCloser{dst}, 25, EncodeH264, PacketBasedPSI(psiSendCount)) if err != nil { - t.Fatalf("could not write data to encoder, failed with err: %v\n", err) + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } + + _, err = e.Write(data) + if err != nil { + t.Fatalf("could not write data to encoder, failed with error: %v\n", err) } // Check headers. @@ -163,7 +165,10 @@ func TestEncodePcm(t *testing.T) { sampleSize := 2 blockSize := 16000 writeFreq := float64(sampleRate*sampleSize) / float64(blockSize) - e := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) + e, err := NewEncoder(nopCloser{&buf}, writeFreq, EncodeAudio) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } inPath := "../../../test/test-data/av/input/sweep_400Hz_20000Hz_-3dBFS_5s_48khz.pcm" inPcm, err := ioutil.ReadFile(inPath) @@ -264,7 +269,11 @@ const fps = 25 func TestMetaEncode1(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + if err != nil { + t.Fatalf("could not create encoder, failed with error: %v", err) + } + Meta.Add("ts", "12345678") if err := e.writePSI(); err != nil { t.Errorf("unexpected error: %v\n", err.Error()) @@ -292,7 +301,11 @@ func TestMetaEncode1(t *testing.T) { func TestMetaEncode2(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } + Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { @@ -320,7 +333,11 @@ func TestMetaEncode2(t *testing.T) { func TestExtractMeta(t *testing.T) { Meta = meta.New() var buf bytes.Buffer - e := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + e, err := NewEncoder(nopCloser{&buf}, fps, EncodeH264) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } + Meta.Add("ts", "12345678") Meta.Add("loc", "1234,4321,1234") if err := e.writePSI(); err != nil { diff --git a/container/mts/mpegts.go b/container/mts/mpegts.go index fe31b5ce..8352a984 100644 --- a/container/mts/mpegts.go +++ b/container/mts/mpegts.go @@ -42,13 +42,11 @@ import ( const PacketSize = 188 -// Program ID for various types of ts packets. +// Standard program IDs for program specific information MPEG-TS packets. const ( - SdtPid = 17 - PatPid = 0 - PmtPid = 4096 - VideoPid = 256 - AudioPid = 210 + SdtPid = 17 + PatPid = 0 + PmtPid = 4096 ) // StreamID is the id of the first stream. diff --git a/revid/config.go b/revid/config.go index 87fb9139..21f509b4 100644 --- a/revid/config.go +++ b/revid/config.go @@ -286,12 +286,7 @@ func (c *Config) Validate() error { } switch c.InputCodec { - case codecutil.H264: - case codecutil.MJPEG: - if c.Quantization > 0 || c.Bitrate == 0 { - return errors.New("bad bitrate or quantization for mjpeg input") - } - case codecutil.PCM, codecutil.ADPCM: + case codecutil.H264, codecutil.MJPEG, codecutil.PCM, codecutil.ADPCM: default: switch c.Input { case Audio: diff --git a/revid/revid.go b/revid/revid.go index 0f357ebd..085fcdd4 100644 --- a/revid/revid.go +++ b/revid/revid.go @@ -43,6 +43,7 @@ import ( "bitbucket.org/ausocean/av/codec/codecutil" "bitbucket.org/ausocean/av/codec/h264" "bitbucket.org/ausocean/av/codec/h265" + "bitbucket.org/ausocean/av/codec/mjpeg" "bitbucket.org/ausocean/av/container/flv" "bitbucket.org/ausocean/av/container/mts" "bitbucket.org/ausocean/av/protocol/rtcp" @@ -121,7 +122,7 @@ func New(c Config, ns *netsender.Sender) (*Revid, error) { r := Revid{ns: ns, err: make(chan error)} err := r.setConfig(c) if err != nil { - return nil, fmt.Errorf("could not set config, failed with error: %v",err) + return nil, fmt.Errorf("could not set config, failed with error: %v", err) } r.config.Logger.SetLevel(c.LogLevel) go r.handleErrors() @@ -171,16 +172,26 @@ func (r *Revid) reset(config Config) error { err = r.setupPipeline( func(dst io.WriteCloser, fps float64) (io.WriteCloser, error) { var st int + var encOptions []func(*mts.Encoder) error + switch r.config.Input { - case Raspivid, File, V4L: + case Raspivid: + switch r.config.InputCodec { + case codecutil.H264: + st = mts.EncodeH264 + case codecutil.MJPEG: + st = mts.EncodeMJPEG + encOptions = append(encOptions, mts.PacketBasedPSI(int(r.config.MinFrames))) + } + case File, V4L: st = mts.EncodeH264 case RTSP: st = mts.EncodeH265 case Audio: st = mts.EncodeAudio } - e := mts.NewEncoder(dst, float64(fps), st) - return e, nil + + return mts.NewEncoder(dst, float64(fps), st, encOptions...) }, func(dst io.WriteCloser, fps int) (io.WriteCloser, error) { return flv.NewEncoder(dst, true, true, fps) @@ -290,7 +301,12 @@ func (r *Revid) setupPipeline(mtsEnc func(dst io.WriteCloser, rate float64) (io. switch r.config.Input { case Raspivid: r.setupInput = r.startRaspivid - r.lexTo = h264.Lex + switch r.config.InputCodec { + case codecutil.H264: + r.lexTo = h264.Lex + case codecutil.MJPEG: + r.lexTo = mjpeg.Lex + } case V4L: r.setupInput = r.startV4L r.lexTo = h264.Lex @@ -390,6 +406,15 @@ func (r *Revid) Update(vars map[string]string) error { r.config.Exposure = value case "AutoWhiteBalance": r.config.AutoWhiteBalance = value + case "InputCodec": + switch value { + case "H264": + r.config.InputCodec = codecutil.H264 + case "MJPEG": + r.config.InputCodec = codecutil.MJPEG + default: + r.config.Logger.Log(logger.Warning, pkg+"invalid InputCodec variable value", "value", value) + } case "Output": outputs := strings.Split(value, ",") r.config.Outputs = make([]uint8, len(outputs)) diff --git a/revid/senders_test.go b/revid/senders_test.go index 3fe2f291..e6a39c7a 100644 --- a/revid/senders_test.go +++ b/revid/senders_test.go @@ -135,11 +135,12 @@ func TestMtsSenderSegment(t *testing.T) { const numberOfClips = 11 dst := &destination{t: t, done: make(chan struct{}), doneAt: numberOfClips} sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) - encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) - // Turn time based PSI writing off for encoder. const psiSendCount = 10 - encoder.NALBasedPSI(false, psiSendCount) + encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number. @@ -213,11 +214,12 @@ func TestMtsSenderFailedSend(t *testing.T) { const clipToFailAt = 3 dst := &destination{t: t, testFails: true, failAt: clipToFailAt, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(defaultMTSRBSize, defaultMTSRBElementSize, 0), 0) - encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) - // Turn time based PSI writing off for encoder and send PSI every 10 packets. const psiSendCount = 10 - encoder.NALBasedPSI(false, psiSendCount) + encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264, mts.PacketBasedPSI(psiSendCount)) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number. @@ -293,11 +295,12 @@ func TestMtsSenderDiscontinuity(t *testing.T) { const clipToDelay = 3 dst := &destination{t: t, sendDelay: 10 * time.Millisecond, delayAt: clipToDelay, done: make(chan struct{})} sender := newMtsSender(dst, (*dummyLogger)(t).log, ring.NewBuffer(1, defaultMTSRBElementSize, 0), 0) - encoder := mts.NewEncoder(sender, 25, mts.EncodeH264) - // Turn time based PSI writing off for encoder. const psiSendCount = 10 - encoder.NALBasedPSI(false, psiSendCount) + encoder, err := mts.NewEncoder(sender, 25, mts.EncodeH264) + if err != nil { + t.Fatalf("could not create MTS encoder, failed with error: %v", err) + } // Write the packets to the encoder, which will in turn write to the mtsSender. // Payload will just be packet number.