/*
NAME
  senders.go

DESCRIPTION
  See Readme.md

AUTHORS
  Saxon A. Nelson-Milton <saxon@ausocean.org>
  Alan Noble <alan@ausocean.org>

LICENSE
  revid is Copyright (C) 2017-2018 the Australian Ocean Lab (AusOcean)

  It is free software: you can redistribute it and/or modify them
  under the terms of the GNU General Public License as published by the
  Free Software Foundation, either version 3 of the License, or (at your
  option) any later version.

  It is distributed in the hope that it will be useful, but WITHOUT
  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 for more details.

  You should have received a copy of the GNU General Public License
  along with revid in gpl.txt.  If not, see http://www.gnu.org/licenses.
*/

package revid

import (
	"fmt"
	"io"
	"net"
	"os"
	"os/exec"
	"strconv"

	"bitbucket.org/ausocean/av/rtmp"
	"bitbucket.org/ausocean/av/stream/mts"
	"bitbucket.org/ausocean/av/stream/rtp"
	"bitbucket.org/ausocean/iot/pi/netsender"
	"bitbucket.org/ausocean/utils/logger"
	"bitbucket.org/ausocean/utils/ring"
	"github.com/Comcast/gots/packet"
)

type sender interface {
	send(d []byte) error
}

// httpSender implements loadSender for posting HTTP to NetReceiver
type minimalHttpSender struct {
	client *netsender.Sender

	log func(lvl int8, msg string, args ...interface{})
}

func newMinimalHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *minimalHttpSender {
	return &minimalHttpSender{
		client: ns,
		log:    log,
	}
}

func (s *minimalHttpSender) send(d []byte) error {
	return httpSend(d, s.client, s.log)
}

// loadSender is a destination to send a *ring.Chunk to.
// When a loadSender has finished using the *ring.Chunk
// it must be Closed.
type loadSender interface {
	// load assigns the *ring.Chunk to the loadSender.
	// The load call may fail, but must not mutate the
	// the chunk.
	load(*ring.Chunk) error

	// send performs a destination-specific send
	// operation. It must not mutate the chunk.
	send() error

	// release releases the *ring.Chunk.
	release()

	// close cleans up after use of the loadSender.
	close() error
}

// restart is an optional interface for loadSenders that
// can restart their connection.
type restarter interface {
	restart() error
}

// fileSender implements loadSender for a local file destination.
type fileSender struct {
	file *os.File

	chunk *ring.Chunk
}

func newFileSender(path string) (*fileSender, error) {
	f, err := os.Create(path)
	if err != nil {
		return nil, err
	}
	return &fileSender{file: f}, nil
}

func (s *fileSender) load(c *ring.Chunk) error {
	s.chunk = c
	return nil
}

func (s *fileSender) send() error {
	_, err := s.chunk.WriteTo(s.file)
	return err
}

func (s *fileSender) release() {
	s.chunk.Close()
	s.chunk = nil
}

func (s *fileSender) close() error {
	return s.file.Close()
}

// mtsSender provides sending capability specifically for use with
// mpegts packetization. It handles the construction of appropriately lengthed
// clips based on PSI. It also fixes accounts for discontinuities by setting
// the discontinuity indicator for the first packet of a clip.
type mtsSender struct {
	sender   sender
	buf      []byte
	pkt      [mts.PacketSize]byte
	fail     bool
	discard  bool
	repairer *mts.DiscontinuityRepairer
	chunk    *ring.Chunk
}

// newmtsSender returns a new mtsSender.
func newMtsSender(s sender, log func(lvl int8, msg string, args ...interface{})) *mtsSender {
	return &mtsSender{
		sender:   s,
		repairer: mts.NewDiscontinuityRepairer(),
	}
}

// load takes a *ring.Chunk and extracts bytes copying into s.pkt for use by the sender.
func (s *mtsSender) load(c *ring.Chunk) error {
	s.chunk = c
	return nil
}

