first commit

This commit is contained in:
Josh Baker 2017-07-03 20:39:18 -07:00
commit f36b6b2ca7
9 changed files with 1246 additions and 0 deletions

20
LICENSE Normal file
View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2017 Joshua J Baker
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

175
README.md Normal file
View File

@ -0,0 +1,175 @@
# `✨ shiny ✨`
Shiny is an alternative networking framework for Go that uses I/O multiplexing.
It makes direct [epoll](https://en.wikipedia.org/wiki/Epoll) and [kqueue](https://en.wikipedia.org/wiki/Kqueue) syscalls rather than the standard Go [net](https://golang.org/pkg/net/) package.
This is similar to the way that [libuv](https://github.com/libuv/libuv), [libevent](https://github.com/libevent/libevent), [haproxy](http://www.haproxy.org/), [nginx](http://nginx.org/), [redis](http://redis.io/), and other high performance network servers work.
The reason for this project is that I want to upgrade the networking for [Tile38](http://github.com/tidwall/tile38) so that it will perform on par with Redis, but without having to interop with Cgo. Early benchmarks are exceeding my expectations.
**This project is a (sweet) work in progress. The API will likely change between now and Tile38 v2.0 release.**
## Features
- Simple API. Only one entrypoint and four event functions
- Low memory usage
- Very fast single-threaded support
- Support for non-epoll/kqueue operating systems by simulating events with the net package.
## Getting Started
### Installing
To start using Shiny, install Go and run `go get`:
```sh
$ go get -u github.com/tidwall/shiny
```
This will retrieve the library.
### Usage
There's only the one function:
```go
func Serve(net, addr string,
handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool),
accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool),
closed func(id int, err error, ctx interface{}),
ticker func(ctx interface{}) (keepserving bool),
context interface{}) error
```
## Example
Please check out the [examples](examples) subdirectory for a simplified [redis](examples/redis-server.go) clone and an [echo](examples/echo-server.go) server.
Here's a basic echo server:
```go
package main
import (
"flag"
"fmt"
"log"
"github.com/tidwall/shiny"
)
var shutdown bool
var started bool
var port int
func main() {
flag.IntVar(&port, "port", 9999, "server port")
flag.Parse()
log.Fatal(shiny.Serve("tcp", fmt.Sprintf(":%d", port),
handle, accept, closed, ticker, nil))
}
// handle - the incoming client socket data.
func handle(id int, data []byte, ctx interface{}) (send []byte, keepopen bool) {
if shutdown {
return nil, false
}
keepopen = true
if string(data) == "shutdown\r\n" {
shutdown = true
} else if string(data) == "quit\r\n" {
keepopen = false
}
return data, keepopen
}
// accept - a new client socket has opened.
// 'wake' is a function that when called will fire a 'handle' event
// for the specified ID, and is goroutine-safe.
func accept(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) {
if shutdown {
return nil, false
}
// this is a good place to create a user-defined socket context.
return []byte(
"Welcome to the echo server!\n" +
"Enter 'quit' to close your connection or " +
"'shutdown' to close the server.\n"), true
}
// closed - a client socket has closed
func closed(id int, err error, ctx interface{}) {
// teardown the socket context here
}
// ticker - a ticker that fires between 1 and 1/20 of a second
// depending on the traffic.
func ticker(ctx interface{}) (keepserving bool) {
if shutdown {
// do server teardown here
return false
}
if !started {
fmt.Printf("echo server started on port %d\n", port)
started = true
}
// perform various non-socket-io related operation here
return true
}
```
Run the example:
```
$ go run examples/echo-server.go
```
Connect to the server:
```
$ telnet localhost 9999
```
## Performance
The benchmarks below use pipelining which allows for combining multiple Redis commands into a single packet.
**Redis**
```
$ redis-server --port 6379 --appendonly no
```
```
redis-benchmark -p 6379 -t ping,set,get -q -P 128
PING_INLINE: 961538.44 requests per second
PING_BULK: 1960784.38 requests per second
SET: 943396.25 requests per second
GET: 1369863.00 requests per second
```
**Shiny**
```
$ go run examples/redis-server.go --port 6380 --appendonly no
```
```
redis-benchmark -p 6380 -t ping,set,get -q -P 128
PING_INLINE: 3846153.75 requests per second
PING_BULK: 4166666.75 requests per second
SET: 3703703.50 requests per second
GET: 3846153.75 requests per second
```
*Running on a MacBook Pro 15" 2.8 GHz Intel Core i7 using Go 1.7*
## Contact
Josh Baker [@tidwall](http://twitter.com/tidwall)
## License
Shiny source code is available under the MIT [License](/LICENSE).

18
examples/README.md Normal file
View File

@ -0,0 +1,18 @@
# `✨ shiny ✨ examples`
## redis-server
```
go run examples/redis-server.go [--port int] [--appendonly yes/no]
```
- `GET`, `SET`, `DEL`, `QUIT`, `PING`, `SHUTDOWN` commands.
- `--appendonly yes` option for disk persistence.
- Compatible with the [redis-cli](https://redis.io/topics/rediscli) and all [redis clients](https://redis.io/clients).
## echo-server
```
go run examples/echo-server.go [--port int]
```

64
examples/echo-server.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"flag"
"fmt"
"github.com/tidwall/shiny"
)
func main() {
var port int
flag.IntVar(&port, "port", 9999, "server port")
flag.Parse()
var shutdown bool
var started bool
fmt.Println(shiny.Serve("tcp", fmt.Sprintf(":%d", port),
// handle - the incoming client socket data.
func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool) {
if shutdown {
return nil, false
}
keepopen = true
if string(data) == "shutdown\r\n" {
shutdown = true
} else if string(data) == "quit\r\n" {
keepopen = false
}
return data, keepopen
},
// accept - a new client socket has opened.
// 'wake' is a function that when called will fire a 'handle' event
// for the specified ID, and is goroutine-safe.
func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) {
if shutdown {
return nil, false
}
// this is a good place to create a user-defined socket context.
return []byte(
"Welcome to the echo server!\n" +
"Enter 'quit' to close your connection or " +
"'shutdown' to close the server.\n"), true
},
// closed - a client socket has closed
func(id int, err error, ctx interface{}) {
// teardown the socket context here
},
// ticker - a ticker that fires between 1 and 1/20 of a second
// depending on the traffic.
func(ctx interface{}) (keepserveropen bool) {
if shutdown {
// do server teardown here
return false
}
if !started {
fmt.Printf("echo server started on port %d\n", port)
started = true
}
// perform various non-socket-io related operation here
return true
},
// an optional user-defined context
nil))
}

179
examples/redis-server.go Normal file
View File

@ -0,0 +1,179 @@
package main
import (
"flag"
"fmt"
"io"
"log"
"os"
"strings"
"github.com/tidwall/redcon"
"github.com/tidwall/shiny"
)
func main() {
var port int
var appendonly string
flag.IntVar(&port, "port", 6380, "server port")
flag.StringVar(&appendonly, "appendonly", "no", "use appendonly file (yes or no)")
flag.Parse()
var resp []byte
var aofw []byte
var args [][]byte
var shutdown bool
var started bool
var bufs = make(map[int][]byte)
var keys = make(map[string]string)
var f *os.File
if appendonly == "yes" {
var err error
f, err = os.OpenFile("appendonly.aof", os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
log.Fatal(err)
}
defer f.Close()
rd := redcon.NewReader(f)
for {
cmd, err := rd.ReadCommand()
if err == io.EOF {
break
} else if err != nil {
log.Fatal(err)
}
switch strings.ToUpper(string(cmd.Args[0])) {
default:
log.Fatal("bad aof")
case "SET":
keys[string(cmd.Args[1])] = string(cmd.Args[2])
}
}
}
log.Fatal(shiny.Serve("tcp", fmt.Sprintf(":%d", port),
func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool) {
if shutdown {
return nil, false
}
buf := bufs[id]
if len(buf) > 0 {
data = append(buf, data...)
}
keepopen = true
resp = resp[:0]
aofw = aofw[:0]
var complete bool
var err error
for {
complete, args, _, data, err = redcon.ReadNextCommand(data, args[:0])
if err != nil {
keepopen = false
resp = redcon.AppendError(resp, err.Error())
break
}
if !complete {
break
}
switch strings.ToUpper(string(args[0])) {
default:
resp = redcon.AppendError(resp, fmt.Sprintf("ERR unknown command '%s'", args[0]))
case "PING":
if len(args) > 2 {
resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0]))
continue
} else if len(args) == 1 {
resp = redcon.AppendString(resp, "PONG")
} else {
resp = redcon.AppendBulk(resp, args[1])
}
case "QUIT":
keepopen = false
resp = redcon.AppendOK(resp)
case "SHUTDOWN":
shutdown = true
keepopen = false
resp = redcon.AppendOK(resp)
case "SET":
if len(args) != 3 {
resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0]))
continue
}
keys[string(args[1])] = string(args[2])
resp = redcon.AppendOK(resp)
if appendonly == "yes" {
// create the append only entry
aofw = redcon.AppendArray(aofw, 3)
aofw = redcon.AppendBulkString(aofw, "SET")
aofw = redcon.AppendBulk(aofw, args[1])
aofw = redcon.AppendBulk(aofw, args[2])
}
case "GET":
if len(args) != 2 {
resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0]))
continue
}
val, ok := keys[string(args[1])]
if !ok {
resp = redcon.AppendNull(resp)
} else {
resp = redcon.AppendBulkString(resp, val)
}
case "DEL":
if len(args) < 2 {
resp = redcon.AppendError(resp, fmt.Sprintf("ERR wrong number of arguments for '%s' command", args[0]))
continue
}
var n int64
for i := 1; i < len(args); i++ {
if _, ok := keys[string(args[i])]; ok {
delete(keys, string(args[i]))
n++
}
}
resp = redcon.AppendInt(resp, n)
}
}
if len(data) > 0 {
bufs[id] = append(buf[:0], data...)
} else if len(buf) > 0 {
bufs[id] = buf[:0]
}
if len(aofw) > 0 {
if _, err := f.Write(aofw); err != nil {
log.Fatal(err)
}
}
return resp, keepopen
},
// accept - a new client socket has opened
func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) {
if shutdown {
return nil, false
}
// create a new socket context here
return nil, true
},
// closed - a client socket has closed
func(id int, err error, ctx interface{}) {
// teardown the socket context here
delete(bufs, id)
},
// ticker - a ticker that fires between 1 and 1/20 of a second
// depending on the traffic.
func(ctx interface{}) (keepserveropen bool) {
if shutdown {
// do server teardown here
return false
}
if !started {
fmt.Printf("redis(ish) server started on port %d\n", port)
started = true
}
// perform various non-socket-io related operation here
return true
},
// an optional user-defined context
nil))
}

