From f36b6b2ca7a77289260c727e47cd3b4d0bf6b8d8 Mon Sep 17 00:00:00 2001 From: Josh Baker Date: Mon, 3 Jul 2017 20:39:18 -0700 Subject: [PATCH] first commit --- LICENSE | 20 +++ README.md | 175 +++++++++++++++++++++++ examples/README.md | 18 +++ examples/echo-server.go | 64 +++++++++ examples/redis-server.go | 179 ++++++++++++++++++++++++ shiny.go | 202 +++++++++++++++++++++++++++ shiny_epoll.go | 285 ++++++++++++++++++++++++++++++++++++++ shiny_kqueue.go | 291 +++++++++++++++++++++++++++++++++++++++ shiny_other.go | 12 ++ 9 files changed, 1246 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 examples/README.md create mode 100644 examples/echo-server.go create mode 100644 examples/redis-server.go create mode 100644 shiny.go create mode 100644 shiny_epoll.go create mode 100644 shiny_kqueue.go create mode 100644 shiny_other.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..92a9728 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2017 Joshua J Baker + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..fac4970 --- /dev/null +++ b/README.md @@ -0,0 +1,175 @@ +# `✨ shiny ✨` + +Shiny is an alternative networking framework for Go that uses I/O multiplexing. +It makes direct [epoll](https://en.wikipedia.org/wiki/Epoll) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) syscalls rather than using the standard Go [net](https://golang.org/pkg/net/) package. + +This is similar to the way that [libuv](https://github.com/libuv/libuv), [libevent](https://github.com/libevent/libevent), [haproxy](http://www.haproxy.org/), [nginx](http://nginx.org/), [redis](http://redis.io/), and other high performance network servers work. + +The reason for this project is that I want to upgrade the networking for [Tile38](http://github.com/tidwall/tile38) so that it will perform on par with Redis, but without having to interop with Cgo. Early benchmarks are exceeding my expectations. + +**This project is a (sweet) work in progress. The API will likely change between now and the Tile38 v2.0 release.** + + +## Features + +- Simple API. Only one entrypoint and four event functions +- Low memory usage +- Very fast single-threaded support +- Support for non-epoll/kqueue operating systems by simulating events with the net package. + + +## Getting Started + +### Installing + +To start using Shiny, install Go and run `go get`: + +```sh +$ go get -u github.com/tidwall/shiny +``` + +This will retrieve the library. 
+ +### Usage + +There's only the one function: + +```go +func Serve(net, addr string, + handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool), + accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool), + closed func(id int, err error, ctx interface{}), + ticker func(ctx interface{}) (keepserving bool), + context interface{}) error +``` + +## Example + +Please check out the [examples](examples) subdirectory for a simplified [redis](examples/redis-server.go) clone and an [echo](examples/echo-server.go) server. + +Here's a basic echo server: + +```go +package main + +import ( + "flag" + "fmt" + "log" + + "github.com/tidwall/shiny" +) + +var shutdown bool +var started bool +var port int + +func main() { + flag.IntVar(&port, "port", 9999, "server port") + flag.Parse() + log.Fatal(shiny.Serve("tcp", fmt.Sprintf(":%d", port), + handle, accept, closed, ticker, nil)) +} + +// handle - the incoming client socket data. +func handle(id int, data []byte, ctx interface{}) (send []byte, keepopen bool) { + if shutdown { + return nil, false + } + keepopen = true + if string(data) == "shutdown\r\n" { + shutdown = true + } else if string(data) == "quit\r\n" { + keepopen = false + } + return data, keepopen +} + +// accept - a new client socket has opened. +// 'wake' is a function that when called will fire a 'handle' event +// for the specified ID, and is goroutine-safe. +func accept(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) { + if shutdown { + return nil, false + } + // this is a good place to create a user-defined socket context. 
+ return []byte( + "Welcome to the echo server!\n" + + "Enter 'quit' to close your connection or " + + "'shutdown' to close the server.\n"), true +} + +// closed - a client socket has closed +func closed(id int, err error, ctx interface{}) { + // teardown the socket context here +} + +// ticker - a ticker that fires between 1 and 1/20 of a second +// depending on the traffic. +func ticker(ctx interface{}) (keepserving bool) { + if shutdown { + // do server teardown here + return false + } + if !started { + fmt.Printf("echo server started on port %d\n", port) + started = true + } + // perform various non-socket-io related operation here + return true +} +``` + +Run the example: + +``` +$ go run examples/echo-server.go +``` + +Connect to the server: + +``` +$ telnet localhost 9999 +``` + + +## Performance + +The benchmarks below use pipelining which allows for combining multiple Redis commands into a single packet. + +**Redis** + +``` +$ redis-server --port 6379 --appendonly no +``` +``` +redis-benchmark -p 6379 -t ping,set,get -q -P 128 +PING_INLINE: 961538.44 requests per second +PING_BULK: 1960784.38 requests per second +SET: 943396.25 requests per second +GET: 1369863.00 requests per second +``` + +**Shiny** + +``` +$ go run examples/redis-server.go --port 6380 --appendonly no +``` +``` +redis-benchmark -p 6380 -t ping,set,get -q -P 128 +PING_INLINE: 3846153.75 requests per second +PING_BULK: 4166666.75 requests per second +SET: 3703703.50 requests per second +GET: 3846153.75 requests per second +``` + +*Running on a MacBook Pro 15" 2.8 GHz Intel Core i7 using Go 1.7* + +## Contact + +Josh Baker [@tidwall](http://twitter.com/tidwall) + +## License + +Shiny source code is available under the MIT [License](/LICENSE). 
+ diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..6787ef2 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,18 @@ +# `✨ shiny ✨ examples` + +## redis-server + +``` +go run examples/redis-server.go [--port int] [--appendonly yes/no] +``` + +- `GET`, `SET`, `DEL`, `QUIT`, `PING`, `SHUTDOWN` commands. +- `--appendonly yes` option for disk persistence. +- Compatible with the [redis-cli](https://redis.io/topics/rediscli) and all [redis clients](https://redis.io/clients). + + +## echo-server + +``` +go run examples/echo-server.go [--port int] +``` diff --git a/examples/echo-server.go b/examples/echo-server.go new file mode 100644 index 0000000..9141596 --- /dev/null +++ b/examples/echo-server.go @@ -0,0 +1,64 @@ +package main + +import ( + "flag" + "fmt" + + "github.com/tidwall/shiny" +) + +func main() { + var port int + flag.IntVar(&port, "port", 9999, "server port") + flag.Parse() + + var shutdown bool + var started bool + fmt.Println(shiny.Serve("tcp", fmt.Sprintf(":%d", port), + // handle - the incoming client socket data. + func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool) { + if shutdown { + return nil, false + } + keepopen = true + if string(data) == "shutdown\r\n" { + shutdown = true + } else if string(data) == "quit\r\n" { + keepopen = false + } + return data, keepopen + }, + // accept - a new client socket has opened. + // 'wake' is a function that when called will fire a 'handle' event + // for the specified ID, and is goroutine-safe. + func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) { + if shutdown { + return nil, false + } + // this is a good place to create a user-defined socket context. 
+ return []byte( + "Welcome to the echo server!\n" + + "Enter 'quit' to close your connection or " + + "'shutdown' to close the server.\n"), true + }, + // closed - a client socket has closed + func(id int, err error, ctx interface{}) { + // teardown the socket context here + }, + // ticker - a ticker that fires between 1 and 1/20 of a second + // depending on the traffic. + func(ctx interface{}) (keepserveropen bool) { + if shutdown { + // do server teardown here + return false + } + if !started { + fmt.Printf("echo server started on port %d\n", port) + started = true + } + // perform various non-socket-io related operation here + return true + }, + // an optional user-defined context + nil)) +} diff --git a/examples/redis-server.go b/examples/redis-server.go new file mode 100644 index 0000000..bd4c189 --- /dev/null +++ b/examples/redis-server.go @@ -0,0 +1,179 @@ +package main + +import ( + "flag" + "fmt" + "io" + "log" + "os" + "strings" + + "github.com/tidwall/redcon" + "github.com/tidwall/shiny" +) + +func main() { + var port int + var appendonly string + flag.IntVar(&port, "port", 6380, "server port") + flag.StringVar(&appendonly, "appendonly", "no", "use appendonly file (yes or no)") + flag.Parse() + + var resp []byte + var aofw []byte + var args [][]byte + var shutdown bool + var started bool + var bufs = make(map[int][]byte) + var keys = make(map[string]string) + + var f *os.File + if appendonly == "yes" { + var err error + f, err = os.OpenFile("appendonly.aof", os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + log.Fatal(err) + } + defer f.Close() + + rd := redcon.NewReader(f) + for { + cmd, err := rd.ReadCommand() + if err == io.EOF { + break + } else if err != nil { + log.Fatal(err) + } + switch strings.ToUpper(string(cmd.Args[0])) { + default: + log.Fatal("bad aof") + case "SET": + keys[string(cmd.Args[1])] = string(cmd.Args[2]) + } + } + } + log.Fatal(shiny.Serve("tcp", fmt.Sprintf(":%d", port), + func(id int, data []byte, ctx interface{}) (send []byte, 
keepopen bool) { + if shutdown { + return nil, false + } + buf := bufs[id] + if len(buf) > 0 { + data = append(buf, data...) + } + keepopen = true + resp = resp[:0] + aofw = aofw[:0] + var complete bool + var err error + for { + complete, args, _, data, err = redcon.ReadNextCommand(data, args[:0]) + if err != nil { + keepopen = false + resp = redcon.AppendError(resp, err.Error()) + break + } + if !complete { + break + } + switch strings.ToUpper(string(args[0])) { + default: + resp = redcon.AppendError(resp, fmt.Sprintf("ERR unknown command '%s'", args[0])) + case "PING": + if len(args) > 2 { + resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0])) + continue + } else if len(args) == 1 { + resp = redcon.AppendString(resp, "PONG") + } else { + resp = redcon.AppendBulk(resp, args[1]) + } + case "QUIT": + keepopen = false + resp = redcon.AppendOK(resp) + case "SHUTDOWN": + shutdown = true + keepopen = false + resp = redcon.AppendOK(resp) + case "SET": + if len(args) != 3 { + resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0])) + continue + } + keys[string(args[1])] = string(args[2]) + resp = redcon.AppendOK(resp) + if appendonly == "yes" { + // create the append only entry + aofw = redcon.AppendArray(aofw, 3) + aofw = redcon.AppendBulkString(aofw, "SET") + aofw = redcon.AppendBulk(aofw, args[1]) + aofw = redcon.AppendBulk(aofw, args[2]) + } + case "GET": + if len(args) != 2 { + resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0])) + continue + } + val, ok := keys[string(args[1])] + if !ok { + resp = redcon.AppendNull(resp) + } else { + resp = redcon.AppendBulkString(resp, val) + } + case "DEL": + if len(args) < 2 { + resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0])) + continue + } + var n int64 + for i := 1; i < len(args); i++ { + if _, ok := keys[string(args[i])]; ok 
{ + delete(keys, string(args[i])) + n++ + } + } + resp = redcon.AppendInt(resp, n) + } + } + if len(data) > 0 { + bufs[id] = append(buf[:0], data...) + } else if len(buf) > 0 { + bufs[id] = buf[:0] + } + if len(aofw) > 0 { + if _, err := f.Write(aofw); err != nil { + log.Fatal(err) + } + } + return resp, keepopen + }, + // accept - a new client socket has opened + func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) { + if shutdown { + return nil, false + } + // create a new socket context here + return nil, true + }, + // closed - a client socket has closed + func(id int, err error, ctx interface{}) { + // teardown the socket context here + delete(bufs, id) + }, + // ticker - a ticker that fires between 1 and 1/20 of a second + // depending on the traffic. + func(ctx interface{}) (keepserveropen bool) { + if shutdown { + // do server teardown here + return false + } + if !started { + fmt.Printf("redis(ish) server started on port %d\n", port) + started = true + } + // perform various non-socket-io related operation here + return true + }, + // an optional user-defined context + nil)) +} diff --git a/shiny.go b/shiny.go new file mode 100644 index 0000000..d1064ab --- /dev/null +++ b/shiny.go @@ -0,0 +1,202 @@ +package shiny + +import ( + "io" + "net" + "strings" + "sync" + "time" +) + +// Serve +func Serve(net, addr string, + handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool), + accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool), + closed func(id int, err error, ctx interface{}), + ticker func(ctx interface{}) (keepopen bool), + context interface{}) error { + if strings.HasSuffix(net, "-compat") { + net = net[:len(net)-len("-compat")] + } else { + switch net { + case "tcp", "tcp4", "tcp6": + return eventServe(net, addr, handle, accept, closed, ticker, context) + } + } + return compatServe(net, addr, handle, accept, closed, ticker, context) +} + +func 
compatServe(net_, addr string, + handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool), + accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool), + closed func(id int, err error, ctx interface{}), + ticker func(ctx interface{}) (keepopen bool), + ctx interface{}) error { + if handle == nil { + handle = func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool) { + return nil, true + } + } + if accept == nil { + accept = func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) { + return nil, true + } + } + if closed == nil { + closed = func(id int, err error, ctx interface{}) {} + } + if ticker == nil { + ticker = func(ctx interface{}) (keepopen bool) { return true } + } + + ln, err := net.Listen(net_, addr) + if err != nil { + return err + } + defer ln.Close() + if !ticker(ctx) { + return nil + } + var mu sync.Mutex + var conns = make(map[net.Conn]bool) + defer func() { + mu.Lock() + for c := range conns { + c.Close() + } + mu.Unlock() + }() + var id int + var done bool + var shutdown bool + donech := make(chan bool) + var lastwrite time.Time + lasttick := time.Now() + go func() { + t := time.NewTicker(time.Second / 20) + defer t.Stop() + for { + select { + case <-donech: + return + case <-t.C: + now := time.Now() + if now.Sub(lastwrite) < time.Second || now.Sub(lasttick) >= time.Second { + mu.Lock() + if !done && !ticker(ctx) { + shutdown = true + ln.Close() + } + lasttick = now + mu.Unlock() + } + } + } + }() + defer func() { + mu.Lock() + done = true + mu.Unlock() + donech <- true + }() + for { + c, err := ln.Accept() + if err != nil { + mu.Lock() + if shutdown { + mu.Unlock() + return nil + } + mu.Unlock() + return err + } + id++ + func() { + wake := func(c net.Conn, id int) func() { + return func() { + go func() { + mu.Lock() + defer mu.Unlock() + send, keepopen := handle(id, nil, ctx) + if len(send) > 0 { + lastwrite = time.Now() + if _, err := 
c.Write(send); err != nil { + c.Close() + return + } + } + if !keepopen { + c.Close() + return + } + }() + } + }(c, id) + if !func() bool { + mu.Lock() + defer mu.Unlock() + send, keepopen := accept(id, c.RemoteAddr().String(), wake, ctx) + if len(send) > 0 { + lastwrite = time.Now() + if _, err := c.Write(send); err != nil { + c.Close() + return false + } + } + if !keepopen { + c.Close() + return false + } + conns[c] = true + return true + }() { + return + } + go func(id int, c net.Conn) { + var ferr error + defer func() { + mu.Lock() + defer mu.Unlock() + if ferr == io.EOF { + ferr = nil + } + if operr, ok := ferr.(*net.OpError); ok { + ferr = operr.Err + switch ferr.Error() { + case "use of closed network connection", + "read: connection reset by peer": + ferr = nil + } + } + delete(conns, c) + closed(id, ferr, ctx) + c.Close() + }() + packet := make([]byte, 4096) + for { + n, err := c.Read(packet) + if err != nil { + ferr = err + return + } + func() { + mu.Lock() + defer mu.Unlock() + send, keepopen := handle(id, packet[:n], ctx) + if len(send) > 0 { + lastwrite = time.Now() + if _, err := c.Write(send); err != nil { + c.Close() + return + } + } + if !keepopen { + c.Close() + return + } + }() + } + }(id, c) + }() + } +} diff --git a/shiny_epoll.go b/shiny_epoll.go new file mode 100644 index 0000000..1407eba --- /dev/null +++ b/shiny_epoll.go @@ -0,0 +1,285 @@ +//+build linux + +package shiny + +import ( + "net" + "strconv" + "strings" + "sync" + "syscall" + "time" + "unsafe" +) + +type conn struct { + fd int + id int + addr string + err error + hup bool +} + +func eventServe(net_, addr string, + handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool), + accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool), + closed func(id int, err error, ctx interface{}), + ticker func(ctx interface{}) (keepopen bool), + ctx interface{}) error { + + lna, err := net.Listen(net_, addr) + if err != nil { + return err + 
} + defer lna.Close() + ln := lna.(*net.TCPListener) + f, err := ln.File() + if err != nil { + return err + } + defer f.Close() + ln.Close() + sfd := int(f.Fd()) + + q, err := syscall.EpollCreate1(0) + if err != nil { + return err + } + defer syscall.Close(q) + event := syscall.EpollEvent{ + Fd: int32(sfd), + Events: syscall.EPOLLIN, + } + if err := syscall.EpollCtl(q, syscall.EPOLL_CTL_ADD, sfd, &event); err != nil { + return err + } + + var conns = make(map[int]*conn) + closeAndRemove := func(c *conn) { + if c.hup { + return + } + c.hup = true + syscall.Close(c.fd) + delete(conns, c.fd) + closed(c.id, c.err, ctx) + } + + defer func() { + for _, c := range conns { + closeAndRemove(c) + } + }() + + var lastwriteorwake time.Time + write := func(nfd int, send []byte) (err error) { + res1, res2, errn := syscall.Syscall(syscall.SYS_WRITE, uintptr(nfd), + uintptr(unsafe.Pointer(&send[0])), uintptr(len(send))) + if errn != 0 { + _, _ = res1, res2 + if errn == syscall.EAGAIN { + // This should not happen because we are running the + // server in blocking mode and without socket timeouts. + panic("EAGAIN") + } + return errn + } + lastwriteorwake = time.Now() + return nil + } + + // add wake event + var wakemu sync.Mutex + var wakeable = true + var wakers = make(map[int]int) // FD->ID map + var wakersarr []int + var wakezero = []byte{0, 1, 2, 3, 4, 5, 6, 7} + // SYS_EVENTFD is not implemented in Go yet. 
+ // SYS_EVENTFD = 284 + // SYS_EVENTFD2 = 290 + r1, _, errn := syscall.Syscall(284, 0, 0, 0) + if errn != 0 { + return errn + } + var efd = int(r1) + defer func() { + wakemu.Lock() + wakeable = false + syscall.Close(efd) + wakemu.Unlock() + }() + + event = syscall.EpollEvent{ + Fd: int32(efd), + Events: syscall.EPOLLIN, + } + if err := syscall.EpollCtl(q, syscall.EPOLL_CTL_ADD, efd, &event); err != nil { + return err + } + + shandle := func(c *conn, data []byte) { + send, keepalive := handle(c.id, data, ctx) + if len(send) > 0 { + if err := write(c.fd, send); err != nil { + c.err = err + closeAndRemove(c) + return + } + } + if !keepalive { + closeAndRemove(c) + } + } + + var lastts time.Time + if ticker != nil { + if !ticker(ctx) { + syscall.Close(q) + return nil + } + lastts = time.Now() + } + + var id int + var packet [65535]byte + var evs [32]syscall.EpollEvent + for { + var ts int + if time.Since(lastwriteorwake) < time.Second { + ts = 50 + } else { + ts = 1000 + } + n, err := syscall.EpollWait(q, evs[:], ts) + if err != nil { + if err == syscall.EINTR { + continue + } + break + } + for i := 0; ; i++ { + now := time.Now() + if now.Sub(lastts) >= time.Second/20 { + if !ticker(ctx) { + syscall.Close(q) + break + } + lastts = now + } + if i >= n { + break + } + if evs[i].Fd == int32(sfd) { + nfd, sa, err := syscall.Accept(sfd) + if err != nil { + continue + } + var addr string + var port int + switch sa := sa.(type) { + default: + case *syscall.SockaddrInet4: + addr = net.IP(sa.Addr[:]).String() + port = sa.Port + case *syscall.SockaddrInet6: + addr = net.IP(sa.Addr[:]).String() + port = sa.Port + } + var res []byte + if strings.Contains(addr, ":") { + res = append(append(append(res, '['), addr...), ']', ':') + } else { + res = append(append(res, addr...), ':') + } + addr = string(strconv.AppendInt(res, int64(port), 10)) + + id++ + wake := func(nfd, id int) func() { + return func() { + // NOTE: This is the one and only entrypoint that is + // not thread-safe. 
Use a mutex. + wakemu.Lock() + if wakeable { + wakers[nfd] = id + syscall.Write(efd, wakezero[:]) + } + wakemu.Unlock() + } + }(nfd, id) + + send, keepalive := accept(id, addr, wake, ctx) + if !keepalive { + syscall.Close(nfd) + continue + } + + // 500 second keepalive + kerr1 := syscall.SetsockoptInt(nfd, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1) + kerr2 := syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 500) + kerr3 := syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 500) + if kerr1 != nil || kerr2 != nil || kerr3 != nil { + //fmt.Printf("%v %v %v\n", kerr1, kerr2, kerr3) + } + + // add read + event := syscall.EpollEvent{ + Fd: int32(nfd), + Events: syscall.EPOLLIN | syscall.EPOLLRDHUP, // | (syscall.EPOLLET & 0xffffffff), + } + if err := syscall.EpollCtl(q, syscall.EPOLL_CTL_ADD, nfd, &event); err != nil { + syscall.Close(nfd) + continue + } + c := &conn{fd: nfd, addr: addr, id: id} + conns[nfd] = c + if len(send) > 0 { + if err := write(c.fd, send); err != nil { + c.err = err + closeAndRemove(c) + continue + } + } + } else if evs[i].Fd == int32(efd) { + // NOTE: This is a wakeup call. Use a mutex when accessing + // the `wakers` field. 
+ wakersarr = wakersarr[:0] + wakemu.Lock() + for nfd, id := range wakers { + wakersarr = append(wakersarr, nfd, id) + } + wakers = make(map[int]int) + var data [8]byte + _, err := syscall.Read(efd, data[:]) + wakemu.Unlock() + // exit the lock and read from the array + if err != nil { + return err + } + for i := 0; i < len(wakersarr); i += 2 { + nfd := wakersarr[i] + id := wakersarr[i+1] + c := conns[nfd] + if c != nil && c.id == id { + shandle(c, nil) + } + } + lastwriteorwake = time.Now() + } else if evs[i].Events&syscall.EPOLLRDHUP != 0 { + closeAndRemove(conns[int(evs[i].Fd)]) + } else { + c := conns[int(evs[i].Fd)] + res, _, errn := syscall.Syscall(syscall.SYS_READ, uintptr(c.fd), + uintptr(unsafe.Pointer(&packet[0])), uintptr(len(packet))) + if errn != 0 || res == 0 { + if errn != 0 { + c.err = errn + } + closeAndRemove(c) + continue + } + shandle(c, packet[:res]) + } + } + } + return nil +} diff --git a/shiny_kqueue.go b/shiny_kqueue.go new file mode 100644 index 0000000..854aa61 --- /dev/null +++ b/shiny_kqueue.go @@ -0,0 +1,291 @@ +//+build darwin freebsd + +package shiny + +import ( + "fmt" + "net" + "runtime" + "strconv" + "strings" + "sync" + "syscall" + "time" + "unsafe" +) + +type conn struct { + fd int + id int + addr string + err error +} + +func eventServe(net_, addr string, + handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool), + accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool), + closed func(id int, err error, ctx interface{}), + ticker func(ctx interface{}) (keepopen bool), + ctx interface{}) error { + + lna, err := net.Listen(net_, addr) + if err != nil { + return err + } + defer lna.Close() + ln := lna.(*net.TCPListener) + f, err := ln.File() + if err != nil { + return err + } + defer f.Close() + ln.Close() + sfd := int(f.Fd()) + q, err := syscall.Kqueue() + if err != nil { + return err + } + defer syscall.Close(q) + ev := []syscall.Kevent_t{{ + Ident: uint64(sfd), + Flags: 
syscall.EV_ADD, + Filter: syscall.EVFILT_READ, + }} + if _, err := syscall.Kevent(q, ev, nil, nil); err != nil { + return err + } + var conns = make(map[int]*conn) + defer func() { + for _, conn := range conns { + syscall.Close(conn.fd) + closed(conn.id, conn.err, ctx) + } + }() + + var lastwriteorwake time.Time + write := func(nfd int, send []byte) (err error) { + res1, res2, errn := syscall.Syscall(syscall.SYS_WRITE, uintptr(nfd), + uintptr(unsafe.Pointer(&send[0])), uintptr(len(send))) + if errn != 0 { + _, _ = res1, res2 + if errn == syscall.EAGAIN { + // This should not happen because we are running the + // server in blocking mode and without socket timeouts. + panic("EAGAIN") + } + return errn + } + lastwriteorwake = time.Now() + return nil + } + + // add wake event + var wakemu sync.Mutex + var wakeable = true + var wakers = make(map[int]int) // FD->ID map + var wakersarr []int + defer func() { + wakemu.Lock() + wakeable = false + wakemu.Unlock() + }() + ev = []syscall.Kevent_t{{ + Ident: 0, + Flags: syscall.EV_ADD, + Filter: syscall.EVFILT_USER, + }} + if _, err := syscall.Kevent(q, ev, nil, nil); err != nil { + return err + } + + shandle := func(c *conn, data []byte) { + send, keepalive := handle(c.id, data, ctx) + if len(send) > 0 { + if err := write(c.fd, send); err != nil { + c.err = err + syscall.Close(c.fd) + return + } + } + if !keepalive { + syscall.Close(c.fd) + } + } + + var lastts time.Time + if ticker != nil { + if !ticker(ctx) { + syscall.Close(q) + return nil + } + lastts = time.Now() + } + + var id int + var packet [65535]byte + var evs [32]syscall.Kevent_t + for { + var ts syscall.Timespec + if time.Since(lastwriteorwake) < time.Second { + ts = syscall.Timespec{Sec: 0, Nsec: int64(time.Second / 20)} + } else { + ts = syscall.Timespec{Sec: 1, Nsec: 0} + } + n, err := syscall.Kevent(q, nil, evs[:], &ts) + if err != nil { + if err == syscall.EINTR { + continue + } + break + } + for i := 0; ; i++ { + now := time.Now() + if now.Sub(lastts) >= 
time.Second/20 { + if !ticker(ctx) { + syscall.Close(q) + break + } + lastts = now + } + if i >= n { + break + } + if evs[i].Flags&syscall.EV_EOF != 0 { + c := conns[int(evs[i].Ident)] + syscall.Close(int(evs[i].Ident)) + delete(conns, int(evs[i].Ident)) + if c != nil { + closed(c.id, c.err, ctx) + } + } else if evs[i].Ident == uint64(sfd) { + nfd, sa, err := syscall.Accept(sfd) + if err != nil { + continue + } + var addr string + var port int + switch sa := sa.(type) { + default: + case *syscall.SockaddrInet4: + addr = net.IP(sa.Addr[:]).String() + port = sa.Port + case *syscall.SockaddrInet6: + addr = net.IP(sa.Addr[:]).String() + port = sa.Port + } + var res []byte + if strings.Contains(addr, ":") { + res = append(append(append(res, '['), addr...), ']', ':') + } else { + res = append(append(res, addr...), ':') + } + addr = string(strconv.AppendInt(res, int64(port), 10)) + + id++ + wake := func(nfd, id int) func() { + return func() { + // NOTE: This is the one and only entrypoint that is + // not thread-safe. Use a mutex. 
+ wakemu.Lock() + ev := []syscall.Kevent_t{{ + Ident: 0, + Flags: syscall.EV_ENABLE, + Filter: syscall.EVFILT_USER, + Fflags: syscall.NOTE_TRIGGER, + }} + if wakeable { + wakers[nfd] = id + syscall.Kevent(q, ev, nil, nil) + } + wakemu.Unlock() + } + }(nfd, id) + + send, keepalive := accept(id, addr, wake, ctx) + if !keepalive { + syscall.Close(nfd) + continue + } + + // 500 second keepalive + var kerr1, kerr2, kerr3 error + if runtime.GOOS == "darwin" { + kerr1 = syscall.SetsockoptInt(nfd, syscall.SOL_SOCKET, 0x8, 1) + kerr2 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x101, 500) + kerr3 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x10, 500) + } else { + // freebsd + kerr1 = syscall.SetsockoptInt(nfd, syscall.SOL_SOCKET, 0x8, 1) + kerr2 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x200, 500) + kerr3 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x100, 500) + } + if kerr1 != nil || kerr2 != nil || kerr3 != nil { + fmt.Printf("%v %v %v\n", kerr1, kerr2, kerr3) + } + + // add read + ev := []syscall.Kevent_t{{ + Ident: uint64(nfd), + Flags: syscall.EV_ADD, + Filter: syscall.EVFILT_READ, + }} + if _, err := syscall.Kevent(q, ev, nil, nil); err != nil { + syscall.Close(nfd) + continue + } + c := &conn{fd: nfd, addr: addr, id: id} + conns[nfd] = c + if len(send) > 0 { + if err := write(c.fd, send); err != nil { + c.err = err + syscall.Close(c.fd) + continue + } + } + } else if evs[i].Ident == 0 { + // NOTE: This is a wakeup call. Use a mutex when accessing + // the `wakers` field. 
+ wakersarr = wakersarr[:0] + wakemu.Lock() + for nfd, id := range wakers { + wakersarr = append(wakersarr, nfd, id) + } + wakers = make(map[int]int) + ev := []syscall.Kevent_t{{ + Ident: 0, + Flags: syscall.EV_DISABLE, + Filter: syscall.EVFILT_USER, + Fflags: syscall.NOTE_TRIGGER, + }} + _, err := syscall.Kevent(q, ev, nil, nil) + wakemu.Unlock() + // exit the lock and read from the array + if err != nil { + return err + } + for i := 0; i < len(wakersarr); i += 2 { + nfd := wakersarr[i] + id := wakersarr[i+1] + c := conns[nfd] + if c != nil && c.id == id { + shandle(c, nil) + } + } + lastwriteorwake = time.Now() + } else { + c := conns[int(evs[i].Ident)] + res, _, errn := syscall.Syscall(syscall.SYS_READ, uintptr(c.fd), + uintptr(unsafe.Pointer(&packet[0])), uintptr(len(packet))) + if errn != 0 || res == 0 { + if errn != 0 { + c.err = errn + } + syscall.Close(c.fd) + continue + } + shandle(c, packet[:res]) + } + } + } + return nil +} diff --git a/shiny_other.go b/shiny_other.go new file mode 100644 index 0000000..57c9ecd --- /dev/null +++ b/shiny_other.go @@ -0,0 +1,12 @@ +//+build !darwin,!freebsd,!linux + +package shiny + +func eventServe(net, addr string, + handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool), + accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool), + closed func(id int, err error, ctx interface{}), + ticker func(ctx interface{}) (keepopen bool), + context interface{}) error { + return compatServe(net, addr, handle, accept, closed, ticker, context) +}