// send checks the most recently loaded packet and if it is a PAT then the clip
// in s.buf is sent, otherwise the packet is added to s.buf.
func (s *mtsSender) send() error {
	copy(s.pkt[:], s.chunk.Bytes())
	p := (*packet.Packet)(&s.pkt)
	pid := p.PID()
	cc := p.ContinuityCounter()

	if s.discard {
		if pid != mts.PatPid {
			return nil
		}
		s.discard = false
	}

	if pid == mts.VideoPid {
		expect, exists := s.repairer.ExpectedCC(pid)
		if !exists {
			s.repairer.SetExpectedCC(pid, cc)
		} else if cc != expect {
			s.repairer.SetExpectedCC(pid, cc)
			s.discard = true
			s.buf = s.buf[:0]
			return nil
		}
	}

	if s.fail || (((*packet.Packet)(&s.pkt)).PID() == mts.PatPid && len(s.buf) != 0) {
		err := s.fixAndSend()
		if err != nil {
			s.failed()
			return err
		}
		s.fail = false
		s.buf = s.buf[:0]
	}
	s.buf = append(s.buf, s.chunk.Bytes()...)
	return nil
}

// failed sets the s.fail flag to true, and let's the discontinuity
// repairer know that there has been a failed send.
func (s *mtsSender) failed() {
	s.fail = true
	s.repairer.Failed()
}

// fixAndSend uses the discontinuity repairer to ensure there is not a
// discontinuity, and if so sets the discontinuity indicator of the PAT packet.
func (ms *mtsSender) fixAndSend() error {
	err := ms.repairer.Repair(ms.buf)
	if err != nil {
		return err
	}
	err = ms.sender.send(ms.buf)
	if err != nil {
		return err
	}
	return nil
}

func (s *mtsSender) close() error { return nil }

// release will set the s.fail flag to fals and clear the buffer if
// the previous send was a fail.
func (s *mtsSender) release() {
	if s.fail {
		s.fail = false
		s.buf = s.buf[:0]
	}
	s.chunk.Close()
	s.chunk = nil
}

// httpSender implements loadSender for posting HTTP to NetReceiver
type httpSender struct {
	client *netsender.Sender

	log func(lvl int8, msg string, args ...interface{})

	chunk *ring.Chunk
}

func newHttpSender(ns *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) *httpSender {
	return &httpSender{
		client: ns,
		log:    log,
	}
}

func (s *httpSender) load(c *ring.Chunk) error {
	s.chunk = c
	return nil
}

func (s *httpSender) send() error {
	if s.chunk == nil {
		// Do not retry with httpSender,
		// so just return without error
		// if the chunk has been cleared.
		return nil
	}
	return httpSend(s.chunk.Bytes(), s.client, s.log)
}

func httpSend(d []byte, client *netsender.Sender, log func(lvl int8, msg string, args ...interface{})) error {
	// Only send if "V0" is configured as an input.
	send := false
	ip := client.Param("ip")
	pins := netsender.MakePins(ip, "V")
	for i, pin := range pins {
		if pin.Name == "V0" {
			send = true
			pins[i].Value = len(d)
			pins[i].Data = d
			pins[i].MimeType = "video/mp2t"
			break
		}
	}

	if !send {
		return nil
	}
	var err error
	var reply string
	reply, _, err = client.Send(netsender.RequestRecv, pins)
	if err != nil {
		return err
	}
	return extractMeta(reply, log)
}

// extractMeta looks at a reply at extracts any time or location data - then used
// to update time and location information in the mpegts encoder.
func extractMeta(r string, log func(lvl int8, msg string, args ...interface{})) error {
	dec, err := netsender.NewJSONDecoder(r)
	if err != nil {
		return nil
	}
	// Extract time from reply
	t, err := dec.Int("ts")
	if err != nil {
		log(logger.Warning, pkg+"No timestamp in reply")
	} else {
		log(logger.Debug, fmt.Sprintf("%v got timestamp: %v", pkg, t))
		mts.Meta.Add("ts", strconv.Itoa(t))
	}

	// Extract location from reply
	g, err := dec.String("ll")
	if err != nil {
		log(logger.Warning, pkg+"No location in reply")
	} else {
		log(logger.Debug, fmt.Sprintf("%v got location: %v", pkg, g))
		mts.Meta.Add("loc", g)
	}

	return nil
}

func (s *httpSender) release() {
	// We will not retry, so release
	// the chunk and clear it now.
	s.chunk.Close()
	s.chunk = nil
}

func (s *httpSender) close() error { return nil }

// ffmpegSender implements loadSender for an FFMPEG RTMP destination.
type ffmpegSender struct {
	ffmpeg io.WriteCloser

	chunk *ring.Chunk
}

