Fixing bugs

This commit is contained in:
Unknown 2018-03-13 15:03:31 +10:30
parent 57d416563a
commit 76df33ceb9
10 changed files with 48 additions and 537 deletions

View File

@ -66,7 +66,7 @@ func btb(b bool) byte {
func (h *Header) ToByteSlice() (output []byte) { func (h *Header) ToByteSlice() (output []byte) {
output = make([]byte, 0, headerLength) output = make([]byte, 0, headerLength)
output = append(output, flvheaderCode...) output = append(output, flvHeaderCode...)
output = append(output, []byte { output = append(output, []byte {
version, version,
0x00 | btb(h.AudioFlag)<<2 | btb(h.VideoFlag), 0x00 | btb(h.AudioFlag)<<2 | btb(h.VideoFlag),

View File

@ -131,16 +131,17 @@ func getNalType(frame []byte) byte {
} }
} }
} }
return 0x00
} }
// isKeyFrame checks the nature of the passed frame - returning true if the // isKeyFrame checks the nature of the passed frame - returning true if the
// frame is keyframe and false otherwise // frame is keyframe and false otherwise
func isKeyFrame(frame []byte) bool { func isKeyFrame(frame []byte) bool {
nalType := getNaleType(frame) nalType := getNalType(frame)
switch { switch {
case nalType == interFrameCode: case nalType == interFrameCode:
return false return false
case nalType == keyFramecode || nalType == 6: case nalType == keyFrameCode || nalType == 6:
return true return true
} }
return false return false
@ -149,9 +150,9 @@ func isKeyFrame(frame []byte) bool {
// isSequenceHeader checks the nature of the passed frame and returns true // isSequenceHeader checks the nature of the passed frame and returns true
// if it is a sequnce header and false otherwise // if it is a sequnce header and false otherwise
func isSequenceHeader(frame []byte) bool { func isSequenceHeader(frame []byte) bool {
nalType := getnaleType(frame) nalType := getNalType(frame)
switch { switch {
case nalType == 1 || naltype == 5: case nalType == 1 || nalType == 5:
return false return false
case nalType == 6 || nalType == 7 || nalType == 8: case nalType == 6 || nalType == 7 || nalType == 8:
return true return true
@ -181,7 +182,7 @@ func (g *flvGenerator) generate() {
timeStamp := g.getNextTimestamp() timeStamp := g.getNextTimestamp()
// Do we have video to send off ? // Do we have video to send off ?
if g.videoFlag { if g.videoFlag {
g.outputChan <- flv.VideoTag{ tag := flv.VideoTag{
TagType: uint8(flv.VideoTagType), TagType: uint8(flv.VideoTagType),
DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength, DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength,
Timestamp: timeStamp, Timestamp: timeStamp,
@ -192,12 +193,13 @@ func (g *flvGenerator) generate() {
CompositionTime: 0, CompositionTime: 0,
Data: videoFrame, Data: videoFrame,
PrevTagSize: uint32(videoHeaderSize + len(videoFrame)), PrevTagSize: uint32(videoHeaderSize + len(videoFrame)),
}.ToByteSlice() }
g.outputChan <- tag.ToByteSlice()
} }
// Do we even have some audio to send off ? // Do we even have some audio to send off ?
if g.audioFlag { if g.audioFlag {
// Not sure why but we need two audio tags for dummy silent audio // Not sure why but we need two audio tags for dummy silent audio
g.outputChan <- flv.AudioTag{ tag := flv.AudioTag{
TagType: uint8(flv.AudioTagType), TagType: uint8(flv.AudioTagType),
DataSize: 7, DataSize: 7,
Timestamp: timeStamp, Timestamp: timeStamp,
@ -208,9 +210,10 @@ func (g *flvGenerator) generate() {
SoundType: true, SoundType: true,
Data: dummyAudioTag1Data, Data: dummyAudioTag1Data,
PrevTagSize: uint32(audioSize), PrevTagSize: uint32(audioSize),
}.ToByteSlice() }
g.outputChan <- tag.ToByteSlice()
g.outputChan <- flv.AudioTag{ tag = flv.AudioTag{
TagType: uint8(flv.AudioTagType), TagType: uint8(flv.AudioTagType),
DataSize: 21, DataSize: 21,
Timestamp: timeStamp, Timestamp: timeStamp,
@ -221,7 +224,8 @@ func (g *flvGenerator) generate() {
SoundType: true, SoundType: true,
Data: dummyAudioTag2Data, Data: dummyAudioTag2Data,
PrevTagSize: uint32(22), PrevTagSize: uint32(22),
}.ToByteSlice() }
g.outputChan <- tag.ToByteSlice()
} }
} }
} }

View File

@ -35,8 +35,8 @@ import (
//"bitbucket.org/ausocean/av/rtp" //"bitbucket.org/ausocean/av/rtp"
"../mpegts" "../mpegts"
"../pes" "../pes"
"../rtp" //"../rtp"
"../tools" //"../tools"
) )
// TODO: really need to finish the at and pmt stuff - this is too hacky // TODO: really need to finish the at and pmt stuff - this is too hacky
@ -44,7 +44,7 @@ var (
patTableStart = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178} patTableStart = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178}
patTable []byte patTable []byte
pmtTableStart = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56} pmtTableStart = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56}
pmtTble []byte pmtTable []byte
) )
// genPatAndPmt generates the rest of the pat and pmt tables i.e. fills them // genPatAndPmt generates the rest of the pat and pmt tables i.e. fills them
@ -52,14 +52,14 @@ var (
// NewMpegtsgenerator // NewMpegtsgenerator
func genPatAndPmt() { func genPatAndPmt() {
patTable = make([]byte, 0, 184) patTable = make([]byte, 0, 184)
patTable = append(pat, patTableStart...) patTable = append(patTable, patTableStart...)
for i := 0; i < 167; i++ { for i := 0; i < 167; i++ {
pmtTable = append(pmt, 255) pmtTable = append(pmtTable, 255)
} }
pmtTable = make([]byte, 0, 184) pmtTable = make([]byte, 0, 184)
pmtTable = append(pmt, pmtTableStart...) pmtTable = append(pmtTable, pmtTableStart...)
for i := 0; i < 162; i++ { for i := 0; i < 162; i++ {
pmtTable = append(pmt, 255) pmtTable = append(pmtTable, 255)
} }
} }
@ -70,16 +70,15 @@ const (
videoPid = 256 videoPid = 256
streamID = 0xE0 streamID = 0xE0
outputChanSize = 100 outputChanSize = 100
inputChanSie = 10000 inputChanSize = 10000
pesPktChanSize = 1000 pesPktChanSize = 1000
payloadbyteChanSize = 100000 payloadByteChanSize = 100000
ptsOffset = .7 ptsOffset = .7
maxCC = 15 maxCC = 15
) )
// tsGenerator encapsulates properties of an mpegts generator. // tsGenerator encapsulates properties of an mpegts generator.
type tsGenerator struct { type tsGenerator struct {
rtpInputChan chan rtp.RtpPacket
outputChan chan []byte outputChan chan []byte
nalInputChan chan []byte nalInputChan chan []byte
currentTsPacket *mpegts.MpegTsPacket currentTsPacket *mpegts.MpegTsPacket
@ -113,14 +112,14 @@ func NewTsGenerator(fps uint) (g *tsGenerator) {
g.fps = fps g.fps = fps
g.currentPcrTime = 0.0 g.currentPcrTime = 0.0
g.currentPtsTime = ptsOffset g.currentPtsTime = ptsOffset
g.pesPktChan = make(chan []byte, pesPktchanSize) g.pesPktChan = make(chan []byte, pesPktChanSize)
g.payloadByteChan = make(chan byte, payloadByteChanSize) g.payloadByteChan = make(chan byte, payloadByteChanSize)
g.ccMap = make(map[int]int, 4) g.ccMap = make(map[int]int, 4)
g.ccMap[SdtPid] = 0 g.ccMap[SdtPid] = 0
g.ccMap[PatPid] = 0 g.ccMap[PatPid] = 0
g.ccMap[pmtPid] = 0 g.ccMap[pmtPid] = 0
g.ccMap[videoPid] = 0 g.ccMap[videoPid] = 0
getPatAndPmt() genPatAndPmt()
return return
} }
@ -144,7 +143,7 @@ func (g *tsGenerator) Start() {
go g.generate() go g.generate()
} }
// getCC returns the next continuity counter for a particular pid // getCC returns the next continuity counter for a particular pid
func (g *tsGenerator) getCC(pid int) int { func (g *tsGenerator) getCC(pid int) int {
temp := g.ccMap[pid] temp := g.ccMap[pid]
if g.ccMap[pid]++; g.ccMap[pid] > maxCC { if g.ccMap[pid]++; g.ccMap[pid] > maxCC {
@ -156,17 +155,17 @@ func (g *tsGenerator) getCC(pid int) int {
// generate handles the incoming data and generates equivalent mpegts packets - // generate handles the incoming data and generates equivalent mpegts packets -
// sending them to the output channel // sending them to the output channel
func (g *tsGenerator) generate() { func (g *tsGenerator) generate() {
var rtpBuffer [](*rtp.RtpPacket)
for { for {
select { select {
case nalUnit := <-g.nalInputChan: case nalUnit := <-g.nalInputChan:
g.pesPktChan <- pes.PESPacket{ pesPkt := pes.PESPacket{
StreamID: streamID, StreamID: streamID,
PDI: byte(2), PDI: byte(2),
PTS: g.genPts(), PTS: g.genPts(),
Data: nalUnit, Data: nalUnit,
HeaderLength: 5, HeaderLength: 5,
}.ToByteSlice() }
g.pesPktChan <- pesPkt.ToByteSlice()
case pesPkt := <-g.pesPktChan: case pesPkt := <-g.pesPktChan:
for ii := range pesPkt { for ii := range pesPkt {
g.payloadByteChan <- pesPkt[ii] g.payloadByteChan <- pesPkt[ii]
@ -185,21 +184,23 @@ func (g *tsGenerator) generate() {
if pusi { if pusi {
// Create pat table // Create pat table
g.outputChan <- mpegts.MpegTsPacket{ patPkt := mpegts.MpegTsPacket{
PUSI: pusi, PUSI: pusi,
PID: PatPid, PID: PatPid,
CC: byte(g.getCC(PatPid)), CC: byte(g.getCC(PatPid)),
AFC: 1, AFC: 1,
Payload: PatTable, Payload: patTable,
}.ToByteSlise() }
g.outputChan <- patPkt.ToByteSlice()
// Create pmt table // Create pmt table
g.outputChan <- mpegts.MpegTsPacket{ pmtPkt := mpegts.MpegTsPacket{
PUSI: pusi, PUSI: pusi,
PID: pmtPid, PID: pmtPid,
CC: byte(g.getCC(pmtPid)), CC: byte(g.getCC(pmtPid)),
AFC: 1, AFC: 1,
Payload: pmtTable, Payload: pmtTable,
}.ToByteSlise() }
g.outputChan <- pmtPkt.ToByteSlice()
// If pusi then we need to gen a pcr // If pusi then we need to gen a pcr
pkt.PCR = g.genPcr() pkt.PCR = g.genPcr()
pusi = false pusi = false

View File

@ -32,7 +32,7 @@ import (
//"bitbucket.org/ausocean/av/tools" //"bitbucket.org/ausocean/av/tools"
"../tools" "../tools"
"errors" _"errors"
//"fmt" //"fmt"
) )
@ -128,18 +128,18 @@ type MpegTsPacket struct {
TPDL byte // Tranposrt private data length TPDL byte // Tranposrt private data length
TPD []byte // Private data TPD []byte // Private data
Ext []byte // Adaptation field extension Ext []byte // Adaptation field extension
payload []byte // Mpeg ts payload Payload []byte // Mpeg ts Payload
} }
// FillPayload takes a channel and fills the packets payload field until the // FillPayload takes a channel and fills the packets Payload field until the
// channel is empty or we've the packet reaches capacity // channel is empty or we've the packet reaches capacity
func (p *MpegTsPacket) FillPayload(channel chan byte){ func (p *MpegTsPacket) FillPayload(channel chan byte){
p.Payload = make([]byte,0,mpegtsPayloadSize) p.Payload = make([]byte,0,mpegtsPayloadSize)
currentPktLength := 6 + int(btb(p.PCRF))*6+int(btb(p.OPCRF))*6+ currentPktLength := 6 + int(btb(p.PCRF))*6+int(btb(p.OPCRF))*6+
int(btb(p.SPF))*1+int(btb(p.TPDF))*1+len(p.TPD) int(btb(p.SPF))*1+int(btb(p.TPDF))*1+len(p.TPD)
// While we're within the mpegts packet size and we still have data we can use // While we're within the mpegts packet size and we still have data we can use
for (currentPktLength+len(p.Payload)) < mpegTsSize && len(cannel) > 0 { for (currentPktLength+len(p.Payload)) < mpegTsSize && len(channel) > 0 {
p.payload = append(p.payload,<-channel) p.Payload = append(p.Payload,<-channel)
} }
} }
@ -150,7 +150,7 @@ func btb(b bool) byte {
} }
// ToByteSlice interprets the fields of the ts packet instance and outputs a // ToByteSlice interprets the fields of the ts packet instance and outputs a
// corresponding byte slice // corresponding byte slice
func (p *MpegTsPacket) ToByteSlice() (output []byte) { func (p *MpegTsPacket) ToByteSlice() (output []byte) {
stuffingLength := 182-len(p.Payload)-len(p.TPD)-int(btb(p.PCRF))*6- stuffingLength := 182-len(p.Payload)-len(p.TPD)-int(btb(p.PCRF))*6-
int(btb(p.OPCRF))*6 - int(btb(p.SPF)) int(btb(p.OPCRF))*6 - int(btb(p.SPF))

View File

@ -1,153 +0,0 @@
/*
NOTE: this file is in progress...
NAME
H264Writer.go
DESCRIPTION
See Readme.md
AUTHOR
Saxon Nelson-Milton <saxon@ausocean.org>
LICENSE
H264Writer.go is Copyright (C) 2017 the Australian Ocean Lab (AusOcean)
It is free software: you can redistribute it and/or modify them
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
It is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with revid in gpl.txt. If not, see [GNU licenses](http://www.gnu.org/licenses).
*/
// TODO: complete this file
package parser
import (
_"fmt"
"os"
//"bitbucket.org/ausocean/av/mpegts"
//"bitbucket.org/ausocean/av/rtp"
//"bitbucket.org/ausocean/av/tools"
//"bitbucket.org/ausocean/av/itut"
"../mpegts"
"../rtp"
"../tools"
"../itut"
)
type RtpToH264Converter interface {
Convert()
}
type rtpToH264Converter struct {
TsChan <-chan *mpegts.MpegTsPacket
tsChan chan<- *mpegts.MpegTsPacket
InputChan chan<- rtp.RtpPacket
inputChan <-chan rtp.RtpPacket
currentTsPacket *mpegts.MpegTsPacket
payloadByteChan chan byte
currentCC byte
}
//func parseH264File()
func NewRtpToH264Converter() (c *rtpToH264Converter) {
c = new(rtpToH264Converter)
tsChan := make(chan *mpegts.MpegTsPacket,100)
c.TsChan = tsChan
c.tsChan = tsChan
inputChan := make(chan rtp.RtpPacket,100)
c.InputChan = inputChan
c.inputChan = inputChan
c.currentCC = 0
return
}
func (c* rtpToH264Converter) Convert() {
file,_ := os.Create("video")
var rtpBuffer [](*rtp.RtpPacket)
for {
select {
default:
case rtpPacket := <-c.inputChan:
rtpBuffer = append(rtpBuffer,&rtpPacket)
if len(rtpBuffer) > 2 {
// if there's something weird going on with sequence numbers then sort
if rtpPacket.SequenceNumber < rtpBuffer[len(rtpBuffer)-2].SequenceNumber {
for i := 1; i < len(rtpBuffer); i++ {
for j := i; j > 0 && rtpBuffer[j].SequenceNumber < rtpBuffer[j - 1].SequenceNumber; j-- {
temp := rtpBuffer[j]
rtpBuffer[j] = rtpBuffer[j-1]
rtpBuffer[j-1] = temp
}
}
}
}
if len(rtpBuffer) > 200 {
// Discard everything before a type 7
for tools.GetOctectType(rtpBuffer[0]) != 7 {
rtpBuffer = rtpBuffer[1:]
}
// get sps
sps := make([]byte,len(rtpBuffer[0].Payload))
copy(sps[:],rtpBuffer[0].Payload[:])
rtpBuffer = rtpBuffer[1:]
// get pps
pps := make([]byte,len(rtpBuffer[0].Payload))
copy(pps[:],rtpBuffer[0].Payload[:])
rtpBuffer = rtpBuffer[1:]
// get sei
sei := make([]byte, len(rtpBuffer[0].Payload))
copy(sei[:],rtpBuffer[0].Payload[:])
rtpBuffer = rtpBuffer[1:]
// while we haven't reached the next sps in the buffer
for tools.GetOctectType(rtpBuffer[0]) != 7 {
switch(tools.GetOctectType(rtpBuffer[0])){
case 28:
if tools.GetStartBit(rtpBuffer[0]) == 1{
var buffer []byte
buffer = append(buffer, append(itut.StartCode1(),itut.AUD()...)...)
buffer = append(buffer, append(itut.StartCode1(),sps...)...)
buffer = append(buffer, append(itut.StartCode1(),pps...)...)
buffer = append(buffer, append(itut.StartCode1(),sei...)...)
buffer = append(buffer, itut.StartCode1()...)
buffer = append(buffer, rtpBuffer[0].Payload[0] & 0xE0 | rtpBuffer[0].Payload[1] & 0x1F )
buffer = append(buffer, rtpBuffer[0].Payload[2:]...)
rtpBuffer = rtpBuffer[1:]
for {
buffer = append(buffer, rtpBuffer[0].Payload[2:]...)
if tools.GetEndBit(rtpBuffer[0]) == 1 {
rtpBuffer = rtpBuffer[1:]
file.Write(buffer)
break
}
rtpBuffer = rtpBuffer[1:]
}
}
case 1:
var buffer []byte
buffer = append(buffer, append(itut.StartCode1(), itut.AUD()...)...)
buffer = append(buffer, append(itut.StartCode1(), sps...)...)
buffer = append(buffer, append(itut.StartCode1(),pps...)...)
buffer = append(buffer, append(itut.StartCode1(),sei...)...)
buffer = append(buffer, itut.StartCode1()...)
buffer = append(buffer, rtpBuffer[0].Payload[0] & 0xE0 | rtpBuffer[0].Payload[1] & 0x1F )
buffer = append(buffer, rtpBuffer[0].Payload[2:]...)
rtpBuffer = rtpBuffer[1:]
file.Write(buffer)
default:
}
}
}
}
}
}

View File

@ -109,7 +109,6 @@ type revidInst struct {
setupInput func() error setupInput func() error
setupOutput func() error setupOutput func() error
getFrame func() []byte getFrame func() []byte
flushData func()
sendClip func(clip []byte) error sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession rtmpInst rtmp.RTMPSession
} }
@ -124,7 +123,7 @@ func NewRevidInstance(config Config) (r *revidInst, err error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
r.outputChan = make(chan []byte, outputchanSize) r.outputChan = make(chan []byte, outputChanSize)
r.parser.Start() r.parser.Start()
go r.packClips() go r.packClips()
r.Log(Info, "New revid instance created! config is:") r.Log(Info, "New revid instance created! config is:")

View File

@ -142,7 +142,7 @@ func TestFlvOutputFile(t *testing.T) {
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/
// Test h264 inputfile to flv format into rtmp using librtmp c wrapper // Test h264 inputfile to flv format into rtmp using librtmp c wrapper
func TestRtmpOutputUsingLibRtmp(t *testing.T){ func TestRtmpOutputUsingLibRtmp(t *testing.T){
@ -166,8 +166,9 @@ func TestRtmpOutputUsingLibRtmp(t *testing.T){
time.Sleep(120*time.Second) time.Sleep(120*time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/
/*
// Test revidInst with a Raspivid h264 input // Test revidInst with a Raspivid h264 input
func TestRaspividToRtmp(t *testing.T){ func TestRaspividToRtmp(t *testing.T){
config := Config{ config := Config{
@ -188,3 +189,4 @@ func TestRaspividToRtmp(t *testing.T){
time.Sleep(120*time.Second) time.Sleep(120*time.Second)
revidInst.Stop() revidInst.Stop()
} }
*/

View File

@ -1,160 +0,0 @@
/*
Copyright (c) 2015, T. Jameson Little <t.jameson.little@gmail.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package rtp
import (
"net"
_"fmt"
)
const (
RTP_VERSION = 2
)
const (
hasRtpPadding = 1 << 5
hasRtpExt = 1 << 4
hasMarker = 1 << 7
)
type RtpPacket struct {
Version byte
Padding bool
Ext bool
CC byte
Marker bool
PayloadType byte
SequenceNumber uint
Timestamp uint
SyncSource uint
CSRC []uint
ExtHeader uint
ExtData []byte
Payload []byte
}
type Session struct {
Rtp net.PacketConn
Rtcp net.PacketConn
RtpChan <-chan RtpPacket
RtcpChan <-chan []byte
rtpChan chan<- RtpPacket
rtcpChan chan<- []byte
}
func NewSession(rtp, rtcp net.PacketConn) *Session {
rtpChan := make(chan RtpPacket, 10)
rtcpChan := make(chan []byte, 10)
s := &Session{
Rtp: rtp,
Rtcp: rtcp,
RtpChan: rtpChan,
RtcpChan: rtcpChan,
rtpChan: rtpChan,
rtcpChan: rtcpChan,
}
go s.HandleRtpConn(rtp)
go s.HandleRtcpConn(rtcp)
return s
}
func toUint(arr []byte) (ret uint) {
for i, b := range arr {
ret |= uint(b) << (8 * uint(len(arr)-i-1))
}
return ret
}
func (s *Session) HandleRtpConn(conn net.PacketConn) {
buf := make([]byte, 4096)
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
panic(err)
}
cpy := make([]byte, n)
copy(cpy, buf)
go s.handleRtp(cpy)
}
}
func (s *Session) HandleRtcpConn(conn net.PacketConn) {
buf := make([]byte, 4096)
for {
n, _, err := conn.ReadFrom(buf)
if err != nil {
panic(err)
}
cpy := make([]byte, n)
copy(cpy, buf)
go s.handleRtcp(cpy)
}
}
func (s *Session) handleRtp(buf []byte) {
packet := RtpPacket{
Version: (buf[0] & 0xC0) >> 6,
Padding: buf[0]&hasRtpPadding != 0,
Ext: buf[0]&hasRtpExt != 0,
CC: buf[0] & 0x0F,
Marker: buf[1]&hasMarker != 0,
PayloadType: buf[1] & 0x7F,
SequenceNumber: toUint(buf[2:4]),
Timestamp: toUint(buf[4:8]),
SyncSource: toUint(buf[8:12]),
CSRC: make([]uint, buf[0]&0x0F),
}
if packet.Version != RTP_VERSION {
panic("Unsupported version")
}
i := 12
for j := range packet.CSRC {
packet.CSRC[j] = toUint(buf[i : i+4])
i += 4
}
if packet.Ext {
packet.ExtHeader = toUint(buf[i : i+2])
length := toUint(buf[i+2 : i+4])
i += 4
if length > 0 {
packet.ExtData = buf[i : i+int(length)*4]
i += int(length) * 4
}
}
packet.Payload = buf[i:]
s.rtpChan <- packet
}
func (s *Session) handleRtcp(buf []byte) {
// TODO: implement rtcp
}

View File

@ -1,169 +0,0 @@
package camstreamer
import (
"errors"
)
type CamStreamer struct {
RtspUrl string
RtpUrl string
RtpPort uint16
RtcpPort uint16
}
func (cs *CamStreamer)Connect()(session *RtpSession, err error){
var res string
sess := rtsp.NewSession()
if res, err = sess.Options(sc.RtspUrl); err != nil {
return
}
res, err = sess.Describe(rtspUrl)
if err != nil {
log.Fatalln(err)
}
p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength})
if err != nil {
}
log.Printf("%+v", p)
res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort))
if err != nil {
}
log.Println(res)
res, err = sess.Play(rtspUrl, res.Header.Get("Session"))
if err != nil {
}
log.Println(res)
// create udp connection for rtp stuff
rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300")
if err != nil {
t.Errorf("Local rtp addr not set! %v\n", err)
}
rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300")
if err != nil {
t.Errorf("Resolving rtp address didn't work! %v\n", err)
}
rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr)
if err != nil {
t.Errorf("Conncection not established! %v\n", err)
}
// Create udp connection for rtcp stuff
rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319")
if err != nil {
t.Errorf("Local RTCP address not resolved! %v\n", err)
}
rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301")
if err != nil {
t.Errorf("Remote RTCP address not resolved! %v\n", err)
}
rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr)
if err != nil {
t.Errorf("Connection not established! %v\n", err)
}
// let's create a session that will store useful stuff from the connections
rtpSession := NewSession(rtpConn, rtcpConn)
}
/*******************************************************
Testing stuff related to connection i.e. rtsp, rtp, rtcp
********************************************************/
const (
rtpPort = 17300
rtcpPort = 17319
rtspUrl = "rtsp://192.168.0.50:8554/CH002.sdp"
rtpUrl = "rtsp://192.168.0.50:8554/CH002.sdp/track1"
)
/* Let's see if we can connect to an rtsp device then read an rtp stream,
and then convert the rtp packets to mpegts packets and output. */
func TestRTSP(t *testing.T) {
sess := rtsp.NewSession()
res, err := sess.Options(rtspUrl)
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
res, err = sess.Describe(rtspUrl)
if err != nil {
log.Fatalln(err)
t.Errorf("Shouldn't have got error: %v\n", err)
}
p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength})
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
log.Printf("%+v", p)
res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort))
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
log.Println(res)
res, err = sess.Play(rtspUrl, res.Header.Get("Session"))
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
log.Println(res)
}
func TestRTP(t *testing.T) {
sess := rtsp.NewSession()
res, err := sess.Options(rtspUrl)
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
res, err = sess.Describe(rtspUrl)
if err != nil {
log.Fatalln(err)
t.Errorf("Shouldn't have got error: %v\n", err)
}
p, err := rtsp.ParseSdp(&io.LimitedReader{R: res.Body, N: res.ContentLength})
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
log.Printf("%+v", p)
res, err = sess.Setup(rtpUrl, fmt.Sprintf("RTP/AVP;unicast;client_port=%d-%d", rtpPort, rtcpPort))
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
log.Println(res)
res, err = sess.Play(rtspUrl, res.Header.Get("Session"))
if err != nil {
t.Errorf("Shouldn't have got error: %v\n", err)
}
log.Println(res)
// create udp connection for rtp stuff
rtpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17300")
if err != nil {
t.Errorf("Local rtp addr not set! %v\n", err)
}
rtpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17300")
if err != nil {
t.Errorf("Resolving rtp address didn't work! %v\n", err)
}
rtpConn, err := net.DialUDP("udp", rtpLaddr, rtpAddr)
if err != nil {
t.Errorf("Conncection not established! %v\n", err)
}
// Create udp connection for rtcp stuff
rtcpLaddr, err := net.ResolveUDPAddr("udp", "192.168.0.109:17319")
if err != nil {
t.Errorf("Local RTCP address not resolved! %v\n", err)
}
rtcpAddr, err := net.ResolveUDPAddr("udp", "192.168.0.50:17301")
if err != nil {
t.Errorf("Remote RTCP address not resolved! %v\n", err)
}
rtcpConn, err := net.DialUDP("udp", rtcpLaddr, rtcpAddr)
if err != nil {
t.Errorf("Connection not established! %v\n", err)
}
// let's create a session that will store useful stuff from the connections
rtpSession := NewSession(rtpConn, rtcpConn)
time.Sleep(2 * time.Second)
select {
default:
t.Errorf("Should have got rtpPacket!")
case rtpPacket := <-rtpSession.RtpChan:
fmt.Printf("RTP packet: %v\n", rtpPacket)
}
}

View File

@ -32,7 +32,6 @@ import (
_"os" _"os"
_"fmt" _"fmt"
//"bitbucket.org/ausocean/av/rtp" //"bitbucket.org/ausocean/av/rtp"
"../rtp"
) )
func BoolToByte(in bool) (out byte) { func BoolToByte(in bool) (out byte) {
@ -41,15 +40,3 @@ func BoolToByte(in bool) (out byte) {
} }
return return
} }
func GetOctectType(p *rtp.RtpPacket) byte {
return p.Payload[0] & 0x1F
}
func GetStartBit(p *rtp.RtpPacket) byte {
return (p.Payload[1] & 0x80) >> 7
}
func GetEndBit(p *rtp.RtpPacket) byte {
return (p.Payload[1] & 0x40) >> 6
}