202
shiny.go Normal file
View File

@ -0,0 +1,202 @@
package shiny
import (
"io"
"net"
"strings"
"sync"
"time"
)
// Serve
func Serve(net, addr string,
handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool),
accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool),
closed func(id int, err error, ctx interface{}),
ticker func(ctx interface{}) (keepopen bool),
context interface{}) error {
if strings.HasSuffix(net, "-compat") {
net = net[:len(net)-len("-compat")]
} else {
switch net {
case "tcp", "tcp4", "tcp6":
return eventServe(net, addr, handle, accept, closed, ticker, context)
}
}
return compatServe(net, addr, handle, accept, closed, ticker, context)
}
func compatServe(net_, addr string,
handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool),
accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool),
closed func(id int, err error, ctx interface{}),
ticker func(ctx interface{}) (keepopen bool),
ctx interface{}) error {
if handle == nil {
handle = func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool) {
return nil, true
}
}
if accept == nil {
accept = func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool) {
return nil, true
}
}
if closed == nil {
closed = func(id int, err error, ctx interface{}) {}
}
if ticker == nil {
ticker = func(ctx interface{}) (keepopen bool) { return true }
}
ln, err := net.Listen(net_, addr)
if err != nil {
return err
}
defer ln.Close()
if !ticker(ctx) {
return nil
}
var mu sync.Mutex
var conns = make(map[net.Conn]bool)
defer func() {
mu.Lock()
for c := range conns {
c.Close()
}
mu.Unlock()
}()
var id int
var done bool
var shutdown bool
donech := make(chan bool)
var lastwrite time.Time
lasttick := time.Now()
go func() {
t := time.NewTicker(time.Second / 20)
defer t.Stop()
for {
select {
case <-donech:
return
case <-t.C:
now := time.Now()
if now.Sub(lastwrite) < time.Second || now.Sub(lasttick) >= time.Second {
mu.Lock()
if !done && !ticker(ctx) {
shutdown = true
ln.Close()
}
lasttick = now
mu.Unlock()
}
}
}
}()
defer func() {
mu.Lock()
done = true
mu.Unlock()
donech <- true
}()
for {
c, err := ln.Accept()
if err != nil {
mu.Lock()
if shutdown {
mu.Unlock()
return nil
}
mu.Unlock()
return err
}
id++
func() {
wake := func(c net.Conn, id int) func() {
return func() {
go func() {
mu.Lock()
defer mu.Unlock()
send, keepopen := handle(id, nil, ctx)
if len(send) > 0 {
lastwrite = time.Now()
if _, err := c.Write(send); err != nil {
c.Close()
return
}
}
if !keepopen {
c.Close()
return
}
}()
}
}(c, id)
if !func() bool {
mu.Lock()
defer mu.Unlock()
send, keepopen := accept(id, c.RemoteAddr().String(), wake, ctx)
if len(send) > 0 {
lastwrite = time.Now()
if _, err := c.Write(send); err != nil {
c.Close()
return false
}
}
if !keepopen {
c.Close()
return false
}
conns[c] = true
return true
}() {
return
}
go func(id int, c net.Conn) {
var ferr error
defer func() {
mu.Lock()
defer mu.Unlock()
if ferr == io.EOF {
ferr = nil
}
if operr, ok := ferr.(*net.OpError); ok {
ferr = operr.Err
switch ferr.Error() {
case "use of closed network connection",
"read: connection reset by peer":
ferr = nil
}
}
delete(conns, c)
closed(id, ferr, ctx)
c.Close()
}()
packet := make([]byte, 4096)
for {
n, err := c.Read(packet)
if err != nil {
ferr = err
return
}
func() {
mu.Lock()
defer mu.Unlock()
send, keepopen := handle(id, packet[:n], ctx)
if len(send) > 0 {
lastwrite = time.Now()
if _, err := c.Write(send); err != nil {
c.Close()
return
}
}
if !keepopen {
c.Close()
return
}
}()
}
}(id, c)
}()
}
}