func newFfmpegSender(url, framerate string) (*ffmpegSender, error) {
	cmd := exec.Command(ffmpegPath,
		"-f", "h264",
		"-r", framerate,
		"-i", "-",
		"-f", "lavfi",
		"-i", "aevalsrc=0",
		"-fflags", "nobuffer",
		"-vcodec", "copy",
		"-acodec", "aac",
		"-map", "0:0",
		"-map", "1:0",
		"-strict", "experimental",
		"-f", "flv",
		url,
	)
	w, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	err = cmd.Start()
	if err != nil {
		return nil, err
	}
	return &ffmpegSender{ffmpeg: w}, nil
}

func (s *ffmpegSender) load(c *ring.Chunk) error {
	s.chunk = c
	return nil
}

func (s *ffmpegSender) send() error {
	_, err := s.chunk.WriteTo(s.ffmpeg)
	return err
}

func (s *ffmpegSender) release() {
	s.chunk.Close()
	s.chunk = nil
}

func (s *ffmpegSender) close() error {
	return s.ffmpeg.Close()
}

// rtmpSender implements loadSender for a native RTMP destination.
type rtmpSender struct {
	conn *rtmp.Conn

	url     string
	timeout uint
	retries int
	log     func(lvl int8, msg string, args ...interface{})

	chunk *ring.Chunk
}

var _ restarter = (*rtmpSender)(nil)

func newRtmpSender(url string, timeout uint, retries int, log func(lvl int8, msg string, args ...interface{})) (*rtmpSender, error) {
	var conn *rtmp.Conn
	var err error
	for n := 0; n < retries; n++ {
		conn, err = rtmp.Dial(url, timeout, log)
		if err == nil {
			break
		}
		log(logger.Error, err.Error())
		conn.Close()
		if n < retries-1 {
			log(logger.Info, pkg+"retry rtmp connection")
		}
	}
	if err != nil {
		return nil, err
	}

	s := &rtmpSender{
		conn:    conn,
		url:     url,
		timeout: timeout,
		retries: retries,
		log:     log,
	}
	return s, nil
}

func (s *rtmpSender) load(c *ring.Chunk) error {
	s.chunk = c
	return nil
}

func (s *rtmpSender) send() error {
	_, err := s.chunk.WriteTo(s.conn)
	return err
}

func (s *rtmpSender) release() {
	s.chunk.Close()
	s.chunk = nil
}

func (s *rtmpSender) restart() error {
	err := s.conn.Close()
	if err != nil {
		return err
	}
	for n := 0; n < s.retries; n++ {
		s.conn, err = rtmp.Dial(s.url, s.timeout, s.log)
		if err == nil {
			break
		}
		s.log(logger.Error, err.Error())
		s.conn.Close()
		if n < s.retries-1 {
			s.log(logger.Info, pkg+"retry rtmp connection")
		}
	}
	return err
}

func (s *rtmpSender) close() error {
	return s.conn.Close()
}

// udpSender implements loadSender for a native udp destination.
type udpSender struct {
	conn  net.Conn
	log   func(lvl int8, msg string, args ...interface{})
	chunk *ring.Chunk
}

func newUdpSender(addr string, log func(lvl int8, msg string, args ...interface{})) (*udpSender, error) {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		return nil, err
	}
	return &udpSender{
		conn: conn,
		log:  log,
	}, nil
}

func (s *udpSender) load(c *ring.Chunk) error {
	s.chunk = c
	return nil
}

func (s *udpSender) send() error {
	_, err := s.chunk.WriteTo(s.conn)
	return err
}

func (s *udpSender) release() {
	s.chunk.Close()
	s.chunk = nil
}

func (s *udpSender) close() error { return nil }

// TODO: Write restart func for rtpSender
// rtpSender implements loadSender for a native udp destination with rtp packetization.
type rtpSender struct {
	log     func(lvl int8, msg string, args ...interface{})
	encoder *rtp.Encoder
	chunk   *ring.Chunk
}

func newRtpSender(addr string, log func(lvl int8, msg string, args ...interface{}), fps uint) (*rtpSender, error) {
	conn, err := net.Dial("udp", addr)
	if err != nil {
		return nil, err
	}
	s := &rtpSender{
		log:     log,
		encoder: rtp.NewEncoder(conn, int(fps)),
	}
	return s, nil
}

func (s *rtpSender) load(c *ring.Chunk) error {
	s.chunk = c
	return nil
}

func (s *rtpSender) close() error { return nil }

func (s *rtpSender) release() {
	s.chunk.Close()
	s.chunk = nil
}

func (s *rtpSender) send() error {
	_, err := s.chunk.WriteTo(s.encoder)
	return err
}