return accurate local addr on open

Use the write event before the read event and wait for an accurate local
address prior to firing the Opened event.
This commit is contained in:
Josh Baker 2017-11-08 16:45:30 -07:00
parent 81bd0514f4
commit 5641498bcd
5 changed files with 227 additions and 99 deletions

View File

@ -10,7 +10,6 @@ import (
"net"
"os"
"sort"
"strconv"
"sync"
"syscall"
"time"
@ -62,9 +61,13 @@ type unixConn struct {
action Action
opts Options
timeout time.Time
raddr net.Addr // remote addr
laddr net.Addr // local addr
lnidx int
err error
dialerr error
wake bool
readon bool
writeon bool
detached bool
closed bool
@ -125,7 +128,7 @@ func serve(events Events, lns []*listener) error {
}
defer syscall.Close(p)
for _, ln := range lns {
if err := internal.AddRead(p, ln.fd); err != nil {
if err := internal.AddRead(p, ln.fd, nil, nil); err != nil {
return err
}
}
@ -144,7 +147,7 @@ func serve(events Events, lns []*listener) error {
return 0
}
id++
c := &unixConn{id: id, opening: true}
c := &unixConn{id: id, opening: true, lnidx: -1}
idconn[id] = c
if timeout != 0 {
c.timeout = time.Now().Add(timeout)
@ -183,13 +186,13 @@ func serve(events Events, lns []*listener) error {
return err
}
lock()
err = internal.AddRead(p, fd)
err = internal.AddRead(p, fd, &c.readon, &c.writeon)
if err != nil {
unlock()
syscall.Close(fd)
return err
}
err = internal.AddWrite(p, fd, &c.writeon)
err = internal.AddWrite(p, fd, &c.readon, &c.writeon)
if err != nil {
unlock()
syscall.Close(fd)
@ -227,7 +230,7 @@ func serve(events Events, lns []*listener) error {
ok = false
} else if !c.wake {
c.wake = true
err = internal.AddWrite(p, c.fd, &c.writeon)
err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon)
}
unlock()
if err != nil {
@ -252,10 +255,16 @@ func serve(events Events, lns []*listener) error {
type fdid struct {
fd, id int
opening bool
laddr net.Addr
raddr net.Addr
lnidx int
}
var fdids []fdid
for _, c := range idconn {
fdids = append(fdids, fdid{c.fd, c.id, c.opening})
if c.opening {
filladdrs(c)
}
fdids = append(fdids, fdid{c.fd, c.id, c.opening, c.laddr, c.raddr, c.lnidx})
}
sort.Slice(fdids, func(i, j int) bool {
return fdids[j].id < fdids[i].id
@ -266,14 +275,12 @@ func serve(events Events, lns []*listener) error {
}
if fdid.opening {
if events.Opened != nil {
laddr := getlocaladdr(fdid.fd)
raddr := getremoteaddr(fdid.fd)
unlock()
events.Opened(fdid.id, Info{
Closing: true,
AddrIndex: -1,
LocalAddr: laddr,
RemoteAddr: raddr,
AddrIndex: fdid.lnidx,
LocalAddr: fdid.laddr,
RemoteAddr: fdid.raddr,
})
lock()
}
@ -289,6 +296,8 @@ func serve(events Events, lns []*listener) error {
idconn = nil
unlock()
}()
var rsa syscall.Sockaddr
var packet [0xFFFF]byte
var evs = internal.MakeEvents(64)
nextTicker := time.Now()
@ -332,15 +341,14 @@ func serve(events Events, lns []*listener) error {
if _, ok := idconn[c.id]; ok && c.opening {
delete(idconn, c.id)
delete(fdconn, c.fd)
filladdrs(c)
syscall.Close(c.fd)
if events.Opened != nil {
laddr := getlocaladdr(c.fd)
raddr := getremoteaddr(c.fd)
events.Opened(c.id, Info{
Closing: true,
AddrIndex: -1,
LocalAddr: laddr,
RemoteAddr: raddr,
AddrIndex: c.lnidx,
LocalAddr: c.laddr,
RemoteAddr: c.raddr,
})
}
if events.Closed != nil {
@ -383,36 +391,41 @@ func serve(events Events, lns []*listener) error {
goto next
}
if c.opening {
lnidx = -1
goto opened
}
goto read
accept:
nfd, _, err = syscall.Accept(fd)
nfd, rsa, err = syscall.Accept(fd)
if err != nil {
goto next
}
if err = syscall.SetNonblock(nfd, true); err != nil {
goto fail
}
if err = internal.AddRead(p, nfd); err != nil {
id++
c = &unixConn{id: id, fd: nfd,
opening: true,
lnidx: lnidx,
raddr: sockaddrToAddr(rsa),
}
// we have a remote address but the local address yet.
if err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
id++
c = &unixConn{id: id, fd: nfd}
fdconn[nfd] = c
idconn[id] = c
goto opened
goto next
opened:
filladdrs(c)
if err = internal.AddRead(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
if events.Opened != nil {
laddr := getlocaladdr(fd)
raddr := getremoteaddr(fd)
unlock()
out, c.opts, c.action = events.Opened(c.id, Info{
AddrIndex: lnidx,
LocalAddr: laddr,
RemoteAddr: raddr,
LocalAddr: c.laddr,
RemoteAddr: c.raddr,
})
lock()
if c.opts.TCPKeepAlive > 0 {
@ -444,6 +457,11 @@ func serve(events Events, lns []*listener) error {
}
in = append([]byte{}, packet[:n]...)
}
// if c.laddr == nil {
// // we need the local address and to open the socket
// lsa, _ = syscall.Getsockname(c.fd)
// c.laddr = sock
// }
if events.Data != nil {
unlock()
out, c.action = events.Data(c.id, in)
@ -481,7 +499,7 @@ func serve(events Events, lns []*listener) error {
goto close
}
if err == syscall.EAGAIN {
if err = internal.AddWrite(p, c.fd, &c.writeon); err != nil {
if err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
goto next
@ -500,7 +518,7 @@ func serve(events Events, lns []*listener) error {
}
if len(c.outbuf)-c.outpos == 0 {
if !c.wake {
if err = internal.DelWrite(p, c.fd, &c.writeon); err != nil {
if err = internal.DelWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
}
@ -508,7 +526,7 @@ func serve(events Events, lns []*listener) error {
goto close
}
} else {
if err = internal.AddWrite(p, c.fd, &c.writeon); err != nil {
if err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
}
@ -516,7 +534,6 @@ func serve(events Events, lns []*listener) error {
close:
delete(fdconn, c.fd)
delete(idconn, c.id)
//delete(idtimeout, c.id)
if c.action == Detach {
if events.Detached != nil {
c.detached = true
@ -559,33 +576,6 @@ func serve(events Events, lns []*listener) error {
}
}
func getlocaladdr(fd int) net.Addr {
if fd == 0 {
return nil
}
sa, _ := syscall.Getsockname(fd)
return getaddr(sa)
}
func getremoteaddr(fd int) net.Addr {
if fd == 0 {
return nil
}
sa, _ := syscall.Getpeername(fd)
return getaddr(sa)
}
func getaddr(sa syscall.Sockaddr) net.Addr {
switch sa := sa.(type) {
default:
return nil
case *syscall.SockaddrInet4:
return &net.TCPAddr{IP: net.IP(sa.Addr[:]), Port: sa.Port}
case *syscall.SockaddrInet6:
return &net.TCPAddr{IP: net.IP(sa.Addr[:]), Port: sa.Port, Zone: strconv.FormatInt(int64(sa.ZoneId), 10)}
case *syscall.SockaddrUnix:
return &net.UnixAddr{Net: "unix", Name: sa.Name}
}
}
// resolve resolves an evio address and retuns a sockaddr for socket
// connection to external servers.
func resolve(addr string) (sa syscall.Sockaddr, err error) {
@ -626,3 +616,42 @@ func resolve(addr string) (sa syscall.Sockaddr, err error) {
}
return sa, nil
}
func sockaddrToAddr(sa syscall.Sockaddr) net.Addr {
var a net.Addr
switch sa := sa.(type) {
case *syscall.SockaddrInet4:
a = &net.TCPAddr{
IP: append([]byte{}, sa.Addr[:]...),
Port: sa.Port,
}
case *syscall.SockaddrInet6:
var zone string
if sa.ZoneId != 0 {
if ifi, err := net.InterfaceByIndex(int(sa.ZoneId)); err == nil {
zone = ifi.Name
}
}
if zone == "" && sa.ZoneId != 0 {
}
a = &net.TCPAddr{
IP: append([]byte{}, sa.Addr[:]...),
Port: sa.Port,
Zone: zone,
}
case *syscall.SockaddrUnix:
a = &net.UnixAddr{Net: "unix", Name: sa.Name}
}
return a
}
func filladdrs(c *unixConn) {
if c.laddr == nil {
sa, _ := syscall.Getsockname(c.fd)
c.laddr = sockaddrToAddr(sa)
}
if c.raddr == nil {
sa, _ := syscall.Getsockname(c.fd)
c.raddr = sockaddrToAddr(sa)
}
}

View File

@ -40,12 +40,14 @@ func main() {
var aaaa bool
var noparse bool
var unixsocket string
var stdlib bool
flag.StringVar(&unixsocket, "unixsocket", "", "unix socket")
flag.IntVar(&port, "port", 8080, "server port")
flag.IntVar(&tlsport, "tlsport", 4443, "tls port")
flag.StringVar(&tlspem, "tlscert", "", "tls pem cert/key file")
flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")
flag.BoolVar(&noparse, "noparse", true, "do not parse requests")
flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
flag.Parse()
if os.Getenv("NOPARSE") == "1" {
@ -69,18 +71,25 @@ func main() {
if unixsocket != "" {
log.Printf("http server started at %s", unixsocket)
}
if stdlib {
log.Printf("stdlib")
}
return
}
events.Opened = func(id int, info evio.Info) (out []byte, opts evio.Options, action evio.Action) {
conns[id] = &conn{info: info}
//log.Printf("opened: %d: %s: %s", id, addr.Local.String(), addr.Remote.String())
log.Printf("opened: %d: laddr: %v: raddr: %v", id, info.LocalAddr, info.RemoteAddr)
// println(info.LocalAddr.(*net.TCPAddr).Zone)
// fmt.Printf("%#v\n", info.LocalAddr)
// fmt.Printf("%#v\n", (&net.TCPAddr{IP: make([]byte, 16)}))
return
}
events.Closed = func(id int, err error) (action evio.Action) {
// c := conns[id]
// log.Printf("closed: %d: %s: %s", id, c.addr.Local.String(), c.addr.Remote.String())
c := conns[id]
log.Printf("closed: %d: %s: %s", id, c.info.LocalAddr.String(), c.info.RemoteAddr.String())
delete(conns, id)
return
}
@ -117,8 +126,12 @@ func main() {
c.is.End(data)
return
}
var ssuf string
if stdlib {
ssuf = "-net"
}
// We at least want the single http address.
addrs := []string{fmt.Sprintf("tcp://:%d", port)}
addrs := []string{fmt.Sprintf("tcp"+ssuf+"://:%d", port)}
if tlspem != "" {
// load the cert and key pair from the concat'd pem file.
cer, err := tls.LoadX509KeyPair(tlspem, tlspem)
@ -127,7 +140,7 @@ func main() {
}
config := &tls.Config{Certificates: []tls.Certificate{cer}}
// Update the address list to include https.
addrs = append(addrs, fmt.Sprintf("tcp://:%d", tlsport))
addrs = append(addrs, fmt.Sprintf("tcp"+ssuf+"://:%d", tlsport))
// TLS translate the events
events = evio.Translate(events,
@ -145,7 +158,7 @@ func main() {
)
}
if unixsocket != "" {
addrs = append(addrs, fmt.Sprintf("unix://%s", unixsocket))
addrs = append(addrs, fmt.Sprintf("unix"+ssuf+"://%s", unixsocket))
}
// Start serving!
log.Fatal(evio.Serve(events, addrs...))

View File

@ -25,10 +25,13 @@ type conn struct {
func main() {
var port int
var unixsocket string
var srv evio.Server
var stdlib bool
flag.IntVar(&port, "port", 6380, "server port")
flag.StringVar(&unixsocket, "unixsocket", "socket", "unix socket")
flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
flag.Parse()
var srv evio.Server
var conns = make(map[int]*conn)
var keys = make(map[string]string)
var events evio.Events
@ -38,6 +41,9 @@ func main() {
if unixsocket != "" {
log.Printf("redis server started at %s", unixsocket)
}
if stdlib {
log.Printf("stdlib")
}
return
}
wgetids := make(map[int]time.Time)
@ -48,7 +54,9 @@ func main() {
c.wget = true
}
conns[id] = c
println("opened", id, c.wget)
if c.wget {
log.Printf("opened: %d, wget: %t, laddr: %v, laddr: %v", id, c.wget, info.LocalAddr, info.RemoteAddr)
}
if c.wget {
out = []byte("GET / HTTP/1.0\r\n\r\n")
}
@ -65,7 +73,10 @@ func main() {
return
}
events.Closed = func(id int, err error) (action evio.Action) {
fmt.Printf("closed %d %v\n", id, err)
c := conns[id]
if c.wget {
fmt.Printf("closed %d %v\n", id, err)
}
delete(conns, id)
return
}
@ -169,9 +180,13 @@ func main() {
c.is.End(data)
return
}
addrs := []string{fmt.Sprintf("tcp://:%d", port)}
var ssuf string
if stdlib {
ssuf = "-net"
}
addrs := []string{fmt.Sprintf("tcp"+ssuf+"://:%d", port)}
if unixsocket != "" {
addrs = append(addrs, fmt.Sprintf("unix://%s", unixsocket))
addrs = append(addrs, fmt.Sprintf("unix"+ssuf+"://%s", unixsocket))
}
err := evio.Serve(events, addrs...)
if err != nil {

View File

@ -11,22 +11,53 @@ import (
"time"
)
func AddWrite(p, fd int, on *bool) error {
if *on {
return nil
func AddRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if *readon {
return nil
}
*readon = true
}
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ}},
nil, nil)
return err
}
func DelRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if !*readon {
return nil
}
*readon = false
}
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ}},
nil, nil)
return err
}
func AddWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if *writeon {
return nil
}
*writeon = true
}
*on = true
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE}},
nil, nil)
return err
}
func DelWrite(p, fd int, on *bool) error {
if !*on {
return nil
func DelWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if !*writeon {
return nil
}
*writeon = false
}
*on = false
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE}},
@ -37,13 +68,6 @@ func DelWrite(p, fd int, on *bool) error {
func MakePoll() (p int, err error) {
return syscall.Kqueue()
}
func AddRead(p, fd int) error {
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ}},
nil, nil)
return err
}
func MakeEvents(n int) interface{} {
return make([]syscall.Kevent_t, n)
}

View File

@ -9,21 +9,74 @@ import (
"time"
)
func AddWrite(p, fd int, on *bool) error {
if *on {
return nil
func AddRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if *readon {
return nil
}
*readon = true
}
if writeon == nil || !*writeon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_ADD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
})
}
*on = true
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN | syscall.EPOLLOUT,
})
}
func DelWrite(p, fd int, on *bool) error {
if !*on {
return nil
func DelRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if !*readon {
return nil
}
*readon = false
}
if writeon == nil || !*writeon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_DEL, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
})
}
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLOUT,
})
}
func AddWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if *writeon {
return nil
}
*writeon = true
}
if readon == nil || !*readon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_ADD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLOUT,
})
}
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN | syscall.EPOLLOUT,
})
}
func DelWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if !*writeon {
return nil
}
*writeon = false
}
if readon == nil || !*readon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_DEL, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLOUT,
})
}
*on = false
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
@ -32,12 +85,6 @@ func DelWrite(p, fd int, on *bool) error {
func MakePoll() (p int, err error) {
return syscall.EpollCreate1(0)
}
func AddRead(p, fd int) error {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_ADD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
})
}
func MakeEvents(n int) interface{} {
return make([]syscall.EpollEvent, n)
}