285
shiny_epoll.go Normal file
View File

@ -0,0 +1,285 @@
//+build linux
package shiny
import (
"net"
"strconv"
"strings"
"sync"
"syscall"
"time"
"unsafe"
)
type conn struct {
fd int
id int
addr string
err error
hup bool
}
func eventServe(net_, addr string,
handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool),
accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool),
closed func(id int, err error, ctx interface{}),
ticker func(ctx interface{}) (keepopen bool),
ctx interface{}) error {
lna, err := net.Listen(net_, addr)
if err != nil {
return err
}
defer lna.Close()
ln := lna.(*net.TCPListener)
f, err := ln.File()
if err != nil {
return err
}
defer f.Close()
ln.Close()
sfd := int(f.Fd())
q, err := syscall.EpollCreate1(0)
if err != nil {
return err
}
defer syscall.Close(q)
event := syscall.EpollEvent{
Fd: int32(sfd),
Events: syscall.EPOLLIN,
}
if err := syscall.EpollCtl(q, syscall.EPOLL_CTL_ADD, sfd, &event); err != nil {
return err
}
var conns = make(map[int]*conn)
closeAndRemove := func(c *conn) {
if c.hup {
return
}
c.hup = true
syscall.Close(c.fd)
delete(conns, c.fd)
closed(c.id, c.err, ctx)
}
defer func() {
for _, c := range conns {
closeAndRemove(c)
}
}()
var lastwriteorwake time.Time
write := func(nfd int, send []byte) (err error) {
res1, res2, errn := syscall.Syscall(syscall.SYS_WRITE, uintptr(nfd),
uintptr(unsafe.Pointer(&send[0])), uintptr(len(send)))
if errn != 0 {
_, _ = res1, res2
if errn == syscall.EAGAIN {
// This should not happen because we are running the
// server in blocking mode and withoug socket timeouts.
panic("EAGAIN")
}
return errn
}
lastwriteorwake = time.Now()
return nil
}
// add wake event
var wakemu sync.Mutex
var wakeable = true
var wakers = make(map[int]int) // FD->ID map
var wakersarr []int
var wakezero = []byte{0, 1, 2, 3, 4, 5, 6, 7}
// SYS_EVENTFD is not implemented in Go yet.
// SYS_EVENTFD = 284
// SYS_EVENTFD2 = 290
r1, _, errn := syscall.Syscall(284, 0, 0, 0)
if errn != 0 {
return errn
}
var efd = int(r1)
defer func() {
wakemu.Lock()
wakeable = false
syscall.Close(efd)
wakemu.Unlock()
}()
event = syscall.EpollEvent{
Fd: int32(efd),
Events: syscall.EPOLLIN,
}
if err := syscall.EpollCtl(q, syscall.EPOLL_CTL_ADD, efd, &event); err != nil {
return err
}
shandle := func(c *conn, data []byte) {
send, keepalive := handle(c.id, data, ctx)
if len(send) > 0 {
if err := write(c.fd, send); err != nil {
c.err = err
closeAndRemove(c)
return
}
}
if !keepalive {
closeAndRemove(c)
}
}
var lastts time.Time
if ticker != nil {
if !ticker(ctx) {
syscall.Close(q)
return nil
}
lastts = time.Now()
}
var id int
var packet [65535]byte
var evs [32]syscall.EpollEvent
for {
var ts int
if time.Since(lastwriteorwake) < time.Second {
ts = 50
} else {
ts = 1000
}
n, err := syscall.EpollWait(q, evs[:], ts)
if err != nil {
if err == syscall.EINTR {
continue
}
break
}
for i := 0; ; i++ {
now := time.Now()
if now.Sub(lastts) >= time.Second/20 {
if !ticker(ctx) {
syscall.Close(q)
break
}
lastts = now
}
if i >= n {
break
}
if evs[i].Fd == int32(sfd) {
nfd, sa, err := syscall.Accept(sfd)
if err != nil {
continue
}
var addr string
var port int
switch sa := sa.(type) {
default:
case *syscall.SockaddrInet4:
addr = net.IP(sa.Addr[:]).String()
port = sa.Port
case *syscall.SockaddrInet6:
addr = net.IP(sa.Addr[:]).String()
port = sa.Port
}
var res []byte
if strings.Contains(addr, ":") {
res = append(append(append(res, '['), addr...), ']', ':')
} else {
res = append(append(res, addr...), ':')
}
addr = string(strconv.AppendInt(res, int64(port), 10))
id++
wake := func(nfd, id int) func() {
return func() {
// NOTE: This is the one and only entrypoint that is
// not thread-safe. Use a mutex.
wakemu.Lock()
if wakeable {
wakers[nfd] = id
syscall.Write(efd, wakezero[:])
}
wakemu.Unlock()
}
}(nfd, id)
send, keepalive := accept(id, addr, wake, ctx)
if !keepalive {
syscall.Close(nfd)
continue
}
// 500 second keepalive
kerr1 := syscall.SetsockoptInt(nfd, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, 1)
kerr2 := syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, 500)
kerr3 := syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, 500)
if kerr1 != nil || kerr2 != nil || kerr3 != nil {
//fmt.Printf("%v %v %v\n", kerr1, kerr2, kerr3)
}
// add read
event := syscall.EpollEvent{
Fd: int32(nfd),
Events: syscall.EPOLLIN | syscall.EPOLLRDHUP, // | (syscall.EPOLLET & 0xffffffff),
}
if err := syscall.EpollCtl(q, syscall.EPOLL_CTL_ADD, nfd, &event); err != nil {
syscall.Close(nfd)
continue
}
c := &conn{fd: nfd, addr: addr, id: id}
conns[nfd] = c
if len(send) > 0 {
if err := write(c.fd, send); err != nil {
c.err = err
closeAndRemove(c)
continue
}
}
} else if evs[i].Fd == int32(efd) {
// NOTE: This is a wakeup call. Use a mutex when accessing
// the `wakers` field.
wakersarr = wakersarr[:0]
wakemu.Lock()
for nfd, id := range wakers {
wakersarr = append(wakersarr, nfd, id)
}
wakers = make(map[int]int)
var data [8]byte
_, err := syscall.Read(efd, data[:])
wakemu.Unlock()
// exit the lock and read from the array
if err != nil {
return err
}
for i := 0; i < len(wakersarr); i += 2 {
nfd := wakersarr[i]
id := wakersarr[i+1]
c := conns[nfd]
if c != nil && c.id == id {
shandle(c, nil)
}
}
lastwriteorwake = time.Now()
} else if evs[i].Events&syscall.EPOLLRDHUP != 0 {
closeAndRemove(conns[int(evs[i].Fd)])
} else {
c := conns[int(evs[i].Fd)]
res, _, errn := syscall.Syscall(syscall.SYS_READ, uintptr(c.fd),
uintptr(unsafe.Pointer(&packet[0])), uintptr(len(packet)))
if errn != 0 || res == 0 {
if errn != 0 {
c.err = errn
}
closeAndRemove(c)
continue
}
shandle(c, packet[:res])
}
}
}
return nil
}

