This commit is contained in:
Josh Baker 2017-11-07 06:49:33 -07:00
parent e9ee8f49e1
commit 9471b43256
7 changed files with 489 additions and 224 deletions

176
evio.go
View File

@ -32,40 +32,48 @@ type Options struct {
TCPKeepAlive time.Duration TCPKeepAlive time.Duration
} }
// Addr represents the connection's remote and local addresses. // Conn represents a connection context which provides information
type Addr struct { // about the connection.
// Index is the index of server address that was passed to the Serve call. type Conn struct {
Index int // Closing is true when the connection is about to close. Expect a Closed
// Local is the connection's local socket address. // event to fire soon.
Local net.Addr Closing bool
// Local is the connection's remote peer address. // AddrIndex is the index of server address that was passed to the Serve call.
Remote net.Addr AddrIndex int
// LocalAddr is the connection's local socket address.
LocalAddr net.Addr
// RemoteAddr is the connection's remote peer address.
RemoteAddr net.Addr
} }
// Context represents a server context which provides information about the // Server represents a server context which provides information about the
// running server and has control functions for managing some state // running server and has control functions for managing some state
type Context struct { type Server struct {
// The addrs parameter is an array of listening addresses that align
// with the addr strings passed to the Serve function.
Addrs []net.Addr Addrs []net.Addr
// Wake is a goroutine-safe function that triggers a Data event
// (with a nil `in` parameter) for the specified id.
Wake func(id int) bool Wake func(id int) bool
Attach func(v interface{}) error // Dial makes a connection to an external server and returns a new
// connection id. The new connection is added to the event loop and
// is managed exactly the same way as all the other connections.
Dial func(addr string, timeout time.Duration) (id int, err error)
} }
// Events represents the server events for the Serve call. // Events represents the server events for the Serve call.
// Each event has an Action return value that is used manage the state // Each event has an Action return value that is used manage the state
// of the connection and server. // of the connection and server.
type Events struct { type Events struct {
// Serving fires when the server can accept connections. // Serving fires when the server can accept connections. The context
// The wake parameter is a goroutine-safe function that triggers // parameter has various utilities that may help with managing the
// a Data event (with a nil `in` parameter) for the specified id. // event loop.
// The addrs parameter is an array of listening addresses that align Serving func(s Server) (action Action)
// with the addr strings passed to the Serve function.
Serving func(c Context) (action Action)
// Opened fires when a new connection has opened. // Opened fires when a new connection has opened.
// The addr parameter is the connection's local and remote addresses. // The addr parameter is the connection's local and remote addresses.
// Use the out return value to write data to the connection. // Use the out return value to write data to the connection.
// The opts return value is used to set connection options. // The opts return value is used to set connection options.
Opened func(id int, addr Addr) (out []byte, opts Options, action Action) Opened func(id int, c Conn) (out []byte, opts Options, action Action)
Attached func(id int, v interface{}) (out []byte, opts Options, action Action)
// Closed fires when a connection has closed. // Closed fires when a connection has closed.
// The err parameter is the last known connection error, usually nil. // The err parameter is the last known connection error, usually nil.
Closed func(id int, err error) (action Action) Closed func(id int, err error) (action Action)
@ -116,14 +124,11 @@ func Serve(events Events, addr ...string) error {
}() }()
var stdlib bool var stdlib bool
for _, addr := range addr { for _, addr := range addr {
ln := listener{network: "tcp", addr: addr} var ln listener
if strings.Contains(addr, "://") { var stdlibt bool
ln.network = strings.Split(addr, "://")[0] ln.network, ln.addr, stdlibt = parseAddr(addr)
ln.addr = strings.Split(addr, "://")[1] if stdlibt {
}
if strings.HasSuffix(ln.network, "-net") {
stdlib = true stdlib = true
ln.network = ln.network[:len(ln.network)-4]
} }
if ln.network == "unix" { if ln.network == "unix" {
os.RemoveAll(ln.addr) os.RemoveAll(ln.addr)
@ -181,3 +186,122 @@ type listener struct {
addr string addr string
naddr net.Addr naddr net.Addr
} }
func parseAddr(addr string) (network, address string, stdlib bool) {
network = "tcp"
address = addr
if strings.Contains(address, "://") {
network = strings.Split(address, "://")[0]
address = strings.Split(address, "://")[1]
}
if strings.HasSuffix(network, "-net") {
stdlib = true
network = network[:len(network)-4]
}
return
}
// // type timeoutHeap []timeoutHeapItem
// // func (h timeoutHeap) Len() int { return len(h) }
// // func (h timeoutHeap) Less(i, j int) bool { return h[i].timeout < h[j].timeout }
// // func (h timeoutHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// // func (h *timeoutHeap) Push(x interface{}) {
// // *h = append(*h, x.(timeoutHeapItem))
// // }
// // func (h *timeoutHeap) Pop() interface{} {
// // old := *h
// // n := len(old)
// // x := old[n-1]
// // *h = old[0 : n-1]
// // return x
// // }
// type timeoutQueue struct {
// h *timeoutHeap
// }
// func newTimeoutQueue() *timeoutQueue {
// q := &timeoutQueue{&timeoutHeap{}}
// heap.Init(q.h)
// return q
// }
// func (q *timeoutQueue) len() int {
// return q.h.Len()
// }
// func (q *timeoutQueue) push(id int, timeout int64) {
// heap.Push(q.h, timeoutHeapItem{id: id, timeout: timeout})
// }
// func (q *timeoutQueue) peek() (id int, timeout int64) {
// if q.len() > 0 {
// id = (*(q.h))[0].id
// timeout = (*(q.h))[0].timeout
// }
// return
// }
// func (q *timeoutQueue) pop() (id int, timeout int64) {
// if q.len() > 0 {
// item := q.h.Pop().(timeoutHeapItem)
// id = item.id
// timeout = item.timeout
// }
// return
// }
// // func init() {
// // rand.Seed(time.Now().UnixNano())
// // q := newTimeoutQueue()
// // for i := 0; i < 1000; i++ {
// // q.push(i, rand.Int63()%9000)
// // }
// // for q.len() > 0 {
// // id, timeout := q.pop()
// // fmt.Printf("%05d %05d\n", id, timeout)
// // }
// // }
// type timeoutHeapItem struct {
// id int
// timeout int64
// }
// type timeoutHeap []timeoutHeapItem
// func (h timeoutHeap) Len() int { return len(h) }
// func (h timeoutHeap) Less(i, j int) bool { return h[i].timeout < h[j].timeout }
// func (h timeoutHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
// func (h *timeoutHeap) Push(x interface{}) {
// // Push and Pop use pointer receivers because they modify the slice's length,
// // not just its contents.
// *h = append(*h, x.(timeoutHeapItem))
// }
// func (h *timeoutHeap) Pop() interface{} {
// old := *h
// n := len(old)
// x := old[n-1]
// *h = old[0 : n-1]
// return x
// }
// // This example inserts several ints into an IntHeap, checks the minimum,
// // and removes them in order of priority.
// func init() {
// q := newTimeoutQueue()
// rand.Seed(time.Now().UnixNano())
// // h := &timeoutHeap{}
// // heap.Init(h)
// for i := 10; i < 20; i++ {
// //heap.Push(h, timeoutHeapItem{i, rand.Int63() % 10})
// q.push(i, rand.Int63()%10)
// }
// _, timeout := q.peek()
// fmt.Printf("minimum: %d\n", timeout)
// for q.len() > 0 {
// //v := heap.Pop(h).(timeoutHeapItem)
// _, timeout = q.pop()
// fmt.Printf("%d ", timeout)
// }
// fmt.Printf("\n")
// }

View File

@ -54,19 +54,22 @@ func (ln *listener) system() error {
} }
type unixConn struct { type unixConn struct {
id, fd, p int id, fd int
outbuf []byte outbuf []byte
outpos int outpos int
action Action action Action
opts Options opts Options
raddr net.Addr timeout time.Time
laddr net.Addr
err error err error
wake bool wake bool
writeon bool writeon bool
detached bool detached bool
attaching bool
closed bool closed bool
opening bool
}
func (c *unixConn) Timeout() time.Time {
return c.timeout
} }
func (c *unixConn) Read(p []byte) (n int, err error) { func (c *unixConn) Read(p []byte) (n int, err error) {
@ -113,7 +116,6 @@ func (c *unixConn) Close() error {
c.closed = true c.closed = true
return err return err
} }
func serve(events Events, lns []*listener) error { func serve(events Events, lns []*listener) error {
p, err := internal.MakePoll() p, err := internal.MakePoll()
if err != nil { if err != nil {
@ -130,8 +132,10 @@ func serve(events Events, lns []*listener) error {
unlock := func() { mu.Unlock() } unlock := func() { mu.Unlock() }
fdconn := make(map[int]*unixConn) fdconn := make(map[int]*unixConn)
idconn := make(map[int]*unixConn) idconn := make(map[int]*unixConn)
timeoutqueue := internal.NewTimeoutQueue()
var id int var id int
ctx := Context{ ctx := Server{
Wake: func(id int) bool { Wake: func(id int) bool {
var ok = true var ok = true
var err error var err error
@ -141,7 +145,7 @@ func serve(events Events, lns []*listener) error {
ok = false ok = false
} else if !c.wake { } else if !c.wake {
c.wake = true c.wake = true
err = internal.AddWrite(c.p, c.fd, &c.writeon) err = internal.AddWrite(p, c.fd, &c.writeon)
} }
unlock() unlock()
if err != nil { if err != nil {
@ -149,69 +153,77 @@ func serve(events Events, lns []*listener) error {
} }
return ok return ok
}, },
Attach: func(v interface{}) error { Dial: func(addr string, timeout time.Duration) (int, error) {
var fd int network, address, _ := parseAddr(addr)
var taddr net.Addr
var err error var err error
switch v := v.(type) { switch network {
default: default:
return errors.New("invalid type") return 0, errors.New("invalid network")
case *net.TCPConn: case "unix":
f, err := v.File() case "tcp", "tcp4", "tcp6":
taddr, err = net.ResolveTCPAddr(network, address)
if err != nil { if err != nil {
return err return 0, err
}
}
var fd int
var sa syscall.Sockaddr
switch taddr := taddr.(type) {
case *net.UnixAddr:
sa = &syscall.SockaddrUnix{Name: taddr.Name}
case *net.TCPAddr:
if len(taddr.IP) == 4 {
var sa4 syscall.SockaddrInet4
copy(sa4.Addr[:], taddr.IP[:])
sa4.Port = taddr.Port
sa = &sa4
fd, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
} else if len(taddr.IP) == 16 {
var sa6 syscall.SockaddrInet6
copy(sa6.Addr[:], taddr.IP[:])
sa6.Port = taddr.Port
sa = &sa6
fd, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_STREAM, 0)
} else {
return 0, errors.New("invalid network")
}
} }
fd = int(f.Fd())
case *net.UnixConn:
f, err := v.File()
if err != nil { if err != nil {
return err return 0, err
} }
fd = int(f.Fd()) if err := syscall.SetNonblock(fd, true); err != nil {
case *os.File: syscall.Close(fd)
fd = int(v.Fd()) return 0, err
case int:
fd = v
case uintptr:
fd = int(v)
} }
err = syscall.SetNonblock(fd, true) err = syscall.Connect(fd, sa)
if err != nil { if err != nil && err != syscall.EINPROGRESS {
println(456) syscall.Close(fd)
return err return 0, err
} }
lock() lock()
err = internal.AddRead(p, fd) err = internal.AddRead(p, fd)
if err != nil { if err != nil {
unlock() unlock()
return err syscall.Close(fd)
return 0, err
} }
id++ id++
c := &unixConn{id: id, fd: fd, p: p} c := &unixConn{id: id, fd: fd, opening: true}
c.attaching = true
fdconn[fd] = c
idconn[id] = c
if events.Attached != nil {
unlock()
out, opts, action := events.Attached(id, v)
lock()
if opts.TCPKeepAlive > 0 {
internal.SetKeepAlive(fd, int(c.opts.TCPKeepAlive/time.Second))
}
c.action = action
if len(out) > 0 {
c.outbuf = append(c.outbuf, out...)
}
}
// if len(c.outbuf) > 0 || c.action != None {
err = internal.AddWrite(p, fd, &c.writeon) err = internal.AddWrite(p, fd, &c.writeon)
if err != nil { if err != nil {
unlock() unlock()
panic(err) syscall.Close(fd)
return 0, err
}
fdconn[fd] = c
idconn[id] = c
if timeout != 0 {
c.timeout = time.Now().Add(timeout)
timeoutqueue.Push(c)
} }
// }
unlock() unlock()
// println("---") return id, nil
return nil
}, },
} }
ctx.Addrs = make([]net.Addr, len(lns)) ctx.Addrs = make([]net.Addr, len(lns))
@ -228,46 +240,108 @@ func serve(events Events, lns []*listener) error {
lock() lock()
type fdid struct { type fdid struct {
fd, id int fd, id int
opts Options opening bool
} }
var fdids []fdid var fdids []fdid
for fd, c := range fdconn { for fd, c := range fdconn {
fdids = append(fdids, fdid{fd, c.id, c.opts}) fdids = append(fdids, fdid{fd, c.id, c.opening})
} }
sort.Slice(fdids, func(i, j int) bool { sort.Slice(fdids, func(i, j int) bool {
return fdids[j].id < fdids[i].id return fdids[j].id < fdids[i].id
}) })
for _, fdid := range fdids { for _, fdid := range fdids {
syscall.Close(fdid.fd) syscall.Close(fdid.fd)
if fdid.opening {
if events.Opened != nil {
laddr := getlocaladdr(fdid.fd)
raddr := getremoteaddr(fdid.fd)
unlock()
events.Opened(fdid.id, Conn{
Closing: true,
AddrIndex: -1,
LocalAddr: laddr,
RemoteAddr: raddr,
})
lock()
}
}
if events.Closed != nil { if events.Closed != nil {
unlock() unlock()
events.Closed(fdid.id, nil) events.Closed(fdid.id, nil)
lock() lock()
} }
} }
syscall.Close(p)
unlock() unlock()
}() }()
var packet [0xFFFF]byte var packet [0xFFFF]byte
var evs = internal.MakeEvents(64) var evs = internal.MakeEvents(64)
var lastTicker time.Time nextTicker := time.Now()
var tickerDelay time.Duration
if events.Tick == nil {
tickerDelay = time.Hour
}
for { for {
pn, err := internal.Wait(p, evs, tickerDelay) delay := nextTicker.Sub(time.Now())
if delay < 0 {
delay = 0
} else if delay > time.Second/4 {
delay = time.Second / 4
}
pn, err := internal.Wait(p, evs, delay)
if err != nil && err != syscall.EINTR { if err != nil && err != syscall.EINTR {
return err return err
} }
if events.Tick != nil { if events.Tick != nil {
now := time.Now() remain := nextTicker.Sub(time.Now())
if now.Sub(lastTicker) > tickerDelay { if remain < 0 {
var tickerDelay time.Duration
var action Action var action Action
if events.Tick != nil {
tickerDelay, action = events.Tick() tickerDelay, action = events.Tick()
if action == Shutdown { if action == Shutdown {
return nil return nil
} }
lastTicker = now } else {
tickerDelay = time.Hour
}
nextTicker = time.Now().Add(tickerDelay + remain)
}
}
// check timeouts
if timeoutqueue.Len() > 0 {
var count int
now := time.Now()
for {
v := timeoutqueue.Peek()
if v == nil {
break
}
c := v.(*unixConn)
if now.After(v.Timeout()) {
timeoutqueue.Pop()
if _, ok := idconn[c.id]; ok {
delete(idconn, c.id)
delete(fdconn, c.fd)
syscall.Close(c.fd)
if events.Opened != nil {
laddr := getlocaladdr(c.fd)
raddr := getremoteaddr(c.fd)
events.Opened(c.id, Conn{
Closing: true,
AddrIndex: -1,
LocalAddr: laddr,
RemoteAddr: raddr,
})
}
if events.Closed != nil {
events.Closed(c.id, syscall.ETIMEDOUT)
}
count++
}
} else {
break
}
}
if count > 0 {
// invalidate the current events and wait for more
continue
} }
} }
lock() lock()
@ -280,39 +354,25 @@ func serve(events Events, lns []*listener) error {
var ln *listener var ln *listener
var lnidx int var lnidx int
var fd = internal.GetFD(evs, i) var fd = internal.GetFD(evs, i)
var sa syscall.Sockaddr
for lnidx, ln = range lns { for lnidx, ln = range lns {
if fd == ln.fd { if fd == ln.fd {
goto accept goto accept
} }
} }
ln = nil
c = fdconn[fd] c = fdconn[fd]
if c == nil { if c == nil {
syscall.Close(fd) syscall.Close(fd)
goto next goto next
} }
if c.attaching { if c.opening {
println(fd)
goto next
// opt, err := syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_ERROR)
// if opt != 0 {
// } lnidx = -1
// fmt.Printf(">> %v %v\n", opt, err) goto opened
switch evs.([]syscall.Kevent_t)[i].Filter {
case syscall.EVFILT_WRITE:
println(123)
goto write
case syscall.EVFILT_READ:
println(456)
goto read
default:
goto next
}
} }
goto read goto read
accept: accept:
nfd, sa, err = syscall.Accept(fd) nfd, _, err = syscall.Accept(fd)
if err != nil { if err != nil {
goto next goto next
} }
@ -323,26 +383,32 @@ func serve(events Events, lns []*listener) error {
goto fail goto fail
} }
id++ id++
c = &unixConn{id: id, fd: nfd, p: p} c = &unixConn{id: id, fd: nfd}
fdconn[nfd] = c fdconn[nfd] = c
idconn[id] = c idconn[id] = c
c.laddr = getlocaladdr(fd, ln.ln) goto opened
c.raddr = getaddr(sa, ln.ln) opened:
if events.Opened != nil { if events.Opened != nil {
laddr := getlocaladdr(fd)
raddr := getremoteaddr(fd)
unlock() unlock()
out, c.opts, c.action = events.Opened(c.id, Addr{lnidx, c.laddr, c.raddr}) out, c.opts, c.action = events.Opened(c.id, Conn{
AddrIndex: lnidx,
LocalAddr: laddr,
RemoteAddr: raddr,
})
lock() lock()
if c.opts.TCPKeepAlive > 0 { if c.opts.TCPKeepAlive > 0 {
if _, ok := ln.ln.(*net.TCPListener); ok { internal.SetKeepAlive(c.fd, int(c.opts.TCPKeepAlive/time.Second))
if err = internal.SetKeepAlive(c.fd, int(c.opts.TCPKeepAlive/time.Second)); err != nil {
goto fail
}
}
} }
if len(out) > 0 { if len(out) > 0 {
c.outbuf = append(c.outbuf, out...) c.outbuf = append(c.outbuf, out...)
} }
} }
if c.opening {
c.opening = false
goto next
}
goto write goto write
read: read:
if c.action != None { if c.action != None {
@ -394,12 +460,11 @@ func serve(events Events, lns []*listener) error {
} }
} }
if n == 0 || err != nil { if n == 0 || err != nil {
println("C")
if c.action == Shutdown { if c.action == Shutdown {
goto close goto close
} }
if err == syscall.EAGAIN { if err == syscall.EAGAIN {
if err = internal.AddWrite(c.p, c.fd, &c.writeon); err != nil { if err = internal.AddWrite(p, c.fd, &c.writeon); err != nil {
goto fail goto fail
} }
goto next goto next
@ -418,7 +483,7 @@ func serve(events Events, lns []*listener) error {
} }
if len(c.outbuf)-c.outpos == 0 { if len(c.outbuf)-c.outpos == 0 {
if !c.wake { if !c.wake {
if err = internal.DelWrite(c.p, c.fd, &c.writeon); err != nil { if err = internal.DelWrite(p, c.fd, &c.writeon); err != nil {
goto fail goto fail
} }
} }
@ -426,7 +491,7 @@ func serve(events Events, lns []*listener) error {
goto close goto close
} }
} else { } else {
if err = internal.AddWrite(c.p, c.fd, &c.writeon); err != nil { if err = internal.AddWrite(p, c.fd, &c.writeon); err != nil {
goto fail goto fail
} }
} }
@ -434,6 +499,7 @@ func serve(events Events, lns []*listener) error {
close: close:
delete(fdconn, c.fd) delete(fdconn, c.fd)
delete(idconn, c.id) delete(idconn, c.id)
//delete(idtimeout, c.id)
if c.action == Detach { if c.action == Detach {
if events.Detached != nil { if events.Detached != nil {
c.detached = true c.detached = true
@ -476,30 +542,23 @@ func serve(events Events, lns []*listener) error {
} }
} }
func getlocaladdr(fd int, ln net.Listener) net.Addr { func getlocaladdr(fd int) net.Addr {
sa, _ := syscall.Getsockname(fd) sa, _ := syscall.Getsockname(fd)
return getaddr(sa, ln) return getaddr(sa)
} }
func getremoteaddr(fd int) net.Addr {
func getaddr(sa syscall.Sockaddr, ln net.Listener) net.Addr { sa, _ := syscall.Getpeername(fd)
switch ln.(type) { return getaddr(sa)
case *net.UnixListener: }
return ln.Addr() func getaddr(sa syscall.Sockaddr) net.Addr {
case *net.TCPListener:
var addr net.TCPAddr
switch sa := sa.(type) { switch sa := sa.(type) {
case *syscall.SockaddrInet4: default:
addr.IP = net.IP(sa.Addr[:])
addr.Port = sa.Port
return &addr
case *syscall.SockaddrInet6:
addr.IP = net.IP(sa.Addr[:])
addr.Port = sa.Port
if sa.ZoneId != 0 {
addr.Zone = strconv.FormatInt(int64(sa.ZoneId), 10)
}
return &addr
}
}
return nil return nil
case *syscall.SockaddrInet4:
return &net.TCPAddr{IP: net.IP(sa.Addr[:]), Port: sa.Port}
case *syscall.SockaddrInet6:
return &net.TCPAddr{IP: net.IP(sa.Addr[:]), Port: sa.Port, Zone: strconv.FormatInt(int64(sa.ZoneId), 10)}
case *syscall.SockaddrUnix:
return &net.UnixAddr{Net: "unix", Name: sa.Name}
}
} }

View File

@ -68,7 +68,7 @@ func servenet(events Events, lns []*listener) error {
var cmu sync.Mutex var cmu sync.Mutex
var idconn = make(map[int]*netConn) var idconn = make(map[int]*netConn)
var done bool var done bool
ctx := Context{ ctx := Server{
Wake: func(id int) bool { Wake: func(id int) bool {
cmu.Lock() cmu.Lock()
c := idconn[id] c := idconn[id]
@ -166,7 +166,11 @@ func servenet(events Events, lns []*listener) error {
var action Action var action Action
mu.Lock() mu.Lock()
if !done { if !done {
out, opts, action = events.Opened(id, Addr{lnidx, conn.LocalAddr(), conn.RemoteAddr()}) out, opts, action = events.Opened(id, Conn{
AddrIndex: lnidx,
LocalAddr: conn.LocalAddr(),
RemoteAddr: conn.RemoteAddr(),
})
} }
mu.Unlock() mu.Unlock()
if opts.TCPKeepAlive > 0 { if opts.TCPKeepAlive > 0 {

View File

@ -70,11 +70,11 @@ func NopConn(rw io.ReadWriter) net.Conn {
// that performs the actual translation. // that performs the actual translation.
func Translate( func Translate(
events Events, events Events,
should func(id int, addr Addr) bool, should func(id int, conn Conn) bool,
translate func(id int, rd io.ReadWriter) io.ReadWriter, translate func(id int, rd io.ReadWriter) io.ReadWriter,
) Events { ) Events {
tevents := events tevents := events
var ctx Context var ctx Server
var mu sync.Mutex var mu sync.Mutex
idc := make(map[int]*tconn) idc := make(map[int]*tconn)
get := func(id int) *tconn { get := func(id int) *tconn {
@ -180,23 +180,23 @@ func Translate(
c.mu.Unlock() c.mu.Unlock()
return err return err
} }
tevents.Serving = func(ctxin Context) (action Action) { tevents.Serving = func(ctxin Server) (action Action) {
ctx = ctxin ctx = ctxin
if events.Serving != nil { if events.Serving != nil {
action = events.Serving(ctx) action = events.Serving(ctx)
} }
return return
} }
tevents.Opened = func(id int, addr Addr) (out []byte, opts Options, action Action) { tevents.Opened = func(id int, conn Conn) (out []byte, opts Options, action Action) {
if should != nil && !should(id, addr) { if should != nil && !should(id, conn) {
if events.Opened != nil { if events.Opened != nil {
out, opts, action = events.Opened(id, addr) out, opts, action = events.Opened(id, conn)
} }
return return
} }
c := create(id) c := create(id)
if events.Opened != nil { if events.Opened != nil {
out, opts, c.action = events.Opened(id, addr) out, opts, c.action = events.Opened(id, conn)
if len(out) > 0 { if len(out) > 0 {
c.write(1, out) c.write(1, out)
out = nil out = nil

View File

@ -8,9 +8,8 @@ import (
"flag" "flag"
"fmt" "fmt"
"log" "log"
"net" "strconv"
"strings" "strings"
"syscall"
"time" "time"
"github.com/tidwall/evio" "github.com/tidwall/evio"
@ -23,61 +22,49 @@ type conn struct {
wget bool wget bool
} }
func Dial(network, addr string) (fd int, err error) {
taddr, err := net.ResolveTCPAddr(network, addr)
if err != nil {
return 0, err
}
fd, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
if err != nil {
return 0, err
}
if err := syscall.SetNonblock(fd, true); err != nil {
syscall.Close(fd)
return 0, err
}
var sa syscall.SockaddrInet4
copy(sa.Addr[:], taddr.IP[:])
sa.Port = taddr.Port
err = syscall.Connect(fd, &sa)
if err != nil && err != syscall.EINPROGRESS {
syscall.Close(fd)
return 0, err
}
return fd, nil
}
func main() { func main() {
var port int var port int
var unixsocket string var unixsocket string
var ctx evio.Context var srv evio.Server
flag.IntVar(&port, "port", 6380, "server port") flag.IntVar(&port, "port", 6380, "server port")
flag.StringVar(&unixsocket, "unixsocket", "socket", "unix socket") flag.StringVar(&unixsocket, "unixsocket", "socket", "unix socket")
flag.Parse() flag.Parse()
var conns = make(map[int]*conn) var conns = make(map[int]*conn)
var keys = make(map[string]string) var keys = make(map[string]string)
var events evio.Events var events evio.Events
events.Serving = func(ctxin evio.Context) (action evio.Action) { events.Serving = func(srvin evio.Server) (action evio.Action) {
ctx = ctxin srv = srvin
log.Printf("redis server started on port %d", port) log.Printf("redis server started on port %d", port)
if unixsocket != "" { if unixsocket != "" {
log.Printf("redis server started at %s", unixsocket) log.Printf("redis server started at %s", unixsocket)
} }
return return
} }
events.Attached = func(id int, v interface{}) (out []byte, opts evio.Options, action evio.Action) { wgetids := make(map[int]time.Time)
conns[id] = &conn{wget: true} events.Opened = func(id int, cn evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
println("attached", id) c := &conn{}
if !wgetids[id].IsZero() {
delete(wgetids, id)
c.wget = true
}
conns[id] = c
println("opened", id, c.wget)
if c.wget {
out = []byte("GET / HTTP/1.0\r\n\r\n") out = []byte("GET / HTTP/1.0\r\n\r\n")
}
return return
} }
events.Opened = func(id int, addr evio.Addr) (out []byte, opts evio.Options, action evio.Action) { events.Tick = func() (delay time.Duration, action evio.Action) {
println("opened", id) now := time.Now()
conns[id] = &conn{} for id, t := range wgetids {
if now.Sub(t) > time.Second {
srv.Wake(id)
}
}
delay = time.Second
return return
} }
events.Closed = func(id int, err error) (action evio.Action) { events.Closed = func(id int, err error) (action evio.Action) {
fmt.Printf("closed %d %v\n", id, err) fmt.Printf("closed %d %v\n", id, err)
delete(conns, id) delete(conns, id)
return return
@ -86,6 +73,7 @@ func main() {
c := conns[id] c := conns[id]
if c.wget { if c.wget {
println(string(in)) println(string(in))
action = evio.Close
return return
} }
data := c.is.Begin(in) data := c.is.Begin(in)
@ -109,33 +97,19 @@ func main() {
default: default:
out = redcon.AppendError(out, "ERR unknown command '"+string(args[0])+"'") out = redcon.AppendError(out, "ERR unknown command '"+string(args[0])+"'")
case "WGET": case "WGET":
if len(args) != 2 { if len(args) != 3 {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command") out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
} else { } else {
start := time.Now() start := time.Now()
fd, err := Dial("tcp", string(args[1])) n, _ := strconv.ParseInt(string(args[2]), 10, 63)
cid, err := srv.Dial("tcp://"+string(args[1]), time.Duration(n)*time.Second)
if err != nil { if err != nil {
out = redcon.AppendError(out, err.Error()) out = redcon.AppendError(out, err.Error())
} else { } else {
time.Since(start) wgetids[cid] = time.Now()
println(cid, time.Since(start).String())
out = redcon.AppendOK(out) out = redcon.AppendOK(out)
ctx.Attach(fd)
} }
// conn, err := net.Dial("tcp", string(args[1]))
// if err != nil {
// out = redcon.AppendError(out, err.Error())
// } else {
// println(time.Since(start).String())
// f, err := conn.(*net.TCPConn).File()
// if err != nil {
// conn.Close()
// out = redcon.AppendError(out, err.Error())
// } else {
// out = redcon.AppendOK(out)
// ctx.Attach(f.Fd())
// }
// }
} }
case "PING": case "PING":
if len(args) > 2 { if len(args) > 2 {

34
internal/internal_test.go Normal file
View File

@ -0,0 +1,34 @@
package internal
import (
"fmt"
"testing"
"time"
)
type queueItem struct {
timeout time.Time
}
func (item *queueItem) Timeout() time.Time {
return item.timeout
}
func TestQueue(t *testing.T) {
q := NewTimeoutQueue()
item := &queueItem{timeout: time.Unix(0, 5)}
q.Push(item)
q.Push(&queueItem{timeout: time.Unix(0, 3)})
q.Push(&queueItem{timeout: time.Unix(0, 20)})
q.Push(&queueItem{timeout: time.Unix(0, 13)})
var out string
for q.Len() > 0 {
pitem := q.Peek()
item := q.Pop()
out += fmt.Sprintf("(%v:%v) ", pitem.Timeout().UnixNano(), item.Timeout().UnixNano())
}
exp := "(3:3) (5:5) (13:13) (20:20) "
if out != exp {
t.Fatalf("expected '%v', got '%v'", exp, out)
}
}

70
internal/timeoutqueue.go Normal file
View File

@ -0,0 +1,70 @@
package internal
import (
"container/heap"
"time"
)
// TimeoutQueueItem is an item for TimeoutQueue
type TimeoutQueueItem interface {
Timeout() time.Time
}
type timeoutPriorityQueue []TimeoutQueueItem
func (pq timeoutPriorityQueue) Len() int { return len(pq) }
func (pq timeoutPriorityQueue) Less(i, j int) bool {
return pq[i].Timeout().Before(pq[j].Timeout())
}
func (pq timeoutPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *timeoutPriorityQueue) Push(x interface{}) {
*pq = append(*pq, x.(TimeoutQueueItem))
}
func (pq *timeoutPriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
// TimeoutQueue is a priority queue ordere be ascending time.Time.
type TimeoutQueue struct {
pq timeoutPriorityQueue
}
// NewTimeoutQueue returns a new TimeoutQueue.
func NewTimeoutQueue() *TimeoutQueue {
q := &TimeoutQueue{}
heap.Init(&q.pq)
return q
}
// Push adds a new item.
func (q *TimeoutQueue) Push(x TimeoutQueueItem) {
heap.Push(&q.pq, x)
}
// Pop removes and returns the items with the smallest value.
func (q *TimeoutQueue) Pop() TimeoutQueueItem {
return heap.Pop(&q.pq).(TimeoutQueueItem)
}
// Peek returns the items with the smallest value, but does not remove it.
func (q *TimeoutQueue) Peek() TimeoutQueueItem {
if q.Len() > 0 {
return q.pq[0]
}
return nil
}
// Len returns the number of items in the queue
func (q *TimeoutQueue) Len() int {
return q.pq.Len()
}