This commit is contained in:
Unknown 2018-03-14 11:48:03 +10:30
parent 431b2c4a5f
commit 86bc3602d7
18 changed files with 278 additions and 286 deletions

View File

@ -68,7 +68,7 @@ func btb(b bool) byte {
func (h *Header) ToByteSlice() (output []byte) {
output = make([]byte, 0, headerLength)
output = append(output, flvHeaderCode...)
output = append(output, []byte {
output = append(output, []byte{
version,
0x00 | btb(h.AudioFlag)<<2 | btb(h.VideoFlag),
0x00, 0x00, 0x00, byte(9),

View File

@ -29,7 +29,7 @@ package generator
import (
"bitbucket.org/ausocean/av/flv"
//"../flv"
_"fmt"
_ "fmt"
"time"
)
@ -59,8 +59,8 @@ type flvGenerator struct {
lastTagSize int
currentTimestamp uint32
header flv.Header
startTime time.Time
firstTag bool
startTime time.Time
firstTag bool
}
// GetInputChan returns the input channel to the generator. This is where the
@ -112,11 +112,11 @@ func (g *flvGenerator) getNextTimestamp() (timestamp uint32) {
g.startTime = time.Now()
g.firstTag = false
timestamp = 0
return
return
}
//timestamp = g.currentTimestamp
//g.currentTimestamp += 40
timestamp = uint32(time.Now().Sub(g.startTime).Seconds()*float64(1000))
timestamp = uint32(time.Now().Sub(g.startTime).Seconds() * float64(1000))
//fmt.Printf("timestamp: %v", timestamp)
return
}
@ -132,11 +132,11 @@ func isKeyFrame(frame []byte) bool {
for i := range frame {
byteChannel <- frame[i]
}
for len(byteChannel) >= 5{
aByte := <-byteChannel
for i:=1; aByte == 0x00 && i != 4; i++ {
for len(byteChannel) >= 5 {
aByte := <-byteChannel
for i := 1; aByte == 0x00 && i != 4; i++ {
aByte = <-byteChannel
if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) {
if (aByte == 0x01 && i == 2) || (aByte == 0x01 && i == 3) {
aByte = <-byteChannel
nalType := aByte & 0x1F
switch nalType {
@ -158,11 +158,11 @@ func isSequenceHeader(frame []byte) bool {
for i := range frame {
byteChannel <- frame[i]
}
for len(byteChannel) >= 5{
aByte := <-byteChannel
for i:=1; aByte == 0x00 && i != 4; i++ {
for len(byteChannel) >= 5 {
aByte := <-byteChannel
for i := 1; aByte == 0x00 && i != 4; i++ {
aByte = <-byteChannel
if ( aByte == 0x01 && i == 2 ) || ( aByte == 0x01 && i == 3 ) {
if (aByte == 0x01 && i == 2) || (aByte == 0x01 && i == 3) {
aByte = <-byteChannel
nalType := aByte & 0x1F
switch nalType {
@ -183,7 +183,6 @@ func isSequenceHeader(frame []byte) bool {
return false
}
// generate takes in raw video data from the input chan and packetises it into
// flv tags, which are then passed to the output channel.
func (g *flvGenerator) generate() {
@ -206,7 +205,7 @@ func (g *flvGenerator) generate() {
timeStamp := g.getNextTimestamp()
// Do we have video to send off ?
if g.videoFlag {
tag := flv.VideoTag{
tag := flv.VideoTag{
TagType: uint8(flv.VideoTagType),
DataSize: uint32(len(videoFrame)) + flv.DataHeaderLength,
Timestamp: timeStamp,

View File

@ -30,18 +30,16 @@ import (
"bitbucket.org/ausocean/av/mpegts"
"bitbucket.org/ausocean/av/pes"
/*
"../mpegts"
"../pes"
*/
)
"../mpegts"
"../pes"
*/)
// TODO: really need to finish the at and pmt stuff - this is too hacky
var (
patTableStart = []byte{0, 0, 176, 13, 0, 1, 193, 0, 0, 0, 1, 240, 0, 42, 177, 4, 178}
patTable []byte
pmtTableStart = []byte{0, 2, 176, 18, 0, 1, 193, 0, 0, 0xE1, 0x00, 0xF0, 0, 0x1B, 0xE1, 0, 0xF0, 0, 0x15, 0xBD, 0x4D, 0x56}
pmtTable []byte
pmtTable []byte
)
// genPatAndPmt generates the rest of the pat and pmt tables i.e. fills them
@ -67,7 +65,7 @@ const (
videoPid = 256
streamID = 0xE0
outputChanSize = 100
inputChanSize = 10000
inputChanSize = 10000
pesPktChanSize = 1000
payloadByteChanSize = 100000
ptsOffset = .7

View File

@ -30,15 +30,15 @@ package mpegts
import (
"bitbucket.org/ausocean/av/tools"
//"../tools"
_"errors"
_ "errors"
//"fmt"
)
const (
mpegTsSize = 188
mpegTsSize = 188
mpegtsPayloadSize = 176
)
@ -134,13 +134,13 @@ type MpegTsPacket struct {
// FillPayload takes a channel and fills the packets Payload field until the
// channel is empty or we've the packet reaches capacity
func (p *MpegTsPacket) FillPayload(channel chan byte){
p.Payload = make([]byte,0,mpegtsPayloadSize)
currentPktLength := 6 + int(btb(p.PCRF))*6+int(btb(p.OPCRF))*6+
int(btb(p.SPF))*1+int(btb(p.TPDF))*1+len(p.TPD)
func (p *MpegTsPacket) FillPayload(channel chan byte) {
p.Payload = make([]byte, 0, mpegtsPayloadSize)
currentPktLength := 6 + int(btb(p.PCRF))*6 + int(btb(p.OPCRF))*6 +
int(btb(p.SPF))*1 + int(btb(p.TPDF))*1 + len(p.TPD)
// While we're within the mpegts packet size and we still have data we can use
for (currentPktLength+len(p.Payload)) < mpegTsSize && len(channel) > 0 {
p.Payload = append(p.Payload,<-channel)
p.Payload = append(p.Payload, <-channel)
}
}
@ -153,17 +153,17 @@ func btb(b bool) byte {
// ToByteSlice interprets the fields of the ts packet instance and outputs a
// corresponding byte slice
func (p *MpegTsPacket) ToByteSlice() (output []byte) {
stuffingLength := 182-len(p.Payload)-len(p.TPD)-int(btb(p.PCRF))*6-
stuffingLength := 182 - len(p.Payload) - len(p.TPD) - int(btb(p.PCRF))*6 -
int(btb(p.OPCRF))*6 - int(btb(p.SPF))
var stuffing []byte
if stuffingLength > 0 {
stuffing = make([]byte,stuffingLength)
stuffing = make([]byte, stuffingLength)
}
for i := range stuffing {
stuffing[i] = 0xFF
}
afl := 1+int(btb(p.PCRF))*6+int(btb(p.OPCRF))*6+int(btb(p.SPF))+int(btb(p.TPDF))+len(p.TPD)+len(stuffing)
output = make([]byte,0,mpegTsSize)
afl := 1 + int(btb(p.PCRF))*6 + int(btb(p.OPCRF))*6 + int(btb(p.SPF)) + int(btb(p.TPDF)) + len(p.TPD) + len(stuffing)
output = make([]byte, 0, mpegTsSize)
output = append(output, []byte{
0x47,
(btb(p.TEI)<<7 | btb(p.PUSI)<<6 | btb(p.Priority)<<5 | byte((p.PID&0xFF00)>>8)),
@ -174,13 +174,13 @@ func (p *MpegTsPacket) ToByteSlice() (output []byte) {
if p.AFC == 3 || p.AFC == 2 {
output = append(output, []byte{
byte(afl), (btb(p.DI)<<7 | btb(p.RAI)<<6 | btb(p.ESPI)<<5 |
btb(p.PCRF)<<4 | btb(p.OPCRF)<<3 | btb(p.SPF)<<2 |
btb(p.TPDF)<<1 | btb(p.AFEF)),
btb(p.PCRF)<<4 | btb(p.OPCRF)<<3 | btb(p.SPF)<<2 |
btb(p.TPDF)<<1 | btb(p.AFEF)),
}...)
for i := 40; p.PCRF && i >= 0; i-=8 {
for i := 40; p.PCRF && i >= 0; i -= 8 {
output = append(output, byte((p.PCR<<15)>>uint(i)))
}
for i := 40; p.OPCRF && i >= 0; i-=8 {
for i := 40; p.OPCRF && i >= 0; i -= 8 {
output = append(output, byte(p.OPCR>>uint(i)))
}
if p.SPF {

View File

@ -29,41 +29,41 @@ LICENSE
package mpegts
import (
"testing"
_"fmt"
_ "fmt"
"testing"
)
// Just ensure that we can create a byte slice with a mpegts packet correctly
func TestMpegTsToByteSlice(t *testing.T){
payload := []byte{0x56,0xA2,0x78,0x89,0x67}
// Just ensure that we can create a byte slice with a mpegts packet correctly
func TestMpegTsToByteSlice(t *testing.T) {
payload := []byte{0x56, 0xA2, 0x78, 0x89, 0x67}
pcr := 100000 // => 100000
stuffing := make([]byte,171)
for i := range stuffing {
stuffing[i] = 0xFF
}
tsPkt := MpegTsPacket{
PUSI: true,
PID: uint16(256),
AFC: byte(3),
AFL: 7+171,
CC: byte(6),
PCRF: true,
PCR: uint64(pcr),
Stuff: stuffing,
Payload: payload,
stuffing := make([]byte, 171)
for i := range stuffing {
stuffing[i] = 0xFF
}
expectedOutput := []byte{ 0x47, 0x41, 0x00, 0x36, byte(178),0x10}
for i := 40; i >= 0; i-= 8 {
expectedOutput = append(expectedOutput,byte(pcr>>uint(i)))
tsPkt := MpegTsPacket{
PUSI: true,
PID: uint16(256),
AFC: byte(3),
AFL: 7 + 171,
CC: byte(6),
PCRF: true,
PCR: uint64(pcr),
Stuff: stuffing,
Payload: payload,
}
expectedOutput := []byte{0x47, 0x41, 0x00, 0x36, byte(178), 0x10}
for i := 40; i >= 0; i -= 8 {
expectedOutput = append(expectedOutput, byte(pcr>>uint(i)))
}
for i := 0; i < 171; i++ {
expectedOutput = append(expectedOutput, 0xFF)
}
expectedOutput = append(expectedOutput,payload...)
expectedOutput = append(expectedOutput, payload...)
tsPktAsByteSlice, err := tsPkt.ToByteSlice()
if err != nil {
t.Errorf("Should not have got error!")
}
if err != nil {
t.Errorf("Should not have got error!")
}
for i := 0; i < 188; i++ {
if tsPktAsByteSlice[i] != expectedOutput[i] {
t.Errorf("Conversion to byte slice bad! Byte: %v Wanted: %v Got: %v", i, expectedOutput[i], tsPktAsByteSlice[i])

View File

@ -33,7 +33,7 @@ import (
)
const (
frameStartCode = 0xD8
frameStartCode = 0xD8
)
type mjpegParser struct {

View File

@ -28,10 +28,10 @@ LICENSE
package parser
import (
//"bitbucket.org/ausocean/av/itut"
"log"
"sync"
_"fmt"
//"bitbucket.org/ausocean/av/itut"
_ "fmt"
"log"
"sync"
)
// h264 consts
@ -40,15 +40,15 @@ const (
)
var (
Info *log.Logger
mutex *sync.Mutex
Info *log.Logger
mutex *sync.Mutex
)
type Parser interface {
Stop()
Start()
GetInputChan() chan byte
GetOutputChan() chan []byte
SetOutputChan(achan chan []byte)
SetDelay(delay uint)
Stop()
Start()
GetInputChan() chan byte
GetOutputChan() chan []byte
SetOutputChan(achan chan []byte)
SetDelay(delay uint)
}

View File

@ -1,53 +1,53 @@
package parser
import (
"testing"
"strconv"
"os"
"fmt"
"fmt"
"os"
"strconv"
"testing"
)
const (
testInputFileName = "testInput/testInput.avi"
testInputFileName = "testInput/testInput.avi"
)
func TestMJPEGParser(t *testing.T){
fmt.Println("Opening input file!")
// Open the input file
inputFile, err := os.Open(testInputFileName)
if err != nil {
t.Errorf("Should not have got error opening file!")
}
fmt.Println("Getting file stats!")
stats, err := inputFile.Stat()
if err != nil {
t.Errorf("Could not get input file stats!")
return
}
fmt.Println("Creating space for file data!")
data := make([]byte, stats.Size())
_, err = inputFile.Read(data)
if err != nil {
t.Errorf("Should not have got read error!")
return
}
fmt.Println("Creating parser!")
parser := NewMJPEGParser(len(data)+1)
parser.SetOutputChan(make(chan []byte, 10000))
parser.Start()
fmt.Printf("len(data): %v\n", len(data))
for i := range data {
parser.GetInputChan() <- data[i]
}
fmt.Println("Writing jpegs to files!")
for i:=0; len(parser.GetOutputChan()) > 0; i++ {
// Open a new output file
outputFile, err := os.Create("testOutput/image"+strconv.Itoa(i)+".jpeg")
if err != nil {
t.Errorf("Should not have got error creating output file!")
return
}
outputFile.Write(<-parser.GetOutputChan())
outputFile.Close()
}
func TestMJPEGParser(t *testing.T) {
fmt.Println("Opening input file!")
// Open the input file
inputFile, err := os.Open(testInputFileName)
if err != nil {
t.Errorf("Should not have got error opening file!")
}
fmt.Println("Getting file stats!")
stats, err := inputFile.Stat()
if err != nil {
t.Errorf("Could not get input file stats!")
return
}
fmt.Println("Creating space for file data!")
data := make([]byte, stats.Size())
_, err = inputFile.Read(data)
if err != nil {
t.Errorf("Should not have got read error!")
return
}
fmt.Println("Creating parser!")
parser := NewMJPEGParser(len(data) + 1)
parser.SetOutputChan(make(chan []byte, 10000))
parser.Start()
fmt.Printf("len(data): %v\n", len(data))
for i := range data {
parser.GetInputChan() <- data[i]
}
fmt.Println("Writing jpegs to files!")
for i := 0; len(parser.GetOutputChan()) > 0; i++ {
// Open a new output file
outputFile, err := os.Create("testOutput/image" + strconv.Itoa(i) + ".jpeg")
if err != nil {
t.Errorf("Should not have got error creating output file!")
return
}
outputFile.Write(<-parser.GetOutputChan())
outputFile.Close()
}
}

View File

@ -28,7 +28,6 @@ package pes
import (
"bitbucket.org/ausocean/av/tools"
//"../tools"
)

View File

@ -29,7 +29,7 @@ LICENSE
package pes
import (
"testing"
"testing"
)
const (
@ -38,33 +38,33 @@ const (
func TestPesToByteSlice(t *testing.T) {
pesPkt := PESPacket{
StreamID: 0xE0, // StreamID
PDI: byte(2),
PTS: 100000,
HeaderLength: byte(10),
Stuff: []byte{0xFF,0xFF,},
Data: []byte{ 0xEA, 0x4B, 0x12, },
StreamID: 0xE0, // StreamID
PDI: byte(2),
PTS: 100000,
HeaderLength: byte(10),
Stuff: []byte{0xFF, 0xFF},
Data: []byte{0xEA, 0x4B, 0x12},
}
pesExpectedOutput := []byte{
0x00, // packet start code prefix byte 1
0x00, // packet start code prefix byte 2
0x01, // packet start code prefix byte 3
0xE0, // stream ID
0x00, // PES Packet length byte 1
0x00, // PES packet length byte 2
0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original
0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext
0x00, // packet start code prefix byte 1
0x00, // packet start code prefix byte 2
0x01, // packet start code prefix byte 3
0xE0, // stream ID
0x00, // PES Packet length byte 1
0x00, // PES packet length byte 2
0x80, // Marker bits,ScramblingControl, Priority, DAI, Copyright, Original
0x80, // PDI, ESCR, ESRate, DSMTrickMode, ACI, CRC, Ext
byte(10), // header length
0x21, // PCR byte 1
0x00, // pcr byte 2
0x07, // pcr byte 3
0x0D, // pcr byte 4
0x41, // pcr byte 5
0xFF, // Stuffing byte 1
0xFF, // stuffing byte 3
0xEA, // data byte 1
0x4B, // data byte 2
0x12, // data byte 3
0x21, // PCR byte 1
0x00, // pcr byte 2
0x07, // pcr byte 3
0x0D, // pcr byte 4
0x41, // pcr byte 5
0xFF, // Stuffing byte 1
0xFF, // stuffing byte 3
0xEA, // data byte 1
0x4B, // data byte 2
0x12, // data byte 3
}
pesPktAsByteSlice := pesPkt.ToByteSlice()
for ii := range pesPktAsByteSlice {

View File

@ -32,7 +32,6 @@ import (
"strconv"
"bitbucket.org/ausocean/utils/smartLogger"
//"../../utils/smartLogger"
)
@ -43,8 +42,9 @@ type Config struct {
InputCodec uint8
Output uint8
RtmpEncodingMethod uint8
RtmpMethod uint8
RtmpMethod uint8
Packetization uint8
QuantizationMode uint8
FramesPerClip int
RtmpUrl string
Bitrate string
@ -62,22 +62,22 @@ type Config struct {
// Enums for config struct
const (
NothingDefined = 0
Raspivid = 1
Rtp = 2
H264Codec = 3
File = 4
Http = 5
H264 = 6
Mjpeg = 7
None = 8
Mpegts = 9
Rtmp = 10
Ffmpeg = 11
Revid = 12
Flv = 13
LibRtmp = 14
QuantizationOn = 15
NothingDefined = 0
Raspivid = 1
Rtp = 2
H264Codec = 3
File = 4
Http = 5
H264 = 6
Mjpeg = 7
None = 8
Mpegts = 9
Rtmp = 10
Ffmpeg = 11
Revid = 12
Flv = 13
LibRtmp = 14
QuantizationOn = 15
QuantizationOff = 16
)

View File

@ -42,17 +42,16 @@ import (
"strconv"
"time"
"bitbucket.org/ausocean/av/parser"
"bitbucket.org/ausocean/av/generator"
"bitbucket.org/ausocean/av/parser"
"bitbucket.org/ausocean/av/ringbuffer"
"bitbucket.org/ausocean/av/rtmp"
/*
"../generator"
"../parser"
"../ringbuffer"
"../rtmp"
*/
)
/*
"../generator"
"../parser"
"../ringbuffer"
"../rtmp"
*/)
// Misc constants
const (
@ -68,8 +67,8 @@ const (
bitrateTime = 60
mjpegParserInChanLen = 100000
ffmpegPath = "/home/saxon/bin/ffmpeg"
rtmpConnectionTimout = 10
outputChanSize = 10000
rtmpConnectionTimout = 10
outputChanSize = 10000
)
// Log Types
@ -110,8 +109,8 @@ type revidInst struct {
setupInput func() error
setupOutput func() error
getFrame func() []byte
sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession
sendClip func(clip []byte) error
rtmpInst rtmp.RTMPSession
}
// NewRevidInstance returns a pointer to a new revidInst with the desired
@ -200,7 +199,7 @@ func (r *revidInst) ChangeState(config Config) error {
r.getFrame = r.getFramePacketization
r.parser.SetOutputChan(r.generator.GetInputChan())
r.generator.Start()
noPacketizationSetup:
noPacketizationSetup:
return nil
}
@ -359,7 +358,7 @@ func (r *revidInst) outputClips() {
// senClipToFile writes the passed clip to a file
func (r *revidInst) sendClipToFile(clip []byte) error {
_,err := r.outputFile.Write(clip)
_, err := r.outputFile.Write(clip)
if err != nil {
return err
}
@ -370,7 +369,7 @@ func (r *revidInst) sendClipToFile(clip []byte) error {
func (r *revidInst) sendClipToHTTP(clip []byte) error {
timeout := time.Duration(httpTimeOut * time.Second)
client := http.Client{Timeout: timeout}
url := r.config.HttpAddress+strconv.Itoa(len(clip))
url := r.config.HttpAddress + strconv.Itoa(len(clip))
r.Log(Debug, fmt.Sprintf("Posting %s (%d bytes)\n", url, len(clip)))
resp, err := client.Post(url, "video/mp2t", bytes.NewReader(clip)) // lighter than NewBuffer
if err != nil {
@ -396,7 +395,7 @@ func (r *revidInst) sendClipToFfmpegRtmp(clip []byte) (err error) {
// sendClipToLibRtmp send the clip over the current rtmp connection using the
// c based librtmp library
func (r *revidInst) sendClipToLibRtmp(clip []byte) (err error) {
err = r.rtmpInst.WriteFrame(clip,uint(len(clip)))
err = r.rtmpInst.WriteFrame(clip, uint(len(clip)))
return
}
@ -506,8 +505,8 @@ func (r *revidInst) setupInputForRaspivid() error {
// setupInputForFile sets things up for getting input from a file
func (r *revidInst) setupInputForFile() error {
fps,_ := strconv.Atoi(r.config.FrameRate)
r.parser.SetDelay( uint( float64(1000) / float64(fps) ) )
fps, _ := strconv.Atoi(r.config.FrameRate)
r.parser.SetDelay(uint(float64(1000) / float64(fps)))
r.readFile()
return nil
}
@ -515,15 +514,14 @@ func (r *revidInst) setupInputForFile() error {
// testRtmp is useful to check robustness of connections. Intended to be run as
// goroutine. After every 'delayTime' the rtmp connection is ended and then
// restarted
func (r *revidInst)testRtmp(delayTime uint){
func (r *revidInst) testRtmp(delayTime uint) {
for {
time.Sleep(time.Duration(delayTime)*time.Millisecond)
time.Sleep(time.Duration(delayTime) * time.Millisecond)
r.rtmpInst.EndSession()
r.rtmpInst.StartSession()
}
}
// readCamera reads data from the defined camera while the revidInst is running.
// TODO: use ringbuffer here instead of allocating mem every time!
func (r *revidInst) readCamera() {

View File

@ -169,27 +169,25 @@ func TestRtmpOutputUsingLibRtmp(t *testing.T){
}
*/
// Test revidInst with a Raspivid h264 input
func TestRaspividToRtmp(t *testing.T){
config := Config{
Input: Raspivid,
Output: Rtmp,
RtmpMethod: LibRtmp,
RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1",
FramesPerClip: 1,
Packetization: Flv,
FrameRate: "25",
}
revidInst, err := NewRevidInstance(config)
if err != nil {
t.Errorf("Should not have got an error!")
return
}
revidInst.Start()
time.Sleep(43200*time.Second)
revidInst.Stop()
func TestRaspividToRtmp(t *testing.T) {
config := Config{
Input: Raspivid,
Output: Rtmp,
RtmpMethod: LibRtmp,
QuantizationMode: QuantizationOff,
RtmpUrl: "rtmp://a.rtmp.youtube.com/live2/w44c-mkuu-aezg-ceb1",
Bitrate: "500000",
FramesPerClip: 1,
Packetization: Flv,
FrameRate: "25",
}
revidInst, err := NewRevidInstance(config)
if err != nil {
t.Errorf("Should not have got an error!")
return
}
revidInst.Start()
time.Sleep(43200 * time.Second)
revidInst.Stop()
}

View File

@ -48,7 +48,7 @@ type RingBuffer interface {
GetNoOfElements() int
}
func (rb *ringBuffer)GetNoOfElements() int {
func (rb *ringBuffer) GetNoOfElements() int {
return rb.noOfElements
}

View File

@ -34,80 +34,81 @@ package rtmp
import "C"
import (
"errors"
"unsafe"
_"fmt"
"sync"
"errors"
_ "fmt"
"sync"
"unsafe"
)
// RTMPSession provides a crude interface for sending flv tags over rtmp
type RTMPSession interface {
StartSession() error
WriteFrame(data []byte, dataLength uint) error
EndSession() error
StartSession() error
WriteFrame(data []byte, dataLength uint) error
EndSession() error
}
// rtmpSession provides parameters required for an rtmp communication session
type rtmpSession struct {
url string
timeout uint
running bool
mutex *sync.Mutex
url string
timeout uint
running bool
mutex *sync.Mutex
}
// NewRTMPSession returns a new instance of an rtmpSession struct
func NewRTMPSession(url string, connectTimeout uint) (session *rtmpSession){
session = new(rtmpSession)
session.url = url
session.timeout = connectTimeout
session.mutex = &sync.Mutex{}
return
func NewRTMPSession(url string, connectTimeout uint) (session *rtmpSession) {
session = new(rtmpSession)
session.url = url
session.timeout = connectTimeout
session.mutex = &sync.Mutex{}
return
}
// StartSession establishes an rtmp connection with the url passed into the
// constructor
func (s *rtmpSession) StartSession() error {
if !s.running {
if !uintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) {
return errors.New("RTMP start error! Check rtmp log for details!")
}
s.running = true
} else {
return errors.New("Tried to start rtmp session, but already started!")
}
return nil
if !s.running {
if !uintToBool(uint(C.RTMP_start_session(C.CString(s.url), C.uint(s.timeout)))) {
return errors.New("RTMP start error! Check rtmp log for details!")
}
s.running = true
} else {
return errors.New("Tried to start rtmp session, but already started!")
}
return nil
}
// WriteFrame writes a frame (flv tag) to the rtmp connection
// TODO: Remove mutex
func (s *rtmpSession) WriteFrame(data []byte, dataLength uint) error {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.running {
dataCopy := make([]byte, len(data))
copy(dataCopy, data)
if !uintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&dataCopy[0])), C.uint(dataLength)))) {
return errors.New("RTMP write error! Check rtmp log for details!")
}
} else {
return errors.New("RTMP session not running, can't write!")
}
return nil
s.mutex.Lock()
defer s.mutex.Unlock()
if s.running {
dataCopy := make([]byte, len(data))
copy(dataCopy, data)
if !uintToBool(uint(C.RTMP_write_frame((*C.char)(unsafe.Pointer(&dataCopy[0])), C.uint(dataLength)))) {
return errors.New("RTMP write error! Check rtmp log for details!")
}
} else {
return errors.New("RTMP session not running, can't write!")
}
return nil
}
// EndSession terminates the rtmp connection
func (s *rtmpSession) EndSession() error {
if s.running {
if !uintToBool(uint(C.RTMP_end_session())) {
return errors.New("RTMP end session error! Check rtmp log for details!")
}
s.running = false
} else {
return errors.New("Tried to stop rtmp session, but not running!")
}
return nil
if s.running {
if !uintToBool(uint(C.RTMP_end_session())) {
return errors.New("RTMP end session error! Check rtmp log for details!")
}
s.running = false
} else {
return errors.New("Tried to stop rtmp session, but not running!")
}
return nil
}
// uintToBool takes a uint and returns the bool equivalent
func uintToBool(x uint) bool {
return x != 0
return x != 0
}

View File

@ -1,4 +1,3 @@
package rtmp
/*

View File

@ -29,8 +29,8 @@ LICENSE
package tools
import (
_"os"
_"fmt"
_ "fmt"
_ "os"
)
func BoolToByte(in bool) (out byte) {

View File

@ -29,45 +29,45 @@ LICENSE
package tools
import (
"testing"
"testing"
)
func TestH264Parsing(t *testing.T){
// Using file
func TestH264Parsing(t *testing.T) {
// Using file
/*
file, err := os.Open(fileName)
if err != nil {
panic("Could not open file!")
return
}
stats, err := file.Stat()
if err != nil {
panic("Could not get file stats!")
}
buffer := make([]byte, stats.Size())
_, err = file.Read(buffer)
if err != nil {
panic("Could not read file!")
}
file, err := os.Open(fileName)
if err != nil {
panic("Could not open file!")
return
}
stats, err := file.Stat()
if err != nil {
panic("Could not get file stats!")
}
buffer := make([]byte, stats.Size())
_, err = file.Read(buffer)
if err != nil {
panic("Could not read file!")
}
*/
// straight from buffer
someData := []byte{
0,0,1,7,59,100,45,82,93,0,0,1,8,23,78,65,0,0,1,6,45,34,23,3,2,0,0,1,5,3,4,5,
56,76,4,234,78,65,34,34,43,0,0,1,7,67,10,45,8,93,0,0,1,8,23,7,5,0,0,1,6,
4,34,2,3,2,0,0,1,1,3,4,5,5,76,4,234,78,65,34,34,43,45,
0, 0, 1, 7, 59, 100, 45, 82, 93, 0, 0, 1, 8, 23, 78, 65, 0, 0, 1, 6, 45, 34, 23, 3, 2, 0, 0, 1, 5, 3, 4, 5,
56, 76, 4, 234, 78, 65, 34, 34, 43, 0, 0, 1, 7, 67, 10, 45, 8, 93, 0, 0, 1, 8, 23, 7, 5, 0, 0, 1, 6,
4, 34, 2, 3, 2, 0, 0, 1, 1, 3, 4, 5, 5, 76, 4, 234, 78, 65, 34, 34, 43, 45,
}
nalAccess1 := []byte{
0,0,1,9,240,0,0,1,7,59,100,45,82,93,0,0,1,8,23,78,65,0,0,1,6,45,34,23,3,2,0,0,1,5,3,4,5,
56,76,4,234,78,65,34,34,43,
0, 0, 1, 9, 240, 0, 0, 1, 7, 59, 100, 45, 82, 93, 0, 0, 1, 8, 23, 78, 65, 0, 0, 1, 6, 45, 34, 23, 3, 2, 0, 0, 1, 5, 3, 4, 5,
56, 76, 4, 234, 78, 65, 34, 34, 43,
}
nalAccess2 := []byte{
0,0,1,9,240,0,0,1,7,67,10,45,8,93,0,0,1,8,23,7,5,0,0,1,6,
4,34,2,3,2,0,0,1,1,3,4,5,5,76,4,234,78,65,34,34,43,45,
0, 0, 1, 9, 240, 0, 0, 1, 7, 67, 10, 45, 8, 93, 0, 0, 1, 8, 23, 7, 5, 0, 0, 1, 6,
4, 34, 2, 3, 2, 0, 0, 1, 1, 3, 4, 5, 5, 76, 4, 234, 78, 65, 34, 34, 43, 45,
}
aChannel := make(chan []byte, 10)
var nalAccessChan chan<- []byte
nalAccessChan = aChannel
go ParseH264Buffer(someData,nalAccessChan)
go ParseH264Buffer(someData, nalAccessChan)
anAccessUnit := <-aChannel
for i := range anAccessUnit {
if anAccessUnit[i] != nalAccess1[i] {