This commit is contained in:
Josh Baker 2017-11-07 09:52:16 -07:00
parent 9471b43256
commit 749915306d
6 changed files with 211 additions and 249 deletions

140
evio.go
View File

@ -32,9 +32,8 @@ type Options struct {
TCPKeepAlive time.Duration
}
// Conn represents a connection context which provides information
// about the connection.
type Conn struct {
// Info represents a information about the connection
type Info struct {
// Closing is true when the connection is about to close. Expect a Closed
// event to fire soon.
Closing bool
@ -47,35 +46,39 @@ type Conn struct {
}
// Server represents a server context which provides information about the
// running server and has control functions for managing some state
// running server and has control functions for managing state.
type Server struct {
// The addrs parameter is an array of listening addresses that align
// with the addr strings passed to the Serve function.
Addrs []net.Addr
// Wake is a goroutine-safe function that triggers a Data event
// (with a nil `in` parameter) for the specified id.
Wake func(id int) bool
// Dial makes a connection to an external server and returns a new
// connection id. The new connection is added to the event loop and
// is managed exactly the same way as all the other connections.
Dial func(addr string, timeout time.Duration) (id int, err error)
Wake func(id int) (ok bool)
// Dial is a goroutine-safe function makes a connection to an external
// server and returns a new connection id. The new connection is added
// to the event loop and is managed exactly the same way as all the
// other connections. This operation only fails if the server/loop has
// been shut down. When `ok` is true there will always be exactly one
// Opened and one Closed event following this call. Look for socket
// errors from the Closed event.
Dial func(addr string, timeout time.Duration) (id int, ok bool)
}
// Events represents the server events for the Serve call.
// Each event has an Action return value that is used manage the state
// of the connection and server.
type Events struct {
// Serving fires when the server can accept connections. The context
// parameter has various utilities that may help with managing the
// event loop.
Serving func(s Server) (action Action)
// Serving fires when the server can accept connections. The server
// parameter has information and various utilities.
Serving func(server Server) (action Action)
// Opened fires when a new connection has opened.
// The addr parameter is the connection's local and remote addresses.
// The info parameter has information about the connection such as
// it's local and remote address.
// Use the out return value to write data to the connection.
// The opts return value is used to set connection options.
Opened func(id int, c Conn) (out []byte, opts Options, action Action)
Opened func(id int, info Info) (out []byte, opts Options, action Action)
// Closed fires when a connection has closed.
// The err parameter is the last known connection error, usually nil.
// The err parameter is the last known connection error.
Closed func(id int, err error) (action Action)
// Detached fires when a connection has been previously detached.
// Once detached it's up to the receiver of this event to manage the
@ -200,108 +203,3 @@ func parseAddr(addr string) (network, address string, stdlib bool) {
}
return
}
// // type timeoutHeap []timeoutHeapItem
// // func (h timeoutHeap) Len() int { return len(h) }
// // func (h timeoutHeap) Less(i, j int) bool { return h[i].timeout < h[j].timeout }
// // func (h timeoutHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// // func (h *timeoutHeap) Push(x interface{}) {
// // *h = append(*h, x.(timeoutHeapItem))
// // }
// // func (h *timeoutHeap) Pop() interface{} {
// // old := *h
// // n := len(old)
// // x := old[n-1]
// // *h = old[0 : n-1]
// // return x
// // }
// type timeoutQueue struct {
// h *timeoutHeap
// }
// func newTimeoutQueue() *timeoutQueue {
// q := &timeoutQueue{&timeoutHeap{}}
// heap.Init(q.h)
// return q
// }
// func (q *timeoutQueue) len() int {
// return q.h.Len()
// }
// func (q *timeoutQueue) push(id int, timeout int64) {
// heap.Push(q.h, timeoutHeapItem{id: id, timeout: timeout})
// }
// func (q *timeoutQueue) peek() (id int, timeout int64) {
// if q.len() > 0 {
// id = (*(q.h))[0].id
// timeout = (*(q.h))[0].timeout
// }
// return
// }
// func (q *timeoutQueue) pop() (id int, timeout int64) {
// if q.len() > 0 {
// item := q.h.Pop().(timeoutHeapItem)
// id = item.id
// timeout = item.timeout
// }
// return
// }
// // func init() {
// // rand.Seed(time.Now().UnixNano())
// // q := newTimeoutQueue()
// // for i := 0; i < 1000; i++ {
// // q.push(i, rand.Int63()%9000)
// // }
// // for q.len() > 0 {
// // id, timeout := q.pop()
// // fmt.Printf("%05d %05d\n", id, timeout)
// // }
// // }
// type timeoutHeapItem struct {
// id int
// timeout int64
// }
// type timeoutHeap []timeoutHeapItem
// func (h timeoutHeap) Len() int { return len(h) }
// func (h timeoutHeap) Less(i, j int) bool { return h[i].timeout < h[j].timeout }
// func (h timeoutHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// func (h *timeoutHeap) Push(x interface{}) {
// // Push and Pop use pointer receivers because they modify the slice's length,
// // not just its contents.
// *h = append(*h, x.(timeoutHeapItem))
// }
// func (h *timeoutHeap) Pop() interface{} {
// old := *h
// n := len(old)
// x := old[n-1]
// *h = old[0 : n-1]
// return x
// }
// // This example inserts several ints into an IntHeap, checks the minimum,
// // and removes them in order of priority.
// func init() {
// q := newTimeoutQueue()
// rand.Seed(time.Now().UnixNano())
// // h := &timeoutHeap{}
// // heap.Init(h)
// for i := 10; i < 20; i++ {
// //heap.Push(h, timeoutHeapItem{i, rand.Int63() % 10})
// q.push(i, rand.Int63()%10)
// }
// _, timeout := q.peek()
// fmt.Printf("minimum: %d\n", timeout)
// for q.len() > 0 {
// //v := heap.Pop(h).(timeoutHeapItem)
// _, timeout = q.pop()
// fmt.Printf("%d ", timeout)
// }
// fmt.Printf("\n")
// }

View File

@ -7,7 +7,6 @@
package evio
import (
"errors"
"net"
"os"
"sort"
@ -34,12 +33,13 @@ func (ln *listener) close() {
}
}
// system takes the net listener and detaches it from it's parent
// event loop, grabs the file descriptor, and makes it non-blocking.
func (ln *listener) system() error {
var err error
switch netln := ln.ln.(type) {
default:
ln.close()
return errors.New("network not supported")
panic("invalid listener type")
case *net.TCPListener:
ln.f, err = netln.File()
case *net.UnixListener:
@ -53,6 +53,8 @@ func (ln *listener) system() error {
return syscall.SetNonblock(ln.fd, true)
}
// unixConn represents the connection as the event loop sees it.
// This is also becomes a detached connection.
type unixConn struct {
id, fd int
outbuf []byte
@ -61,6 +63,7 @@ type unixConn struct {
opts Options
timeout time.Time
err error
dialerr error
wake bool
writeon bool
detached bool
@ -71,7 +74,6 @@ type unixConn struct {
func (c *unixConn) Timeout() time.Time {
return c.timeout
}
func (c *unixConn) Read(p []byte) (n int, err error) {
return syscall.Read(c.fd, p)
}
@ -128,104 +130,109 @@ func serve(events Events, lns []*listener) error {
}
}
var mu sync.Mutex
var done bool
lock := func() { mu.Lock() }
unlock := func() { mu.Unlock() }
fdconn := make(map[int]*unixConn)
idconn := make(map[int]*unixConn)
timeoutqueue := internal.NewTimeoutQueue()
var id int
ctx := Server{
Wake: func(id int) bool {
var ok = true
var err error
lock()
c := idconn[id]
if c == nil {
ok = false
} else if !c.wake {
c.wake = true
err = internal.AddWrite(p, c.fd, &c.writeon)
}
dial := func(addr string, timeout time.Duration) (int, bool) {
lock()
if done {
unlock()
if err != nil {
panic(err)
}
return ok
},
Dial: func(addr string, timeout time.Duration) (int, error) {
network, address, _ := parseAddr(addr)
var taddr net.Addr
var err error
switch network {
default:
return 0, errors.New("invalid network")
case "unix":
case "tcp", "tcp4", "tcp6":
taddr, err = net.ResolveTCPAddr(network, address)
return 0, false
}
id++
c := &unixConn{id: id, opening: true}
idconn[id] = c
if timeout != 0 {
c.timeout = time.Now().Add(timeout)
timeoutqueue.Push(c)
}
unlock()
// resolving an address blocks and we don't want blocking, like ever.
// but since we're leaving the event loop we'll need to complete the
// socket connection in a goroutine and add the read and write events
// to the loop to get back into the loop.
go func() {
err := func() error {
sa, err := resolve(addr)
if err != nil {
return 0, err
return err
}
}
var fd int
var sa syscall.Sockaddr
switch taddr := taddr.(type) {
case *net.UnixAddr:
sa = &syscall.SockaddrUnix{Name: taddr.Name}
case *net.TCPAddr:
if len(taddr.IP) == 4 {
var sa4 syscall.SockaddrInet4
copy(sa4.Addr[:], taddr.IP[:])
sa4.Port = taddr.Port
sa = &sa4
var fd int
switch sa.(type) {
case *syscall.SockaddrUnix:
fd, err = syscall.Socket(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
case *syscall.SockaddrInet4:
fd, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
} else if len(taddr.IP) == 16 {
var sa6 syscall.SockaddrInet6
copy(sa6.Addr[:], taddr.IP[:])
sa6.Port = taddr.Port
sa = &sa6
case *syscall.SockaddrInet6:
fd, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_STREAM, 0)
} else {
return 0, errors.New("invalid network")
}
}
if err != nil {
return 0, err
}
if err := syscall.SetNonblock(fd, true); err != nil {
syscall.Close(fd)
return 0, err
}
err = syscall.Connect(fd, sa)
if err != nil && err != syscall.EINPROGRESS {
syscall.Close(fd)
return 0, err
}
lock()
err = internal.AddRead(p, fd)
if err != nil {
err = syscall.Connect(fd, sa)
if err != nil && err != syscall.EINPROGRESS {
syscall.Close(fd)
return err
}
if err := syscall.SetNonblock(fd, true); err != nil {
syscall.Close(fd)
return err
}
lock()
err = internal.AddRead(p, fd)
if err != nil {
unlock()
syscall.Close(fd)
return err
}
err = internal.AddWrite(p, fd, &c.writeon)
if err != nil {
unlock()
syscall.Close(fd)
return err
}
c.fd = fd
fdconn[fd] = c
unlock()
syscall.Close(fd)
return 0, err
}
id++
c := &unixConn{id: id, fd: fd, opening: true}
err = internal.AddWrite(p, fd, &c.writeon)
return nil
}()
if err != nil {
unlock()
syscall.Close(fd)
return 0, err
}
fdconn[fd] = c
idconn[id] = c
if timeout != 0 {
c.timeout = time.Now().Add(timeout)
// set a dial error and timeout right away
lock()
c.dialerr = err
c.timeout = time.Now()
timeoutqueue.Push(c)
unlock()
}
unlock()
return id, nil
},
}()
return id, true
}
// wake wakes up a connection
wake := func(id int) bool {
var ok = true
var err error
lock()
if done {
unlock()
return false
}
c := idconn[id]
if c == nil || c.fd == 0 {
ok = false
} else if !c.wake {
c.wake = true
err = internal.AddWrite(p, c.fd, &c.writeon)
}
unlock()
if err != nil {
panic(err)
}
return ok
}
ctx := Server{Wake: wake, Dial: dial}
ctx.Addrs = make([]net.Addr, len(lns))
for i, ln := range lns {
ctx.Addrs[i] = ln.naddr
@ -238,25 +245,28 @@ func serve(events Events, lns []*listener) error {
}
defer func() {
lock()
done = true
type fdid struct {
fd, id int
opening bool
}
var fdids []fdid
for fd, c := range fdconn {
fdids = append(fdids, fdid{fd, c.id, c.opening})
for _, c := range idconn {
fdids = append(fdids, fdid{c.fd, c.id, c.opening})
}
sort.Slice(fdids, func(i, j int) bool {
return fdids[j].id < fdids[i].id
})
for _, fdid := range fdids {
syscall.Close(fdid.fd)
if fdid.fd != 0 {
syscall.Close(fdid.fd)
}
if fdid.opening {
if events.Opened != nil {
laddr := getlocaladdr(fdid.fd)
raddr := getremoteaddr(fdid.fd)
unlock()
events.Opened(fdid.id, Conn{
events.Opened(fdid.id, Info{
Closing: true,
AddrIndex: -1,
LocalAddr: laddr,
@ -272,6 +282,8 @@ func serve(events Events, lns []*listener) error {
}
}
syscall.Close(p)
fdconn = nil
idconn = nil
unlock()
}()
var packet [0xFFFF]byte
@ -288,23 +300,21 @@ func serve(events Events, lns []*listener) error {
if err != nil && err != syscall.EINTR {
return err
}
if events.Tick != nil {
remain := nextTicker.Sub(time.Now())
if remain < 0 {
var tickerDelay time.Duration
var action Action
if events.Tick != nil {
tickerDelay, action = events.Tick()
if action == Shutdown {
return nil
}
} else {
tickerDelay = time.Hour
remain := nextTicker.Sub(time.Now())
if remain < 0 {
var tickerDelay time.Duration
var action Action
if events.Tick != nil {
tickerDelay, action = events.Tick()
if action == Shutdown {
return nil
}
nextTicker = time.Now().Add(tickerDelay + remain)
} else {
tickerDelay = time.Hour
}
nextTicker = time.Now().Add(tickerDelay + remain)
}
// check timeouts
// check for dial connection timeouts
if timeoutqueue.Len() > 0 {
var count int
now := time.Now()
@ -316,14 +326,14 @@ func serve(events Events, lns []*listener) error {
c := v.(*unixConn)
if now.After(v.Timeout()) {
timeoutqueue.Pop()
if _, ok := idconn[c.id]; ok {
if _, ok := idconn[c.id]; ok && c.opening {
delete(idconn, c.id)
delete(fdconn, c.fd)
syscall.Close(c.fd)
if events.Opened != nil {
laddr := getlocaladdr(c.fd)
raddr := getremoteaddr(c.fd)
events.Opened(c.id, Conn{
events.Opened(c.id, Info{
Closing: true,
AddrIndex: -1,
LocalAddr: laddr,
@ -331,7 +341,11 @@ func serve(events Events, lns []*listener) error {
})
}
if events.Closed != nil {
events.Closed(c.id, syscall.ETIMEDOUT)
if c.dialerr != nil {
events.Closed(c.id, c.dialerr)
} else {
events.Closed(c.id, syscall.ETIMEDOUT)
}
}
count++
}
@ -392,7 +406,7 @@ func serve(events Events, lns []*listener) error {
laddr := getlocaladdr(fd)
raddr := getremoteaddr(fd)
unlock()
out, c.opts, c.action = events.Opened(c.id, Conn{
out, c.opts, c.action = events.Opened(c.id, Info{
AddrIndex: lnidx,
LocalAddr: laddr,
RemoteAddr: raddr,
@ -543,10 +557,16 @@ func serve(events Events, lns []*listener) error {
}
func getlocaladdr(fd int) net.Addr {
if fd == 0 {
return nil
}
sa, _ := syscall.Getsockname(fd)
return getaddr(sa)
}
func getremoteaddr(fd int) net.Addr {
if fd == 0 {
return nil
}
sa, _ := syscall.Getpeername(fd)
return getaddr(sa)
}
@ -562,3 +582,39 @@ func getaddr(sa syscall.Sockaddr) net.Addr {
return &net.UnixAddr{Net: "unix", Name: sa.Name}
}
}
// resolve resolves an evio address and retuns a sockaddr for socket
// connection to external servers.
func resolve(addr string) (sa syscall.Sockaddr, err error) {
network, address, _ := parseAddr(addr)
var taddr net.Addr
switch network {
default:
return nil, net.UnknownNetworkError(network)
case "unix":
taddr = &net.UnixAddr{Net: "unix", Name: address}
case "tcp", "tcp4", "tcp6":
// use the stdlib resolver because it's good.
taddr, err = net.ResolveTCPAddr(network, address)
if err != nil {
return nil, err
}
}
switch taddr := taddr.(type) {
case *net.UnixAddr:
sa = &syscall.SockaddrUnix{Name: taddr.Name}
case *net.TCPAddr:
if len(taddr.IP) == 4 {
var sa4 syscall.SockaddrInet4
copy(sa4.Addr[:], taddr.IP[:])
sa4.Port = taddr.Port
sa = &sa4
} else if len(taddr.IP) == 16 {
var sa6 syscall.SockaddrInet6
copy(sa6.Addr[:], taddr.IP[:])
sa6.Port = taddr.Port
sa = &sa6
}
}
return sa, nil
}

View File

@ -166,7 +166,7 @@ func servenet(events Events, lns []*listener) error {
var action Action
mu.Lock()
if !done {
out, opts, action = events.Opened(id, Conn{
out, opts, action = events.Opened(id, Info{
AddrIndex: lnidx,
LocalAddr: conn.LocalAddr(),
RemoteAddr: conn.RemoteAddr(),

View File

@ -70,7 +70,7 @@ func NopConn(rw io.ReadWriter) net.Conn {
// that performs the actual translation.
func Translate(
events Events,
should func(id int, conn Conn) bool,
should func(id int, info Info) bool,
translate func(id int, rd io.ReadWriter) io.ReadWriter,
) Events {
tevents := events
@ -187,16 +187,16 @@ func Translate(
}
return
}
tevents.Opened = func(id int, conn Conn) (out []byte, opts Options, action Action) {
if should != nil && !should(id, conn) {
tevents.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
if should != nil && !should(id, info) {
if events.Opened != nil {
out, opts, action = events.Opened(id, conn)
out, opts, action = events.Opened(id, info)
}
return
}
c := create(id)
if events.Opened != nil {
out, opts, c.action = events.Opened(id, conn)
out, opts, c.action = events.Opened(id, info)
if len(out) > 0 {
c.write(1, out)
out = nil

View File

@ -29,7 +29,7 @@ type request struct {
}
type conn struct {
addr evio.Addr
info evio.Info
is evio.InputStream
}
@ -39,7 +39,8 @@ func main() {
var tlspem string
var aaaa bool
var noparse bool
var unixsocket string
flag.StringVar(&unixsocket, "unixsocket", "", "unix socket")
flag.IntVar(&port, "port", 8080, "server port")
flag.IntVar(&tlsport, "tlsport", 4443, "tls port")
flag.StringVar(&tlspem, "tlscert", "", "tls pem cert/key file")
@ -60,16 +61,19 @@ func main() {
var events evio.Events
var conns = make(map[int]*conn)
events.Serving = func(ctx evio.Context) (action evio.Action) {
events.Serving = func(server evio.Server) (action evio.Action) {
log.Printf("http server started on port %d", port)
if tlspem != "" {
log.Printf("https server started on port %d", tlsport)
}
if unixsocket != "" {
log.Printf("http server started at %s", unixsocket)
}
return
}
events.Opened = func(id int, addr evio.Addr) (out []byte, opts evio.Options, action evio.Action) {
conns[id] = &conn{addr: addr}
events.Opened = func(id int, info evio.Info) (out []byte, opts evio.Options, action evio.Action) {
conns[id] = &conn{info: info}
//log.Printf("opened: %d: %s: %s", id, addr.Local.String(), addr.Remote.String())
return
}
@ -82,6 +86,7 @@ func main() {
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
println(2)
if in == nil {
return
}
@ -106,7 +111,7 @@ func main() {
break
}
// handle the request
req.remoteAddr = c.addr.Remote.String()
req.remoteAddr = c.info.RemoteAddr.String()
out = appendhandle(out, &req)
data = leftover
}
@ -127,9 +132,9 @@ func main() {
// TLS translate the events
events = evio.Translate(events,
func(id int, addr evio.Addr) bool {
func(id int, info evio.Info) bool {
// only translate for the second address.
return addr.Index == 1
return info.AddrIndex == 1
},
func(id int, rw io.ReadWriter) io.ReadWriter {
// Use the standard Go crypto/tls package and create a tls.Conn
@ -140,6 +145,9 @@ func main() {
},
)
}
if unixsocket != "" {
addrs = append(addrs, fmt.Sprintf("unix://%s", unixsocket))
}
// Start serving!
log.Fatal(evio.Serve(events, addrs...))
}

View File

@ -41,7 +41,7 @@ func main() {
return
}
wgetids := make(map[int]time.Time)
events.Opened = func(id int, cn evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
events.Opened = func(id int, info evio.Info) (out []byte, opts evio.Options, action evio.Action) {
c := &conn{}
if !wgetids[id].IsZero() {
delete(wgetids, id)
@ -72,8 +72,8 @@ func main() {
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
c := conns[id]
if c.wget {
println(string(in))
action = evio.Close
print(string(in))
//action = evio.Close
return
}
data := c.is.Begin(in)
@ -102,12 +102,12 @@ func main() {
} else {
start := time.Now()
n, _ := strconv.ParseInt(string(args[2]), 10, 63)
cid, err := srv.Dial("tcp://"+string(args[1]), time.Duration(n)*time.Second)
if err != nil {
out = redcon.AppendError(out, err.Error())
cid, ok := srv.Dial("unix://"+string(args[1]), time.Duration(n)*time.Second)
if !ok {
out = redcon.AppendError(out, "failed to dial")
} else {
wgetids[cid] = time.Now()
println(cid, time.Since(start).String())
wgetids[cid] = time.Now()
out = redcon.AppendOK(out)
}
}