291
shiny_kqueue.go Normal file
View File

@ -0,0 +1,291 @@
//+build darwin freebsd
package shiny
import (
"fmt"
"net"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"
"unsafe"
)
type conn struct {
fd int
id int
addr string
err error
}
func eventServe(net_, addr string,
handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool),
accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool),
closed func(id int, err error, ctx interface{}),
ticker func(ctx interface{}) (keepopen bool),
ctx interface{}) error {
lna, err := net.Listen(net_, addr)
if err != nil {
return err
}
defer lna.Close()
ln := lna.(*net.TCPListener)
f, err := ln.File()
if err != nil {
return err
}
defer f.Close()
ln.Close()
sfd := int(f.Fd())
q, err := syscall.Kqueue()
if err != nil {
return err
}
defer syscall.Close(q)
ev := []syscall.Kevent_t{{
Ident: uint64(sfd),
Flags: syscall.EV_ADD,
Filter: syscall.EVFILT_READ,
}}
if _, err := syscall.Kevent(q, ev, nil, nil); err != nil {
return err
}
var conns = make(map[int]*conn)
defer func() {
for _, conn := range conns {
syscall.Close(conn.fd)
closed(conn.id, conn.err, ctx)
}
}()
var lastwriteorwake time.Time
write := func(nfd int, send []byte) (err error) {
res1, res2, errn := syscall.Syscall(syscall.SYS_WRITE, uintptr(nfd),
uintptr(unsafe.Pointer(&send[0])), uintptr(len(send)))
if errn != 0 {
_, _ = res1, res2
if errn == syscall.EAGAIN {
// This should not happen because we are running the
// server in blocking mode and withoug socket timeouts.
panic("EAGAIN")
}
return errn
}
lastwriteorwake = time.Now()
return nil
}
// add wake event
var wakemu sync.Mutex
var wakeable = true
var wakers = make(map[int]int) // FD->ID map
var wakersarr []int
defer func() {
wakemu.Lock()
wakeable = false
wakemu.Unlock()
}()
ev = []syscall.Kevent_t{{
Ident: 0,
Flags: syscall.EV_ADD,
Filter: syscall.EVFILT_USER,
}}
if _, err := syscall.Kevent(q, ev, nil, nil); err != nil {
return err
}
shandle := func(c *conn, data []byte) {
send, keepalive := handle(c.id, data, ctx)
if len(send) > 0 {
if err := write(c.fd, send); err != nil {
c.err = err
syscall.Close(c.fd)
return
}
}
if !keepalive {
syscall.Close(c.fd)
}
}
var lastts time.Time
if ticker != nil {
if !ticker(ctx) {
syscall.Close(q)
return nil
}
lastts = time.Now()
}
var id int
var packet [65535]byte
var evs [32]syscall.Kevent_t
for {
var ts syscall.Timespec
if time.Since(lastwriteorwake) < time.Second {
ts = syscall.Timespec{Sec: 0, Nsec: int64(time.Second / 20)}
} else {
ts = syscall.Timespec{Sec: 1, Nsec: 0}
}
n, err := syscall.Kevent(q, nil, evs[:], &ts)
if err != nil {
if err == syscall.EINTR {
continue
}
break
}
for i := 0; ; i++ {
now := time.Now()
if now.Sub(lastts) >= time.Second/20 {
if !ticker(ctx) {
syscall.Close(q)
break
}
lastts = now
}
if i >= n {
break
}
if evs[i].Flags&syscall.EV_EOF != 0 {
c := conns[int(evs[i].Ident)]
syscall.Close(int(evs[i].Ident))
delete(conns, int(evs[i].Ident))
if c != nil {
closed(c.id, c.err, ctx)
}
} else if evs[i].Ident == uint64(sfd) {
nfd, sa, err := syscall.Accept(sfd)
if err != nil {
continue
}
var addr string
var port int
switch sa := sa.(type) {
default:
case *syscall.SockaddrInet4:
addr = net.IP(sa.Addr[:]).String()
port = sa.Port
case *syscall.SockaddrInet6:
addr = net.IP(sa.Addr[:]).String()
port = sa.Port
}
var res []byte
if strings.Contains(addr, ":") {
res = append(append(append(res, '['), addr...), ']', ':')
} else {
res = append(append(res, addr...), ':')
}
addr = string(strconv.AppendInt(res, int64(port), 10))
id++
wake := func(nfd, id int) func() {
return func() {
// NOTE: This is the one and only entrypoint that is
// not thread-safe. Use a mutex.
wakemu.Lock()
ev := []syscall.Kevent_t{{
Ident: 0,
Flags: syscall.EV_ENABLE,
Filter: syscall.EVFILT_USER,
Fflags: syscall.NOTE_TRIGGER,
}}
if wakeable {
wakers[nfd] = id
syscall.Kevent(q, ev, nil, nil)
}
wakemu.Unlock()
}
}(nfd, id)
send, keepalive := accept(id, addr, wake, ctx)
if !keepalive {
syscall.Close(nfd)
continue
}
// 500 second keepalive
var kerr1, kerr2, kerr3 error
if runtime.GOOS == "darwin" {
kerr1 = syscall.SetsockoptInt(nfd, syscall.SOL_SOCKET, 0x8, 1)
kerr2 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x101, 500)
kerr3 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x10, 500)
} else {
// freebsd
kerr1 = syscall.SetsockoptInt(nfd, syscall.SOL_SOCKET, 0x8, 1)
kerr2 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x200, 500)
kerr3 = syscall.SetsockoptInt(nfd, syscall.IPPROTO_TCP, 0x100, 500)
}
if kerr1 != nil || kerr2 != nil || kerr3 != nil {
fmt.Printf("%v %v %v\n", kerr1, kerr2, kerr3)
}
// add read
ev := []syscall.Kevent_t{{
Ident: uint64(nfd),
Flags: syscall.EV_ADD,
Filter: syscall.EVFILT_READ,
}}
if _, err := syscall.Kevent(q, ev, nil, nil); err != nil {
syscall.Close(nfd)
continue
}
c := &conn{fd: nfd, addr: addr, id: id}
conns[nfd] = c
if len(send) > 0 {
if err := write(c.fd, send); err != nil {
c.err = err
syscall.Close(c.fd)
continue
}
}
} else if evs[i].Ident == 0 {
// NOTE: This is a wakeup call. Use a mutex when accessing
// the `wakers` field.
wakersarr = wakersarr[:0]
wakemu.Lock()
for nfd, id := range wakers {
wakersarr = append(wakersarr, nfd, id)
}
wakers = make(map[int]int)
ev := []syscall.Kevent_t{{
Ident: 0,
Flags: syscall.EV_DISABLE,
Filter: syscall.EVFILT_USER,
Fflags: syscall.NOTE_TRIGGER,
}}
_, err := syscall.Kevent(q, ev, nil, nil)
wakemu.Unlock()
// exit the lock and read from the array
if err != nil {
return err
}
for i := 0; i < len(wakersarr); i += 2 {
nfd := wakersarr[i]
id := wakersarr[i+1]
c := conns[nfd]
if c != nil && c.id == id {
shandle(c, nil)
}
}
lastwriteorwake = time.Now()
} else {
c := conns[int(evs[i].Ident)]
res, _, errn := syscall.Syscall(syscall.SYS_READ, uintptr(c.fd),
uintptr(unsafe.Pointer(&packet[0])), uintptr(len(packet)))
if errn != 0 || res == 0 {
if errn != 0 {
c.err = errn
}
syscall.Close(c.fd)
continue
}
shandle(c, packet[:res])
}
}
}
return nil
}

12
shiny_other.go Normal file
View File

@ -0,0 +1,12 @@
//+build !darwin,!freebsd,!linux
package shiny
func eventServe(net, addr string,
handle func(id int, data []byte, ctx interface{}) (send []byte, keepopen bool),
accept func(id int, addr string, wake func(), ctx interface{}) (send []byte, keepopen bool),
closed func(id int, err error, ctx interface{}),
ticker func(ctx interface{}) (keepopen bool),
context interface{}) error {
return compatServe(net, addr, handle, accept, closed, ticker, context)
}