Added multithreaded support

It's now possible to fire up multiple event loops in separate
goroutines. All that's needed is to set the `events.NumLoops` options
before calling `Serve`.

There are a few breaking API changes.

- The events pass an evio.Conn param that represents the unique
incoming socket connection.
- Prewrite and Postwrite events have been removed.
- Wake and Dial functions have been removed.
- The Transform utility has been removed.

The older version has been tagged as `v0.1.0` for vendoring purposes.
This commit is contained in:
Josh Baker 2018-05-23 16:49:45 -07:00
parent 751f59108a
commit dd88755b17
21 changed files with 1537 additions and 2649 deletions

150
README.md
View File

@ -17,23 +17,20 @@ This project is not intended to be a general purpose replacement for the standar
You would not want to use this framework if you need to handle long-running requests (milliseconds or more). For example, a web api that needs to connect to a mongo database, authenticate, and respond; just use the Go net/http package instead.
There are many popular event loop based applications in the wild such as Nginx, Haproxy, Redis, and Memcached. All of these are single-threaded and very fast and written in C.
The reason I wrote this framework is so I can build certain network services that perform like the C apps above, but I also want to continue to work in Go.
There are many popular event loop based applications in the wild such as Nginx, Haproxy, Redis, and Memcached. All of these are very fast and written in C.
The reason I wrote this framework is so that I can build certain networking services that perform like the C apps above, but I also want to continue to work in Go.
## Features
- [Fast](#performance) single-threaded event loop
- [Fast](#performance) single-threaded or [multithreaded](#multithreaded) event loop
- Built-in [load balancing](#load-balancing) options
- Simple API
- Low memory usage
- Supports tcp, [udp](#udp), and unix sockets
- Allows [multiple network binding](#multiple-addresses) on the same event loop
- Flexible [ticker](#ticker) event
- Fallback for non-epoll/kqueue operating systems by simulating events with the [net](https://golang.org/pkg/net/) package
- Ability to [wake up](#wake-up) connections from long running background operations
- [Dial](#dial-out) an outbound connection and process/proxy on the event loop
- [SO_REUSEPORT](#so_reuseport) socket option
## Getting Started
@ -61,7 +58,7 @@ import "github.com/tidwall/evio"
func main() {
var events evio.Events
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
out = in
return
}
@ -89,8 +86,6 @@ The event type has a bunch of handy events:
- `Closed` fires when a connection has closed.
- `Detach` fires when a connection has been detached using the `Detach` return action.
- `Data` fires when the server receives new data from a connection.
- `Prewrite` fires prior to all write attempts from the server.
- `Postwrite` fires immediately after every write attempt.
- `Tick` fires immediately after the server starts and will fire again after a specified interval.
### Multiple addresses
@ -114,127 +109,30 @@ events.Tick = func() (delay time.Duration, action Action){
}
```
### Wake up
A connection can be woken up using the `Wake` function that is made available through the `Serving` event. This is useful for when you need to offload an operation to a background goroutine and then later notify the event loop that it's time to send some data.
Example echo server that when encountering the line "exec" it waits 5 seconds before responding.
```go
var srv evio.Server
var mu sync.Mutex
var execs = make(map[int]int)
events.Serving = func(srvin evio.Server) (action evio.Action) {
srv = srvin // hang on to the server control, which has the Wake function
return
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
if in == nil {
// look for `in` param equal to `nil` following a wake call.
mu.Lock()
for execs[id] > 0 {
out = append(out, "exec\r\n"...)
execs[id]--
}
mu.Unlock()
} else if string(in) == "exec\r\n" {
go func(){
// do some long running operation
time.Sleep(time.Second*5)
mu.Lock()
execs[id]++
mu.Unlock()
srv.Wake(id)
}()
} else {
out = in
}
return
}
```
### Dial out
An outbound connection can be created by using the `Dial` function that is made available through the `Serving` event. Dialing a new connection will return a new connection ID and attach that connection to the event loop in the same manner as incoming connections. This operation is completely non-blocking including any DNS resolution.
All new outbound connection attempts will immediately fire an `Opened` event and end with a `Closed` event. A failed connection will send the connection error through the `Closed` event.
```go
var srv evio.Server
var mu sync.Mutex
var execs = make(map[int]int)
events.Serving = func(srvin evio.Server) (action evio.Action) {
srv = srvin // hang on to the server control, which has the Dial function
return
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
if string(in) == "dial\r\n" {
id := srv.Dial("tcp://google.com:80")
// We now established an outbound connection to google.
// Treat it like you would incoming connection.
} else {
out = in
}
return
}
```
### Data translations
The `Translate` function wraps events and provides a `ReadWriter` that can be used to translate data off the wire from one format to another. This can be useful for transparently adding compression or encryption.
For example, let's say we need TLS support:
```go
var events Events
// ... fill the events with happy functions
cer, err := tls.LoadX509KeyPair("certs/ssl-cert-snakeoil.pem", "certs/ssl-cert-snakeoil.key")
if err != nil {
log.Fatal(err)
}
config := &tls.Config{Certificates: []tls.Certificate{cer}}
// wrap the events with a TLS translator
events = evio.Translate(events, nil,
func(id int, rw io.ReadWriter) io.ReadWriter {
return tls.Server(evio.NopConn(rw), config)
},
)
log.Fatal(evio.Serve(events, "tcp://0.0.0.0:443"))
```
Here we wrapped the event with a TLS translator. The `evio.NopConn` function is used to converts the `ReadWriter` a `net.Conn` so the `tls.Server()` call will work.
There's a working TLS example at [examples/http-server/main.go](examples/http-server/main.go) that binds to port 8080 and 4443 using an developer SSL certificate. The 8080 connections will be insecure and the 4443 will be secure.
```sh
$ cd examples/http-server
$ go run main.go --tlscert example.pem
2017/11/02 06:24:33 http server started on port 8080
2017/11/02 06:24:33 https server started on port 4443
```
```sh
$ curl http://localhost:8080
Hello World!
$ curl -k https://localhost:4443
Hello World!
```
## UDP
The `Serve` function can bind to UDP addresses.
- The `Opened` event will fire when a UDP packet is received from a new remote address.
- The `Closed` event will fire when the server is shutdown or the `Close` action is explicitly returned from an event.
- The `Wake` and `Dial` operations are not available to UDP connections.
- All incoming and outgoing packets are not buffered and sent individually.
- The `Opened` and `Closed` events are not availble for UDP sockets, only the `Data` event.
## Multithreaded
The `events.NumLoops` options sets the number of loops to use for the server.
Setting this to a value greater than 1 will effectively make the server multithreaded for multi-core machines.
Which means you must take care with synchonizing memory between all event callbacks.
Setting to 0 or 1 will run the server single-threaded.
Setting to -1 will automatically assign this value equal to `runtime.NumProcs()`.
## Load balancing
The `events.LoadBalance` options sets the load balancing method.
Load balancing is always a best effort to attempt to distribute the incoming connections between multiple loops.
This option is only available when `events.NumLoops` is set.
- `Random` requests that connections are randomly distributed.
- `RoundRobin` requests that connections are distributed to a loop in a round-robin fashion.
- `LeastConnections` assigns the next accepted connection to the loop with the least number of active connections.
## SO_REUSEPORT

133
evio.go
View File

@ -1,4 +1,4 @@
// Copyright 2017 Joshua J Baker. All rights reserved.
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
@ -10,8 +10,6 @@ import (
"os"
"strings"
"time"
"github.com/kavu/go_reuseport"
)
// Action is an action that occurs after the completion of an event.
@ -20,9 +18,9 @@ type Action int
const (
// None indicates that no action should occur following an event.
None Action = iota
// Detach detaches the client. Not available for UDP connections.
// Detach detaches a connection. Not available for UDP connections.
Detach
// Close closes the client.
// Close closes the connection.
Close
// Shutdown shutdowns the server.
Shutdown
@ -40,44 +38,59 @@ type Options struct {
ReuseInputBuffer bool
}
// Info represents a information about the connection
type Info struct {
// Closing is true when the connection is about to close. Expect a Closed
// event to fire soon.
Closing bool
// AddrIndex is the index of server address that was passed to the Serve call.
AddrIndex int
// LocalAddr is the connection's local socket address.
LocalAddr net.Addr
// RemoteAddr is the connection's remote peer address.
RemoteAddr net.Addr
}
// Server represents a server context which provides information about the
// running server and has control functions for managing state.
type Server struct {
// The addrs parameter is an array of listening addresses that align
// with the addr strings passed to the Serve function.
Addrs []net.Addr
// Wake is a goroutine-safe function that triggers a Data event
// (with a nil `in` parameter) for the specified id. Not available for
// UDP connections.
Wake func(id int) (ok bool)
// Dial is a goroutine-safe function makes a connection to an external
// server and returns a new connection id. The new connection is added
// to the event loop and is managed exactly the same way as all the
// other connections. This operation only fails if the server/loop has
// been shut down. An `id` that is not zero means the operation succeeded
// and then there always be exactly one Opened and one Closed event
// following this call. Look for socket errors from the Closed event.
// Not available for UDP connections.
Dial func(addr string, timeout time.Duration) (id int)
// NumLoops is the number of loops that the server is using.
NumLoops int
}
// Conn is an evio connection.
type Conn interface {
// Context returns a user-defined context.
Context() interface{}
// SetContext sets a user-defined context.
SetContext(interface{})
// AddrIndex is the index of server address that was passed to the Serve call.
AddrIndex() int
// LocalAddr is the connection's local socket address.
LocalAddr() net.Addr
// RemoteAddr is the connection's remote peer address.
RemoteAddr() net.Addr
}
// LoadBalance sets the load balancing method.
type LoadBalance int
const (
// Random requests that connections are randomly distributed.
Random LoadBalance = iota
// RoundRobin requests that connections are distributed to a loop in a
// round-robin fashion.
RoundRobin
// LeastConnections assigns the next accepted connection to the loop with
// the least number of active connections.
LeastConnections
)
// Events represents the server events for the Serve call.
// Each event has an Action return value that is used manage the state
// of the connection and server.
type Events struct {
// NumLoops sets the number of loops to use for the server. Setting this
// to a value greater than 1 will effectively make the server
// multithreaded for multi-core machines. Which means you must take care
// with synchonizing memory between all event callbacks. Setting to 0 or 1
// will run the server single-threaded. Setting to -1 will automatically
// assign this value equal to runtime.NumProcs().
NumLoops int
// LoadBalance sets the load balancing method. Load balancing is always a
// best effort to attempt to distribute the incoming connections between
// multiple loops. This option is only works when NumLoops is set.
LoadBalance LoadBalance
// Serving fires when the server can accept connections. The server
// parameter has information and various utilities.
Serving func(server Server) (action Action)
@ -86,10 +99,10 @@ type Events struct {
// it's local and remote address.
// Use the out return value to write data to the connection.
// The opts return value is used to set connection options.
Opened func(id int, info Info) (out []byte, opts Options, action Action)
Opened func(c Conn) (out []byte, opts Options, action Action)
// Closed fires when a connection has closed.
// The err parameter is the last known connection error.
Closed func(id int, err error) (action Action)
Closed func(c Conn, err error) (action Action)
// Detached fires when a connection has been previously detached.
// Once detached it's up to the receiver of this event to manage the
// state of the connection. The Closed event will not be called for
@ -97,21 +110,11 @@ type Events struct {
// The conn parameter is a ReadWriteCloser that represents the
// underlying socket connection. It can be freely used in goroutines
// and should be closed when it's no longer needed.
Detached func(id int, rwc io.ReadWriteCloser) (action Action)
Detached func(c Conn, rwc io.ReadWriteCloser) (action Action)
// Data fires when a connection sends the server data.
// The in parameter is the incoming data.
// Use the out return value to write data to the connection.
Data func(id int, in []byte) (out []byte, action Action)
// Prewrite fires prior to every write attempt.
// The amount parameter is the number of bytes that will be attempted
// to be written to the connection.
Prewrite func(id int, amount int) (action Action)
// Postwrite fires immediately after every write attempt.
// The amount parameter is the number of bytes that was written to the
// connection.
// The remaining parameter is the number of bytes that still remain in
// the buffer scheduled to be written.
Postwrite func(id int, amount, remaining int) (action Action)
Data func(c Conn, in []byte) (out []byte, action Action)
// Tick fires immediately after the server starts and will fire again
// following the duration specified by the delay return value.
Tick func() (delay time.Duration, action Action)
@ -151,14 +154,14 @@ func Serve(events Events, addr ...string) error {
}
var err error
if ln.network == "udp" {
if ln.opts.reusePort() {
ln.pconn, err = reuseport.ListenPacket(ln.network, ln.addr)
if ln.opts.reusePort {
ln.pconn, err = reuseportListenPacket(ln.network, ln.addr)
} else {
ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
}
} else {
if ln.opts.reusePort() {
ln.ln, err = reuseport.Listen(ln.network, ln.addr)
if ln.opts.reusePort {
ln.ln, err = reuseportListen(ln.network, ln.addr)
} else {
ln.ln, err = net.Listen(ln.network, ln.addr)
}
@ -179,13 +182,13 @@ func Serve(events Events, addr ...string) error {
lns = append(lns, &ln)
}
if stdlib {
return servenet(events, lns)
return stdserve(events, lns)
}
return serve(events, lns)
}
// InputStream is a helper type for managing input streams inside the
// Data event.
// InputStream is a helper type for managing input streams from inside
// the Data event.
type InputStream struct{ b []byte }
// Begin accepts a new packet and returns a working sequence of
@ -199,7 +202,7 @@ func (is *InputStream) Begin(packet []byte) (data []byte) {
return data
}
// End shift the stream to match the unprocessed data.
// End shifts the stream to match the unprocessed data.
func (is *InputStream) End(data []byte) {
if len(data) > 0 {
if len(data) != len(is.b) {
@ -221,20 +224,14 @@ type listener struct {
addr string
}
type addrOpts map[string]string
func (opts addrOpts) reusePort() bool {
switch opts["reuseport"] {
case "yes", "true", "1":
return true
}
return false
type addrOpts struct {
reusePort bool
}
func parseAddr(addr string) (network, address string, opts addrOpts, stdlib bool) {
network = "tcp"
address = addr
opts = make(map[string]string)
opts.reusePort = false
if strings.Contains(address, "://") {
network = strings.Split(address, "://")[0]
address = strings.Split(address, "://")[1]
@ -248,7 +245,17 @@ func parseAddr(addr string) (network, address string, opts addrOpts, stdlib bool
for _, part := range strings.Split(address[q+1:], "&") {
kv := strings.Split(part, "=")
if len(kv) == 2 {
opts[kv[0]] = kv[1]
switch kv[0] {
case "reuseport":
if len(kv[1]) != 0 {
switch kv[1][0] {
default:
opts.reusePort = kv[1][0] >= '1' && kv[1][0] <= '9'
case 'T', 't', 'Y', 'y':
opts.reusePort = true
}
}
}
}
}
address = address[:q]

View File

@ -1,807 +0,0 @@
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
// +build netbsd openbsd freebsd darwin dragonfly linux
package evio
import (
"net"
"os"
"sort"
"sync"
"syscall"
"time"
"github.com/tidwall/evio/internal"
)
func (ln *listener) close() {
if ln.fd != 0 {
syscall.Close(ln.fd)
}
if ln.f != nil {
ln.f.Close()
}
if ln.ln != nil {
ln.ln.Close()
}
if ln.pconn != nil {
ln.pconn.Close()
}
if ln.network == "unix" {
os.RemoveAll(ln.addr)
}
}
// system takes the net listener and detaches it from it's parent
// event loop, grabs the file descriptor, and makes it non-blocking.
func (ln *listener) system() error {
var err error
switch netln := ln.ln.(type) {
default:
panic("invalid listener type")
case nil:
switch pconn := ln.pconn.(type) {
default:
panic("invalid packetconn type")
case *net.UDPConn:
ln.f, err = pconn.File()
}
case *net.TCPListener:
ln.f, err = netln.File()
case *net.UnixListener:
ln.f, err = netln.File()
}
if err != nil {
ln.close()
return err
}
ln.fd = int(ln.f.Fd())
return syscall.SetNonblock(ln.fd, true)
}
// unixConn represents the connection as the event loop sees it.
// This is also becomes a detached connection.
type unixConn struct {
id, fd int
outbuf []byte
outpos int
action Action
opts Options
timeout time.Time
raddr net.Addr // remote addr
laddr net.Addr // local addr
lnidx int
err error
dialerr error
wake bool
readon bool
writeon bool
detached bool
closed bool
opening bool
}
func (c *unixConn) Timeout() time.Time {
return c.timeout
}
func (c *unixConn) Read(p []byte) (n int, err error) {
return syscall.Read(c.fd, p)
}
func (c *unixConn) Write(p []byte) (n int, err error) {
if c.detached {
if len(c.outbuf) > 0 {
for len(c.outbuf) > 0 {
n, err = syscall.Write(c.fd, c.outbuf)
if n > 0 {
c.outbuf = c.outbuf[n:]
}
if err != nil {
return 0, err
}
}
c.outbuf = nil
}
var tn int
if len(p) > 0 {
for len(p) > 0 {
n, err = syscall.Write(c.fd, p)
if n > 0 {
p = p[n:]
tn += n
}
if err != nil {
return tn, err
}
}
p = nil
}
return tn, nil
}
return syscall.Write(c.fd, p)
}
func (c *unixConn) Close() error {
if c.closed {
return syscall.EINVAL
}
err := syscall.Close(c.fd)
c.fd = -1
c.closed = true
return err
}
func serve(events Events, lns []*listener) error {
p, err := internal.MakePoll()
if err != nil {
return err
}
defer syscall.Close(p)
for _, ln := range lns {
if err := internal.AddRead(p, ln.fd, nil, nil); err != nil {
return err
}
}
var mu sync.Mutex
var done bool
lock := func() { mu.Lock() }
unlock := func() { mu.Unlock() }
fdconn := make(map[int]*unixConn)
idconn := make(map[int]*unixConn)
udpconn := make(map[syscall.SockaddrInet6]*unixConn)
timeoutqueue := internal.NewTimeoutQueue()
var id int
dial := func(addr string, timeout time.Duration) int {
lock()
if done {
unlock()
return 0
}
id++
c := &unixConn{id: id, opening: true, lnidx: -1}
idconn[id] = c
if timeout != 0 {
c.timeout = time.Now().Add(timeout)
timeoutqueue.Push(c)
}
unlock()
// resolving an address blocks and we don't want blocking, like ever.
// but since we're leaving the event loop we'll need to complete the
// socket connection in a goroutine and add the read and write events
// to the loop to get back into the loop.
go func() {
err := func() error {
sa, err := resolve(addr)
if err != nil {
return err
}
var fd int
switch sa.(type) {
case *syscall.SockaddrUnix:
fd, err = syscall.Socket(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
case *syscall.SockaddrInet4:
fd, err = syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
case *syscall.SockaddrInet6:
fd, err = syscall.Socket(syscall.AF_INET6, syscall.SOCK_STREAM, 0)
}
if err != nil {
return err
}
err = syscall.Connect(fd, sa)
if err != nil && err != syscall.EINPROGRESS {
syscall.Close(fd)
return err
}
if err := syscall.SetNonblock(fd, true); err != nil {
syscall.Close(fd)
return err
}
lock()
err = internal.AddRead(p, fd, &c.readon, &c.writeon)
if err != nil {
unlock()
syscall.Close(fd)
return err
}
err = internal.AddWrite(p, fd, &c.readon, &c.writeon)
if err != nil {
unlock()
syscall.Close(fd)
return err
}
c.fd = fd
fdconn[fd] = c
unlock()
return nil
}()
if err != nil {
// set a dial error and timeout right away
lock()
c.dialerr = err
c.timeout = time.Now()
timeoutqueue.Push(c)
unlock()
}
}()
return id
}
// wake wakes up a connection
wake := func(id int) bool {
var ok = true
var err error
lock()
if done {
unlock()
return false
}
c := idconn[id]
if c == nil || c.fd == 0 {
if c != nil && c.opening {
c.wake = true
ok = true
} else {
ok = false
}
} else if !c.wake {
c.wake = true
err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon)
}
unlock()
if err != nil {
panic(err)
}
return ok
}
ctx := Server{Wake: wake, Dial: dial}
ctx.Addrs = make([]net.Addr, len(lns))
for i, ln := range lns {
ctx.Addrs[i] = ln.lnaddr
}
if events.Serving != nil {
switch events.Serving(ctx) {
case Shutdown:
return nil
}
}
defer func() {
lock()
done = true
type fdid struct {
fd, id int
opening bool
laddr net.Addr
raddr net.Addr
lnidx int
}
var fdids []fdid
for _, c := range idconn {
if c.opening {
filladdrs(c)
}
fdids = append(fdids, fdid{c.fd, c.id, c.opening, c.laddr, c.raddr, c.lnidx})
}
sort.Slice(fdids, func(i, j int) bool {
return fdids[j].id < fdids[i].id
})
for _, fdid := range fdids {
if fdid.fd != 0 {
syscall.Close(fdid.fd)
}
if fdid.opening {
if events.Opened != nil {
unlock()
events.Opened(fdid.id, Info{
Closing: true,
AddrIndex: fdid.lnidx,
LocalAddr: fdid.laddr,
RemoteAddr: fdid.raddr,
})
lock()
}
}
if events.Closed != nil {
unlock()
events.Closed(fdid.id, nil)
lock()
}
}
for _, c := range udpconn {
if events.Closed != nil {
unlock()
events.Closed(c.id, nil)
lock()
}
}
syscall.Close(p)
fdconn = nil
idconn = nil
udpconn = nil
unlock()
}()
var rsa syscall.Sockaddr
var sa6 syscall.SockaddrInet6
var detached []int
var packet [0xFFFF]byte
var evs = internal.MakeEvents(64)
nextTicker := time.Now()
for {
delay := nextTicker.Sub(time.Now())
if delay < 0 {
delay = 0
} else if delay > time.Second/4 {
delay = time.Second / 4
}
pn, err := internal.Wait(p, evs, delay)
if err != nil && err != syscall.EINTR {
return err
}
remain := nextTicker.Sub(time.Now())
if remain < 0 {
var tickerDelay time.Duration
var action Action
if events.Tick != nil {
tickerDelay, action = events.Tick()
if action == Shutdown {
return nil
}
} else {
tickerDelay = time.Hour
}
nextTicker = time.Now().Add(tickerDelay + remain)
}
// check for dial connection timeouts
if timeoutqueue.Len() > 0 {
var count int
now := time.Now()
for {
v := timeoutqueue.Peek()
if v == nil {
break
}
c := v.(*unixConn)
if now.After(v.Timeout()) {
timeoutqueue.Pop()
lock()
if _, ok := idconn[c.id]; ok && c.opening {
delete(idconn, c.id)
delete(fdconn, c.fd)
unlock()
filladdrs(c)
syscall.Close(c.fd)
if events.Opened != nil {
events.Opened(c.id, Info{
Closing: true,
AddrIndex: c.lnidx,
LocalAddr: c.laddr,
RemoteAddr: c.raddr,
})
}
if events.Closed != nil {
if c.dialerr != nil {
events.Closed(c.id, c.dialerr)
} else {
events.Closed(c.id, syscall.ETIMEDOUT)
}
}
count++
} else {
unlock()
}
} else {
break
}
}
if count > 0 {
// invalidate the current events and wait for more
continue
}
}
detached = detached[:0]
lock()
for i := 0; i < pn; i++ {
var in []byte
var sa syscall.Sockaddr
var c *unixConn
var nfd int
var n int
var out []byte
var ln *listener
var lnidx int
var fd = internal.GetFD(evs, i)
for lnidx, ln = range lns {
if fd == ln.fd {
if ln.pconn != nil {
goto udpread
}
goto accept
}
}
ln = nil
c = fdconn[fd]
if c == nil {
var found bool
for _, dfd := range detached {
if dfd == fd {
found = true
break
}
}
if !found {
syscall.Close(fd)
}
goto next
}
if c.opening {
goto opened
}
goto read
accept:
nfd, rsa, err = syscall.Accept(fd)
if err != nil {
goto next
}
if err = syscall.SetNonblock(nfd, true); err != nil {
goto fail
}
id++
c = &unixConn{id: id, fd: nfd,
opening: true,
lnidx: lnidx,
raddr: sockaddrToAddr(rsa),
}
// we have a remote address but the local address yet.
if err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
fdconn[nfd] = c
idconn[id] = c
goto next
opened:
filladdrs(c)
if err = internal.AddRead(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
if events.Opened != nil {
unlock()
out, c.opts, c.action = events.Opened(c.id, Info{
AddrIndex: lnidx,
LocalAddr: c.laddr,
RemoteAddr: c.raddr,
})
lock()
if c.opts.TCPKeepAlive > 0 {
internal.SetKeepAlive(c.fd, int(c.opts.TCPKeepAlive/time.Second))
}
if len(out) > 0 {
c.outbuf = append(c.outbuf, out...)
}
}
if c.opening {
c.opening = false
goto next
}
goto write
udpread:
n, sa, err = syscall.Recvfrom(fd, packet[:], 0)
if err != nil || n == 0 {
goto next
}
switch sa := sa.(type) {
case *syscall.SockaddrInet4:
sa6.ZoneId = 0
sa6.Port = sa.Port
for i := 0; i < 12; i++ {
sa6.Addr[i] = 0
}
sa6.Addr[12] = sa.Addr[0]
sa6.Addr[13] = sa.Addr[1]
sa6.Addr[14] = sa.Addr[2]
sa6.Addr[15] = sa.Addr[3]
case *syscall.SockaddrInet6:
sa6 = *sa
}
c = udpconn[sa6]
if c == nil {
id++
c = &unixConn{id: id,
lnidx: lnidx,
laddr: ln.lnaddr,
raddr: sockaddrToAddr(sa),
}
udpconn[sa6] = c
if events.Opened != nil {
unlock()
out, _, c.action = events.Opened(c.id, Info{AddrIndex: c.lnidx, LocalAddr: c.laddr, RemoteAddr: c.raddr})
lock()
if len(out) > 0 {
if events.Prewrite != nil {
unlock()
action := events.Prewrite(id, len(out))
lock()
if action == Shutdown {
c.action = action
}
}
syscall.Sendto(fd, out, 0, sa)
if events.Postwrite != nil {
unlock()
action := events.Postwrite(id, len(out), 0)
lock()
if action == Shutdown {
c.action = action
}
}
}
}
}
if c.action == None {
if events.Data != nil {
if c.opts.ReuseInputBuffer {
in = packet[:n]
} else {
in = append([]byte{}, packet[:n]...)
}
unlock()
out, c.action = events.Data(c.id, in)
lock()
if len(out) > 0 {
if events.Prewrite != nil {
unlock()
action := events.Prewrite(id, len(out))
lock()
if action == Shutdown {
c.action = action
}
}
syscall.Sendto(fd, out, 0, sa)
if events.Postwrite != nil {
unlock()
action := events.Postwrite(id, len(out), 0)
lock()
if action == Shutdown {
c.action = action
}
}
}
}
}
switch c.action {
case Close, Detach:
delete(udpconn, sa6)
if events.Closed != nil {
unlock()
action := events.Closed(id, nil)
lock()
if action == Shutdown {
c.action = action
}
}
}
if c.action == Shutdown {
err = nil
goto fail
}
goto next
read:
if c.action != None {
goto write
}
if c.wake {
c.wake = false
} else {
n, err = c.Read(packet[:])
if n == 0 || err != nil {
if err == syscall.EAGAIN {
goto write
}
c.err = err
goto close
}
if c.opts.ReuseInputBuffer {
in = packet[:n]
} else {
in = append([]byte{}, packet[:n]...)
}
}
if events.Data != nil {
unlock()
out, c.action = events.Data(c.id, in)
lock()
}
if len(out) > 0 {
c.outbuf = append(c.outbuf, out...)
}
goto write
write:
if len(c.outbuf)-c.outpos > 0 {
if events.Prewrite != nil {
unlock()
action := events.Prewrite(c.id, len(c.outbuf[c.outpos:]))
lock()
if action == Shutdown {
c.action = Shutdown
}
}
n, err = c.Write(c.outbuf[c.outpos:])
if events.Postwrite != nil {
amount := n
if amount < 0 {
amount = 0
}
unlock()
action := events.Postwrite(c.id, amount, len(c.outbuf)-c.outpos-amount)
lock()
if action == Shutdown {
c.action = Shutdown
}
}
if n == 0 || err != nil {
if c.action == Shutdown {
goto close
}
if err == syscall.EAGAIN {
if err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
goto next
}
c.err = err
goto close
}
c.outpos += n
if len(c.outbuf)-c.outpos == 0 {
c.outpos = 0
c.outbuf = c.outbuf[:0]
}
}
if c.action == Shutdown {
goto close
}
if len(c.outbuf)-c.outpos == 0 {
if !c.wake {
if err = internal.DelWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
}
if c.action != None {
goto close
}
} else {
if err = internal.AddWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
}
goto next
close:
delete(fdconn, c.fd)
delete(idconn, c.id)
if c.action == Detach {
if events.Detached != nil {
if err = internal.DelRead(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
if err = internal.DelWrite(p, c.fd, &c.readon, &c.writeon); err != nil {
goto fail
}
detached = append(detached, c.fd)
c.detached = true
if len(c.outbuf)-c.outpos > 0 {
c.outbuf = append(c.outbuf[:0], c.outbuf[c.outpos:]...)
} else {
c.outbuf = nil
}
c.outpos = 0
syscall.SetNonblock(c.fd, false)
unlock()
c.action = events.Detached(c.id, c)
lock()
if c.action == Shutdown {
goto fail
}
goto next
}
}
syscall.Close(c.fd)
if events.Closed != nil {
unlock()
action := events.Closed(c.id, c.err)
lock()
if action == Shutdown {
c.action = Shutdown
}
}
if c.action == Shutdown {
err = nil
goto fail
}
goto next
fail:
unlock()
return err
next:
}
unlock()
}
}
// resolve resolves an evio address and retuns a sockaddr for socket
// connection to external servers.
func resolve(addr string) (sa syscall.Sockaddr, err error) {
network, address, _, _ := parseAddr(addr)
var taddr net.Addr
switch network {
default:
return nil, net.UnknownNetworkError(network)
case "unix":
taddr = &net.UnixAddr{Net: "unix", Name: address}
case "tcp", "tcp4", "tcp6":
// use the stdlib resolver because it's good.
taddr, err = net.ResolveTCPAddr(network, address)
if err != nil {
return nil, err
}
}
switch taddr := taddr.(type) {
case *net.UnixAddr:
sa = &syscall.SockaddrUnix{Name: taddr.Name}
case *net.TCPAddr:
switch len(taddr.IP) {
case 0:
var sa4 syscall.SockaddrInet4
sa4.Port = taddr.Port
sa = &sa4
case 4:
var sa4 syscall.SockaddrInet4
copy(sa4.Addr[:], taddr.IP[:])
sa4.Port = taddr.Port
sa = &sa4
case 16:
var sa6 syscall.SockaddrInet6
copy(sa6.Addr[:], taddr.IP[:])
sa6.Port = taddr.Port
sa = &sa6
}
}
return sa, nil
}
func sockaddrToAddr(sa syscall.Sockaddr) net.Addr {
var a net.Addr
switch sa := sa.(type) {
case *syscall.SockaddrInet4:
a = &net.TCPAddr{
IP: append([]byte{}, sa.Addr[:]...),
Port: sa.Port,
}
case *syscall.SockaddrInet6:
var zone string
if sa.ZoneId != 0 {
if ifi, err := net.InterfaceByIndex(int(sa.ZoneId)); err == nil {
zone = ifi.Name
}
}
if zone == "" && sa.ZoneId != 0 {
}
a = &net.TCPAddr{
IP: append([]byte{}, sa.Addr[:]...),
Port: sa.Port,
Zone: zone,
}
case *syscall.SockaddrUnix:
a = &net.UnixAddr{Net: "unix", Name: sa.Name}
}
return a
}
func filladdrs(c *unixConn) {
if c.laddr == nil && c.fd != 0 {
sa, _ := syscall.Getsockname(c.fd)
c.laddr = sockaddrToAddr(sa)
}
if c.raddr == nil && c.fd != 0 {
sa, _ := syscall.Getpeername(c.fd)
c.raddr = sockaddrToAddr(sa)
}
}

View File

@ -1,534 +0,0 @@
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package evio
import (
"io"
"net"
"sort"
"sync"
"sync/atomic"
"time"
)
type netConn struct {
id int
wake int64
conn net.Conn
udpaddr net.Addr
detached bool
outbuf []byte
err error
}
func (c *netConn) Read(p []byte) (n int, err error) {
return c.conn.Read(p)
}
func (c *netConn) Write(p []byte) (n int, err error) {
if c.detached {
if len(c.outbuf) > 0 {
for len(c.outbuf) > 0 {
n, err = c.conn.Write(c.outbuf)
if n > 0 {
c.outbuf = c.outbuf[n:]
}
if err != nil {
return 0, err
}
}
c.outbuf = nil
}
var tn int
if len(p) > 0 {
for len(p) > 0 {
n, err = c.conn.Write(p)
if n > 0 {
p = p[n:]
tn += n
}
if err != nil {
return tn, err
}
}
p = nil
}
return tn, nil
}
return c.conn.Write(p)
}
func (c *netConn) Close() error {
return c.conn.Close()
}
// servenet uses the stdlib net package instead of syscalls.
func servenet(events Events, lns []*listener) error {
type udpaddr struct {
IP [16]byte
Port int
Zone string
}
var idc int64
var mu sync.Mutex
var cmu sync.Mutex
var idconn = make(map[int]*netConn)
var udpconn = make(map[udpaddr]*netConn)
var done int64
var shutdown func(err error)
// connloop handles an individual connection
connloop := func(id int, conn net.Conn, lnidx int, ln net.Listener) {
var closed bool
defer func() {
if !closed {
conn.Close()
}
}()
var packet [0xFFFF]byte
var cout []byte
var caction Action
c := &netConn{id: id, conn: conn}
cmu.Lock()
idconn[id] = c
cmu.Unlock()
if events.Opened != nil {
var out []byte
var opts Options
var action Action
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
out, opts, action = events.Opened(id, Info{
AddrIndex: lnidx,
LocalAddr: conn.LocalAddr(),
RemoteAddr: conn.RemoteAddr(),
})
}
mu.Unlock()
if opts.TCPKeepAlive > 0 {
if conn, ok := conn.(*net.TCPConn); ok {
conn.SetKeepAlive(true)
conn.SetKeepAlivePeriod(opts.TCPKeepAlive)
}
}
if len(out) > 0 {
cout = append(cout, out...)
}
caction = action
}
for {
var n int
var err error
var out []byte
var action Action
if caction != None {
goto write
}
if len(cout) > 0 || atomic.LoadInt64(&c.wake) != 0 {
conn.SetReadDeadline(time.Now().Add(time.Microsecond))
} else {
conn.SetReadDeadline(time.Now().Add(time.Second))
}
n, err = c.Read(packet[:])
if err != nil && !istimeout(err) {
if err != io.EOF {
c.err = err
}
goto close
}
if n > 0 {
if events.Data != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
out, action = events.Data(id, append([]byte{}, packet[:n]...))
}
mu.Unlock()
}
} else if atomic.LoadInt64(&c.wake) != 0 {
atomic.StoreInt64(&c.wake, 0)
if events.Data != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
out, action = events.Data(id, nil)
}
mu.Unlock()
}
}
if len(out) > 0 {
cout = append(cout, out...)
}
caction = action
goto write
write:
if len(cout) > 0 {
if events.Prewrite != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
action = events.Prewrite(id, len(cout))
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
conn.SetWriteDeadline(time.Now().Add(time.Microsecond))
n, err := c.Write(cout)
if err != nil && !istimeout(err) {
if err != io.EOF {
c.err = err
}
goto close
}
cout = cout[n:]
if len(cout) == 0 {
cout = nil
}
if events.Postwrite != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
action = events.Postwrite(id, n, len(cout))
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
}
if caction == Shutdown {
goto close
}
if len(cout) == 0 {
if caction != None {
goto close
}
}
continue
close:
cmu.Lock()
delete(idconn, c.id)
cmu.Unlock()
mu.Lock()
if atomic.LoadInt64(&done) != 0 {
mu.Unlock()
return
}
mu.Unlock()
if caction == Detach {
if events.Detached != nil {
if len(cout) > 0 {
c.outbuf = cout
}
c.detached = true
conn.SetDeadline(time.Time{})
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
caction = events.Detached(c.id, c)
}
mu.Unlock()
closed = true
if caction == Shutdown {
goto fail
}
return
}
}
conn.Close()
if events.Closed != nil {
var action Action
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
action = events.Closed(c.id, c.err)
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
closed = true
if caction == Shutdown {
goto fail
}
return
fail:
shutdown(nil)
return
}
}
ctx := Server{
Wake: func(id int) bool {
cmu.Lock()
c := idconn[id]
cmu.Unlock()
if c == nil {
return false
}
atomic.StoreInt64(&c.wake, 1)
// force a quick wakeup
c.conn.SetDeadline(time.Time{}.Add(1))
return true
},
Dial: func(addr string, timeout time.Duration) int {
if atomic.LoadInt64(&done) != 0 {
return 0
}
id := int(atomic.AddInt64(&idc, 1))
go func() {
network, address, _, _ := parseAddr(addr)
var conn net.Conn
var err error
if timeout > 0 {
conn, err = net.DialTimeout(network, address, timeout)
} else {
conn, err = net.Dial(network, address)
}
if err != nil {
if events.Opened != nil {
mu.Lock()
_, _, action := events.Opened(id, Info{Closing: true, AddrIndex: -1})
mu.Unlock()
if action == Shutdown {
shutdown(nil)
return
}
}
if events.Closed != nil {
mu.Lock()
action := events.Closed(id, err)
mu.Unlock()
if action == Shutdown {
shutdown(nil)
return
}
}
return
}
go connloop(id, conn, -1, nil)
}()
return id
},
}
var swg sync.WaitGroup
swg.Add(1)
var ferr error
shutdown = func(err error) {
mu.Lock()
if atomic.LoadInt64(&done) != 0 {
mu.Unlock()
return
}
defer swg.Done()
atomic.StoreInt64(&done, 1)
ferr = err
for _, ln := range lns {
if ln.pconn != nil {
ln.pconn.Close()
} else {
ln.ln.Close()
}
}
type connid struct {
conn net.Conn
id int
}
var connids []connid
var udpids []int
cmu.Lock()
for id, conn := range idconn {
connids = append(connids, connid{conn.conn, id})
}
for _, c := range udpconn {
udpids = append(udpids, c.id)
}
idconn = make(map[int]*netConn)
udpconn = make(map[udpaddr]*netConn)
cmu.Unlock()
mu.Unlock()
sort.Slice(connids, func(i, j int) bool {
return connids[j].id < connids[i].id
})
for _, connid := range connids {
connid.conn.Close()
if events.Closed != nil {
mu.Lock()
events.Closed(connid.id, nil)
mu.Unlock()
}
}
for _, id := range udpids {
if events.Closed != nil {
mu.Lock()
events.Closed(id, nil)
mu.Unlock()
}
}
}
ctx.Addrs = make([]net.Addr, len(lns))
for i, ln := range lns {
ctx.Addrs[i] = ln.lnaddr
}
if events.Serving != nil {
if events.Serving(ctx) == Shutdown {
return nil
}
}
var lwg sync.WaitGroup
lwg.Add(len(lns))
for i, ln := range lns {
if ln.pconn != nil {
go func(lnidx int, pconn net.PacketConn) {
defer lwg.Done()
var packet [0xFFFF]byte
for {
n, addr, err := pconn.ReadFrom(packet[:])
if err != nil {
if err == io.EOF {
shutdown(nil)
} else {
shutdown(err)
}
return
}
var uaddr udpaddr
switch addr := addr.(type) {
case *net.TCPAddr:
copy(uaddr.IP[16-len(addr.IP):], addr.IP)
uaddr.Zone = addr.Zone
uaddr.Port = addr.Port
}
var out []byte
var action Action
var c *netConn
mu.Lock()
c = udpconn[uaddr]
mu.Unlock()
if c == nil {
id := int(atomic.AddInt64(&idc, 1))
c = &netConn{id: id, udpaddr: addr}
mu.Lock()
udpconn[uaddr] = c
mu.Unlock()
if events.Opened != nil {
mu.Lock()
out, _, action = events.Opened(c.id, Info{AddrIndex: lnidx, LocalAddr: pconn.LocalAddr(), RemoteAddr: addr})
mu.Unlock()
if len(out) > 0 {
if events.Prewrite != nil {
mu.Lock()
action2 := events.Prewrite(c.id, len(out))
mu.Unlock()
if action2 == Shutdown {
action = action2
}
}
pconn.WriteTo(out, addr)
if events.Prewrite != nil {
mu.Lock()
action2 := events.Postwrite(c.id, len(out), 0)
mu.Unlock()
if action2 == Shutdown {
action = action2
}
}
}
}
}
if action == None {
if events.Data != nil {
mu.Lock()
out, action = events.Data(c.id, append([]byte{}, packet[:n]...))
mu.Unlock()
if len(out) > 0 {
if events.Prewrite != nil {
mu.Lock()
action2 := events.Prewrite(c.id, len(out))
mu.Unlock()
if action2 == Shutdown {
action = action2
}
}
pconn.WriteTo(out, addr)
if events.Prewrite != nil {
mu.Lock()
action2 := events.Postwrite(c.id, len(out), 0)
mu.Unlock()
if action2 == Shutdown {
action = action2
}
}
}
}
}
switch action {
case Close, Detach:
mu.Lock()
delete(udpconn, uaddr)
if events.Closed != nil {
action = events.Closed(c.id, nil)
}
mu.Unlock()
}
if action == Shutdown {
shutdown(nil)
return
}
}
}(i, ln.pconn)
} else {
go func(lnidx int, ln net.Listener) {
defer lwg.Done()
for {
conn, err := ln.Accept()
if err != nil {
if err == io.EOF {
shutdown(nil)
} else {
shutdown(err)
}
return
}
id := int(atomic.AddInt64(&idc, 1))
go connloop(id, conn, lnidx, ln)
}
}(i, ln.ln)
}
}
go func() {
for {
mu.Lock()
if atomic.LoadInt64(&done) != 0 {
mu.Unlock()
break
}
mu.Unlock()
var delay time.Duration
var action Action
mu.Lock()
if events.Tick != nil {
if atomic.LoadInt64(&done) == 0 {
delay, action = events.Tick()
}
} else {
mu.Unlock()
break
}
mu.Unlock()
if action == Shutdown {
shutdown(nil)
return
}
time.Sleep(delay)
}
}()
lwg.Wait() // wait for listeners
swg.Wait() // wait for shutdown
return ferr
}
func istimeout(err error) bool {
if err, ok := err.(net.Error); ok && err.Timeout() {
return true
}
return false
}

View File

@ -1,4 +1,4 @@
// Copyright 2017 Joshua J Baker. All rights reserved.
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
@ -6,7 +6,11 @@
package evio
import "os"
import (
"errors"
"net"
"os"
)
func (ln *listener) close() {
if ln.ln != nil {
@ -20,10 +24,18 @@ func (ln *listener) close() {
}
}
func (ln *listener) system(opts map[string]string) error {
func (ln *listener) system() error {
return nil
}
func serve(events Events, lns []*listener) error {
return servenet(events, lns)
func serve(events Events, listeners []*listener) error {
return servenet(events, listeners)
}
func reuseportListenPacket(proto, addr string) (l net.PacketConn, err error) {
return nil, errors.New("reuseport is not available")
}
func reuseportListen(proto, addr string) (l net.Listener, err error) {
return nil, errors.New("reuseport is not available")
}

439
evio_std.go Normal file
View File

@ -0,0 +1,439 @@
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package evio
import (
"errors"
"io"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
)
var errClosing = errors.New("closing")
var errCloseConns = errors.New("close conns")
type stdserver struct {
events Events // user events
loops []*stdloop // all the loops
lns []*listener // all the listeners
loopwg sync.WaitGroup // loop close waitgroup
lnwg sync.WaitGroup // listener close waitgroup
cond *sync.Cond // shutdown signaler
serr error // signal error
accepted uintptr // accept counter
}
type stdudpconn struct {
addrIndex int
localAddr net.Addr
remoteAddr net.Addr
in []byte
}
func (c *stdudpconn) Context() interface{} { return nil }
func (c *stdudpconn) SetContext(ctx interface{}) {}
func (c *stdudpconn) AddrIndex() int { return c.addrIndex }
func (c *stdudpconn) LocalAddr() net.Addr { return c.localAddr }
func (c *stdudpconn) RemoteAddr() net.Addr { return c.remoteAddr }
type stdloop struct {
idx int // loop index
ch chan interface{} // command channel
conns map[*stdconn]bool // track all the conns bound to this loop
}
type stdconn struct {
addrIndex int
localAddr net.Addr
remoteAddr net.Addr
conn net.Conn // original connection
ctx interface{} // user-defined context
loop *stdloop // owner loop
lnidx int // index of listener
donein []byte // extra data for done connection
done int32 // 0: attached, 1: closed, 2: detached
}
func (c *stdconn) Context() interface{} { return c.ctx }
func (c *stdconn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *stdconn) AddrIndex() int { return c.addrIndex }
func (c *stdconn) LocalAddr() net.Addr { return c.localAddr }
func (c *stdconn) RemoteAddr() net.Addr { return c.remoteAddr }
type stdin struct {
c *stdconn
in []byte
}
type stderr struct {
c *stdconn
err error
}
// waitForShutdown waits for a signal to shutdown
func (s *stdserver) waitForShutdown() error {
s.cond.L.Lock()
s.cond.Wait()
err := s.serr
s.cond.L.Unlock()
return err
}
// signalShutdown signals a shutdown an begins server closing
func (s *stdserver) signalShutdown(err error) {
s.cond.L.Lock()
s.serr = err
s.cond.Signal()
s.cond.L.Unlock()
}
func stdserve(events Events, listeners []*listener) error {
numLoops := events.NumLoops
if numLoops <= 0 {
if numLoops == 0 {
numLoops = 1
} else {
numLoops = runtime.NumCPU()
}
}
s := &stdserver{}
s.events = events
s.lns = listeners
s.cond = sync.NewCond(&sync.Mutex{})
//println("-- server starting")
if events.Serving != nil {
var svr Server
svr.NumLoops = numLoops
svr.Addrs = make([]net.Addr, len(listeners))
for i, ln := range listeners {
svr.Addrs[i] = ln.lnaddr
}
action := events.Serving(svr)
switch action {
case Shutdown:
return nil
}
}
for i := 0; i < numLoops; i++ {
s.loops = append(s.loops, &stdloop{
idx: i,
ch: make(chan interface{}),
conns: make(map[*stdconn]bool),
})
}
var ferr error
defer func() {
// wait on a signal for shutdown
ferr = s.waitForShutdown()
// notify all loops to close by closing all listeners
for _, l := range s.loops {
l.ch <- errClosing
}
// wait on all loops to main loop channel events
s.loopwg.Wait()
// shutdown all listeners
for i := 0; i < len(s.lns); i++ {
s.lns[i].close()
}
// wait on all listeners to complete
s.lnwg.Wait()
// close all connections
s.loopwg.Add(len(s.loops))
for _, l := range s.loops {
l.ch <- errCloseConns
}
s.loopwg.Wait()
}()
s.loopwg.Add(numLoops)
for i := 0; i < numLoops; i++ {
go stdloopRun(s, s.loops[i])
}
s.lnwg.Add(len(listeners))
for i := 0; i < len(listeners); i++ {
go stdlistenerRun(s, listeners[i], i)
}
return ferr
}
func stdlistenerRun(s *stdserver, ln *listener, lnidx int) {
var ferr error
defer func() {
s.signalShutdown(ferr)
s.lnwg.Done()
}()
var packet [0xFFFF]byte
for {
if ln.pconn != nil {
// udp
n, addr, err := ln.pconn.ReadFrom(packet[:])
if err != nil {
ferr = err
return
}
l := s.loops[int(atomic.AddUintptr(&s.accepted, 1))%len(s.loops)]
l.ch <- &stdudpconn{
addrIndex: lnidx,
localAddr: ln.lnaddr,
remoteAddr: addr,
in: append([]byte{}, packet[:n]...),
}
} else {
// tcp
conn, err := ln.ln.Accept()
if err != nil {
ferr = err
return
}
l := s.loops[int(atomic.AddUintptr(&s.accepted, 1))%len(s.loops)]
c := &stdconn{conn: conn, loop: l, lnidx: lnidx}
l.ch <- c
go func(c *stdconn) {
var packet [0xFFFF]byte
for {
n, err := c.conn.Read(packet[:])
if err != nil {
c.conn.SetReadDeadline(time.Time{})
l.ch <- &stderr{c, err}
return
}
l.ch <- &stdin{c, append([]byte{}, packet[:n]...)}
}
}(c)
}
}
}
func stdloopRun(s *stdserver, l *stdloop) {
var err error
tick := make(chan bool)
tock := make(chan time.Duration)
defer func() {
//fmt.Println("-- loop stopped --", l.idx)
if l.idx == 0 && s.events.Tick != nil {
close(tock)
go func() {
for range tick {
}
}()
}
s.signalShutdown(err)
s.loopwg.Done()
stdloopEgress(s, l)
s.loopwg.Done()
}()
if l.idx == 0 && s.events.Tick != nil {
go func() {
for {
tick <- true
delay, ok := <-tock
if !ok {
break
}
time.Sleep(delay)
}
}()
}
//fmt.Println("-- loop started --", l.idx)
for {
select {
case <-tick:
delay, action := s.events.Tick()
switch action {
case Shutdown:
err = errClosing
}
tock <- delay
case v := <-l.ch:
switch v := v.(type) {
case error:
err = v
case *stdconn:
err = stdloopAccept(s, l, v)
case *stdin:
err = stdloopRead(s, l, v.c, v.in)
case *stdudpconn:
err = stdloopReadUDP(s, l, v)
case *stderr:
err = stdloopError(s, l, v.c, v.err)
}
}
if err != nil {
return
}
}
}
func stdloopEgress(s *stdserver, l *stdloop) {
var closed bool
loop:
for v := range l.ch {
switch v := v.(type) {
case error:
if v == errCloseConns {
closed = true
for c := range l.conns {
stdloopClose(s, l, c)
}
}
case *stderr:
stdloopError(s, l, v.c, v.err)
}
if len(l.conns) == 0 && closed {
break loop
}
}
}
func stdloopError(s *stdserver, l *stdloop, c *stdconn, err error) error {
delete(l.conns, c)
closeEvent := true
switch atomic.LoadInt32(&c.done) {
case 0: // read error
c.conn.Close()
if err == io.EOF {
err = nil
}
case 1: // closed
c.conn.Close()
err = nil
case 2: // detached
err = nil
if s.events.Detached == nil {
c.conn.Close()
} else {
closeEvent = false
switch s.events.Detached(c, &stddetachedConn{c.conn, c.donein}) {
case Shutdown:
return errClosing
}
}
}
if closeEvent {
if s.events.Closed != nil {
switch s.events.Closed(c, err) {
case Shutdown:
return errClosing
}
}
}
return nil
}
type stddetachedConn struct {
conn net.Conn // original conn
in []byte // extra input data
}
func (c *stddetachedConn) Read(p []byte) (n int, err error) {
if len(c.in) > 0 {
if len(c.in) <= len(p) {
copy(p, c.in)
n = len(c.in)
c.in = nil
return
}
copy(p, c.in[:len(p)])
n = len(p)
c.in = c.in[n:]
return
}
return c.conn.Read(p)
}
func (c *stddetachedConn) Write(p []byte) (n int, err error) {
return c.conn.Write(p)
}
func (c *stddetachedConn) Close() error {
return c.conn.Close()
}
func stdloopRead(s *stdserver, l *stdloop, c *stdconn, in []byte) error {
if atomic.LoadInt32(&c.done) == 2 {
// should not ignore reads for detached connections
c.donein = append(c.donein, in...)
return nil
}
if s.events.Data != nil {
out, action := s.events.Data(c, in)
if len(out) > 0 {
c.conn.Write(out)
}
switch action {
case Shutdown:
return errClosing
case Detach:
return stdloopDetach(s, l, c)
case Close:
return stdloopClose(s, l, c)
}
}
return nil
}
func stdloopReadUDP(s *stdserver, l *stdloop, c *stdudpconn) error {
if s.events.Data != nil {
out, action := s.events.Data(c, c.in)
if len(out) > 0 {
s.lns[c.addrIndex].pconn.WriteTo(out, c.remoteAddr)
}
switch action {
case Shutdown:
return errClosing
}
}
return nil
}
func stdloopDetach(s *stdserver, l *stdloop, c *stdconn) error {
atomic.StoreInt32(&c.done, 2)
c.conn.SetReadDeadline(time.Now())
return nil
}
func stdloopClose(s *stdserver, l *stdloop, c *stdconn) error {
atomic.StoreInt32(&c.done, 1)
c.conn.SetReadDeadline(time.Now())
return nil
}
func stdloopAccept(s *stdserver, l *stdloop, c *stdconn) error {
l.conns[c] = true
c.addrIndex = c.lnidx
c.localAddr = s.lns[c.lnidx].lnaddr
c.remoteAddr = c.conn.RemoteAddr()
if s.events.Opened != nil {
out, opts, action := s.events.Opened(c)
if len(out) > 0 {
c.conn.Write(out)
}
if opts.TCPKeepAlive > 0 {
if c, ok := c.conn.(*net.TCPConn); ok {
c.SetKeepAlive(true)
c.SetKeepAlivePeriod(opts.TCPKeepAlive)
}
}
switch action {
case Shutdown:
return errClosing
case Detach:
return stdloopDetach(s, l, c)
case Close:
return stdloopClose(s, l, c)
}
}
return nil
}

View File

@ -25,68 +25,102 @@ func TestServe(t *testing.T) {
// the writes to the server will be random sizes. 0KB - 1MB.
// the server will echo back the data.
// waits for graceful connection closing.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
testServe("tcp", ":9990", false, 10)
}()
wg.Add(1)
go func() {
defer wg.Done()
testServe("tcp", ":9991", true, 10)
}()
wg.Add(1)
go func() {
defer wg.Done()
testServe("tcp-net", ":9992", false, 10)
}()
wg.Add(1)
go func() {
defer wg.Done()
testServe("tcp-net", ":9993", true, 10)
}()
wg.Wait()
t.Run("stdlib", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe("tcp-net", ":9997", false, 10, 1, Random)
})
t.Run("5-loop", func(t *testing.T) {
testServe("tcp-net", ":9998", false, 10, 5, LeastConnections)
})
t.Run("N-loop", func(t *testing.T) {
testServe("tcp-net", ":9999", false, 10, -1, RoundRobin)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe("tcp-net", ":9989", true, 10, 1, Random)
})
t.Run("5-loop", func(t *testing.T) {
testServe("tcp-net", ":9988", true, 10, 5, LeastConnections)
})
t.Run("N-loop", func(t *testing.T) {
testServe("tcp-net", ":9987", true, 10, -1, RoundRobin)
})
})
})
t.Run("poll", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe("tcp", ":9991", false, 10, 1, Random)
})
t.Run("5-loop", func(t *testing.T) {
testServe("tcp", ":9992", false, 10, 5, LeastConnections)
})
t.Run("N-loop", func(t *testing.T) {
testServe("tcp", ":9993", false, 10, -1, RoundRobin)
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
testServe("tcp", ":9994", true, 10, 1, Random)
})
t.Run("5-loop", func(t *testing.T) {
testServe("tcp", ":9995", true, 10, 5, LeastConnections)
})
t.Run("N-loop", func(t *testing.T) {
testServe("tcp", ":9996", true, 10, -1, RoundRobin)
})
})
})
}
func testServe(network, addr string, unix bool, nclients int) {
var started bool
var connected int
var disconnected int
func testServe(network, addr string, unix bool, nclients, nloops int, balance LoadBalance) {
var started int32
var connected int32
var disconnected int32
var events Events
events.LoadBalance = balance
events.NumLoops = nloops
events.Serving = func(srv Server) (action Action) {
return
}
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
connected++
events.Opened = func(c Conn) (out []byte, opts Options, action Action) {
c.SetContext(c)
atomic.AddInt32(&connected, 1)
out = []byte("sweetness\r\n")
opts.TCPKeepAlive = time.Minute * 5
if info.LocalAddr == nil {
if c.LocalAddr() == nil {
panic("nil local addr")
}
if info.RemoteAddr == nil {
if c.RemoteAddr() == nil {
panic("nil local addr")
}
return
}
events.Closed = func(id int, err error) (action Action) {
disconnected++
if connected == disconnected && disconnected == nclients {
events.Closed = func(c Conn, err error) (action Action) {
if c.Context() != c {
panic("invalid context")
}
atomic.AddInt32(&disconnected, 1)
if atomic.LoadInt32(&connected) == atomic.LoadInt32(&disconnected) &&
atomic.LoadInt32(&disconnected) == int32(nclients) {
action = Shutdown
}
return
}
events.Data = func(id int, in []byte) (out []byte, action Action) {
events.Data = func(c Conn, in []byte) (out []byte, action Action) {
out = in
return
}
events.Tick = func() (delay time.Duration, action Action) {
if !started {
if atomic.LoadInt32(&started) == 0 {
for i := 0; i < nclients; i++ {
go startClient(network, addr)
go startClient(network, addr, nloops)
}
started = true
atomic.StoreInt32(&started, 1)
}
delay = time.Second / 5
return
@ -105,7 +139,8 @@ func testServe(network, addr string, unix bool, nclients int) {
}
}
func startClient(network, addr string) {
func startClient(network, addr string, nloops int) {
onetwork := network
network = strings.Replace(network, "-net", "", -1)
rand.Seed(time.Now().UnixNano())
c, err := net.Dial(network, addr)
@ -121,7 +156,7 @@ func startClient(network, addr string) {
if string(msg) != "sweetness\r\n" {
panic("bad header")
}
duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 4
duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 8
start := time.Now()
for time.Since(start) < duration {
sz := rand.Int() % (1024 * 1024)
@ -132,115 +167,17 @@ func startClient(network, addr string) {
if _, err := c.Write(data); err != nil {
panic(err)
}
data2 := make([]byte, sz)
data2 := make([]byte, len(data))
if _, err := io.ReadFull(rd, data2); err != nil {
panic(err)
}
if string(data) != string(data2) {
fmt.Printf("mismatch: %d bytes\n", len(data))
fmt.Printf("mismatch %s/%d: %d vs %d bytes\n", onetwork, nloops, len(data), len(data2))
//panic("mismatch")
}
}
}
func TestWake(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
testWake("tcp", ":9991", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testWake("tcp", ":9992", true)
}()
wg.Add(1)
go func() {
defer wg.Done()
testWake("unix", "socket1", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testWake("unix", "socket2", true)
}()
wg.Wait()
}
func testWake(network, addr string, stdlib bool) {
var events Events
var srv Server
events.Serving = func(srvin Server) (action Action) {
srv = srvin
go func() {
conn, err := net.Dial(network, addr)
must(err)
defer conn.Close()
rd := bufio.NewReader(conn)
for i := 0; i < 1000; i++ {
line := []byte(fmt.Sprintf("msg%d\r\n", i))
conn.Write(line)
data, err := rd.ReadBytes('\n')
must(err)
if string(data) != string(line) {
panic("msg mismatch")
}
}
}()
return
}
var cid int
var cout []byte
var cin []byte
var cclosed bool
var cond = sync.NewCond(&sync.Mutex{})
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
cid = id
return
}
events.Closed = func(id int, err error) (action Action) {
action = Shutdown
cond.L.Lock()
cclosed = true
cond.Broadcast()
cond.L.Unlock()
return
}
go func() {
cond.L.Lock()
for !cclosed {
if len(cin) > 0 {
cout = append(cout, cin...)
cin = nil
}
if len(cout) > 0 {
srv.Wake(cid)
}
cond.Wait()
}
cond.L.Unlock()
}()
events.Data = func(id int, in []byte) (out []byte, action Action) {
if in == nil {
cond.L.Lock()
out = cout
cout = nil
cond.L.Unlock()
} else {
cond.L.Lock()
cin = append(cin, in...)
cond.Broadcast()
cond.L.Unlock()
}
return
}
if stdlib {
must(Serve(events, network+"-net://"+addr))
} else {
must(Serve(events, network+"://"+addr))
}
}
func must(err error) {
if err != nil {
panic(err)
@ -323,11 +260,11 @@ func testShutdown(network, addr string, stdlib bool) {
var count int
var clients int64
var N = 10
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
events.Opened = func(c Conn) (out []byte, opts Options, action Action) {
atomic.AddInt64(&clients, 1)
return
}
events.Closed = func(id int, err error) (action Action) {
events.Closed = func(c Conn, err error) (action Action) {
atomic.AddInt64(&clients, -1)
return
}
@ -365,28 +302,22 @@ func testShutdown(network, addr string, stdlib bool) {
}
func TestDetach(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
testDetach("tcp", ":9991", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testDetach("tcp", ":9992", true)
}()
wg.Add(1)
go func() {
defer wg.Done()
testDetach("unix", "socket1", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testDetach("unix", "socket2", true)
}()
wg.Wait()
t.Run("poll", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
testDetach("tcp", ":9991", false)
})
t.Run("unix", func(t *testing.T) {
testDetach("unix", "socket1", false)
})
})
t.Run("stdlib", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
testDetach("tcp", ":9992", true)
})
t.Run("unix", func(t *testing.T) {
testDetach("unix", "socket2", true)
})
})
}
func testDetach(network, addr string, stdlib bool) {
@ -400,9 +331,9 @@ func testDetach(network, addr string, stdlib bool) {
expected := []byte(string(rdat) + "--detached--" + string(rdat))
var cin []byte
var events Events
events.Data = func(id int, in []byte) (out []byte, action Action) {
events.Data = func(c Conn, in []byte) (out []byte, action Action) {
cin = append(cin, in...)
if len(cin) == len(expected) {
if len(cin) >= len(expected) {
if string(cin) != string(expected) {
panic("mismatch client -> server")
}
@ -411,49 +342,31 @@ func testDetach(network, addr string, stdlib bool) {
return
}
//expected := "detached\r\n"
var done int64
events.Detached = func(id int, conn io.ReadWriteCloser) (action Action) {
events.Detached = func(c Conn, conn io.ReadWriteCloser) (action Action) {
go func() {
p := make([]byte, len(expected))
defer conn.Close()
// detached connection
n, err := conn.Write([]byte(expected))
_, err := io.ReadFull(conn, p)
must(err)
if n != len(expected) {
panic("not enough data written")
}
conn.Write(expected)
}()
return
}
events.Serving = func(srv Server) (action Action) {
go func() {
// client connection
p := make([]byte, len(expected))
_ = expected
conn, err := net.Dial(network, addr)
must(err)
defer conn.Close()
_, err = conn.Write(expected)
conn.Write(expected)
_, err = io.ReadFull(conn, p)
must(err)
// read from the attached response
packet := make([]byte, len(expected))
time.Sleep(time.Second / 3)
_, err = io.ReadFull(conn, packet)
conn.Write(expected)
_, err = io.ReadFull(conn, p)
must(err)
if string(packet) != string(expected) {
panic("mismatch server -> client 1")
}
// read from the detached response
time.Sleep(time.Second / 3)
_, err = io.ReadFull(conn, packet)
must(err)
if string(packet) != string(expected) {
panic("mismatch server -> client 2")
}
time.Sleep(time.Second / 3)
_, err = conn.Read([]byte{0})
if err == nil {
panic("expected nil, got '" + err.Error() + "'")
}
atomic.StoreInt64(&done, 1)
}()
return
@ -507,237 +420,16 @@ func TestInputStream(t *testing.T) {
}
}
func TestPrePostwrite(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
testPrePostwrite("tcp", ":9991", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testPrePostwrite("tcp", ":9992", true)
}()
wg.Add(1)
go func() {
defer wg.Done()
testPrePostwrite("unix", "socket1", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testPrePostwrite("unix", "socket2", true)
}()
wg.Wait()
}
func testPrePostwrite(network, addr string, stdlib bool) {
var events Events
var srv Server
var packets int
var tout []byte
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
packets++
out = []byte(fmt.Sprintf("hello %d\r\n", packets))
tout = append(tout, out...)
srv.Wake(id)
return
}
events.Data = func(id int, in []byte) (out []byte, action Action) {
packets++
out = []byte(fmt.Sprintf("hello %d\r\n", packets))
tout = append(tout, out...)
srv.Wake(id)
return
}
events.Prewrite = func(id int, amount int) (action Action) {
if amount != len(tout) {
panic("invalid prewrite amount")
}
return
}
events.Postwrite = func(id int, amount, remaining int) (action Action) {
tout = tout[amount:]
if remaining != len(tout) {
panic("invalid postwrite amount")
}
return
}
events.Closed = func(id int, err error) (action Action) {
action = Shutdown
return
}
events.Serving = func(srvin Server) (action Action) {
srv = srvin
go func() {
conn, err := net.Dial(network, addr)
must(err)
defer conn.Close()
rd := bufio.NewReader(conn)
for i := 0; i < 1000; i++ {
line, err := rd.ReadBytes('\n')
must(err)
ex := fmt.Sprintf("hello %d\r\n", i+1)
if string(line) != ex {
panic(fmt.Sprintf("expected '%v', got '%v'", ex, line))
}
}
}()
return
}
if stdlib {
must(Serve(events, network+"-net://"+addr))
} else {
must(Serve(events, network+"://"+addr))
}
}
func TestTranslate(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
testTranslate("tcp", ":9991", "passthrough", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testTranslate("tcp", ":9992", "passthrough", true)
}()
wg.Add(1)
go func() {
defer wg.Done()
testTranslate("unix", "socket1", "passthrough", false)
}()
wg.Add(1)
go func() {
defer wg.Done()
testTranslate("unix", "socket2", "passthrough", true)
}()
wg.Wait()
}
func testTranslate(network, addr string, kind string, stdlib bool) {
var events Events
events.Data = func(id int, in []byte) (out []byte, action Action) {
out = in
return
}
events.Closed = func(id int, err error) (action Action) {
action = Shutdown
return
}
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
out = []byte("sweetness\r\n")
return
}
events.Serving = func(srv Server) (action Action) {
go func() {
conn, err := net.Dial(network, addr)
must(err)
defer conn.Close()
line := "sweetness\r\n"
packet := make([]byte, len(line))
n, err := io.ReadFull(conn, packet)
must(err)
if n != len(line) {
panic("invalid amount")
}
if string(packet) != string(line) {
panic(fmt.Sprintf("expected '%v', got '%v'\n", line, packet))
}
for i := 0; i < 100; i++ {
line := fmt.Sprintf("hello %d\r\n", i)
n, err := conn.Write([]byte(line))
must(err)
if n != len(line) {
panic("invalid amount")
}
packet := make([]byte, len(line))
n, err = io.ReadFull(conn, packet)
must(err)
if n != len(line) {
panic("invalid amount")
}
if string(packet) != string(line) {
panic(fmt.Sprintf("expected '%v', got '%v'\n", line, packet))
}
}
}()
return
}
tevents := Translate(events,
func(id int, info Info) bool {
return true
},
func(id int, rw io.ReadWriter) io.ReadWriter {
switch kind {
case "passthrough":
return rw
}
panic("invalid kind")
},
)
if stdlib {
must(Serve(tevents, network+"-net://"+addr))
} else {
must(Serve(tevents, network+"://"+addr))
}
// test with no shoulds
tevents = Translate(events,
func(id int, info Info) bool {
return false
},
func(id int, rw io.ReadWriter) io.ReadWriter {
return rw
},
)
if stdlib {
must(Serve(tevents, network+"-net://"+addr))
} else {
must(Serve(tevents, network+"://"+addr))
}
}
// func TestVariousAddr(t *testing.T) {
// var events Events
// var kind string
// events.Serving = func(wake func(id int) bool, addrs []net.Addr) (action Action) {
// addr := addrs[0].(*net.TCPAddr)
// if (kind == "tcp4" && len(addr.IP) != 4) || (kind == "tcp6" && len(addr.IP) != 16) {
// println(len(addr.IP))
// panic("invalid ip")
// }
// go func(kind string) {
// conn, err := net.Dial(kind, ":9991")
// must(err)
// defer conn.Close()
// }(kind)
// return
// }
// events.Closed = func(id int, err error) (action Action) {
// return Shutdown
// }
// kind = "tcp4"
// must(Serve(events, "tcp4://:9991"))
// kind = "tcp6"
// must(Serve(events, "tcp6://:9991"))
// }
func TestReuseInputBuffer(t *testing.T) {
reuses := []bool{true, false}
for _, reuse := range reuses {
var events Events
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
events.Opened = func(c Conn) (out []byte, opts Options, action Action) {
opts.ReuseInputBuffer = reuse
return
}
var prev []byte
events.Data = func(id int, in []byte) (out []byte, action Action) {
events.Data = func(c Conn, in []byte) (out []byte, action Action) {
if prev == nil {
prev = in
} else {
@ -764,3 +456,23 @@ func TestReuseInputBuffer(t *testing.T) {
}
}
func TestReuseport(t *testing.T) {
var events Events
events.Serving = func(s Server) (action Action) {
return Shutdown
}
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
var t = "1"
if i%2 == 0 {
t = "true"
}
go func(t string) {
defer wg.Done()
must(Serve(events, "tcp://:9991?reuseport="+t))
}(t)
}
wg.Wait()
}

View File

@ -1,262 +0,0 @@
// Copyright 2017 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package evio
import (
"io"
"net"
"sync"
"time"
)
type tconn struct {
cond [2]*sync.Cond // stream locks
closed [2]bool // init to -1. when it reaches zero we're closed
prebuf [2][]byte // buffers before translation
postbuf [2][]byte // buffers after translate
rd [2]io.ReadCloser // reader pipes
wr [2]io.Writer // writer pipes
mu sync.Mutex // only for the error
action Action // the last known action
err error // the final error if any
}
func (c *tconn) write(st int, b []byte) {
c.cond[st].L.Lock()
c.prebuf[st] = append(c.prebuf[st], b...)
c.cond[st].Broadcast()
c.cond[st].L.Unlock()
}
func (c *tconn) read(st int) []byte {
c.cond[st].L.Lock()
buf := c.postbuf[st]
c.postbuf[st] = nil
c.cond[st].L.Unlock()
return buf
}
func (c *tconn) Read(p []byte) (n int, err error) { return c.rd[0].Read(p) }
func (c *tconn) Write(p []byte) (n int, err error) { return c.wr[1].Write(p) }
// nopConn just wraps a io.ReadWriter and makes it into a net.Conn.
type nopConn struct{ io.ReadWriter }
func (c *nopConn) Read(p []byte) (n int, err error) { return c.ReadWriter.Read(p) }
func (c *nopConn) Write(p []byte) (n int, err error) { return c.ReadWriter.Write(p) }
func (c *nopConn) LocalAddr() net.Addr { return nil }
func (c *nopConn) RemoteAddr() net.Addr { return nil }
func (c *nopConn) SetDeadline(deadline time.Time) error { return nil }
func (c *nopConn) SetWriteDeadline(deadline time.Time) error { return nil }
func (c *nopConn) SetReadDeadline(deadline time.Time) error { return nil }
func (c *nopConn) Close() error { return nil }
// NopConn returns a net.Conn with a no-op LocalAddr, RemoteAddr,
// SetDeadline, SetWriteDeadline, SetReadDeadline, and Close methods wrapping
// the provided ReadWriter rw.
func NopConn(rw io.ReadWriter) net.Conn {
return &nopConn{rw}
}
// Translate provides a utility for performing byte level translation on the
// input and output streams for a connection. This is useful for things like
// compression, encryption, TLS, etc. The function wraps existing events and
// returns new events that manage the translation. The `should` parameter is
// an optional function that can be used to ignore or accept the translation
// for a specific connection. The `translate` parameter is a function that
// provides a ReadWriter for each new connection and returns a ReadWriter
// that performs the actual translation.
func Translate(
events Events,
should func(id int, info Info) bool,
translate func(id int, rd io.ReadWriter) io.ReadWriter,
) Events {
tevents := events
var ctx Server
var mu sync.Mutex
idc := make(map[int]*tconn)
get := func(id int) *tconn {
mu.Lock()
c := idc[id]
mu.Unlock()
return c
}
create := func(id int) *tconn {
mu.Lock()
c := &tconn{
cond: [2]*sync.Cond{
sync.NewCond(&sync.Mutex{}),
sync.NewCond(&sync.Mutex{}),
},
}
idc[id] = c
mu.Unlock()
tc := translate(id, c)
for st := 0; st < 2; st++ {
c.rd[st], c.wr[st] = io.Pipe()
var rd io.Reader
var wr io.Writer
if st == 0 {
rd = tc
wr = c.wr[0]
} else {
rd = c.rd[1]
wr = tc
}
go func(st int, rd io.Reader, wr io.Writer) {
c.cond[st].L.Lock()
for {
if c.closed[st] {
break
}
if len(c.prebuf[st]) > 0 {
buf := c.prebuf[st]
c.prebuf[st] = nil
c.cond[st].L.Unlock()
n, err := wr.Write(buf)
if err != nil {
return
}
c.cond[st].L.Lock()
if n > 0 {
c.prebuf[st] = append(buf[n:], c.prebuf[st]...)
}
continue
}
c.cond[st].Wait()
}
c.cond[st].L.Unlock()
}(st, rd, wr)
go func(st int, wr io.Writer) {
var ferr error
defer func() {
if ferr != nil {
c.mu.Lock()
if c.err == nil {
c.err = ferr
}
c.mu.Unlock()
}
}()
var packet [2048]byte
for {
n, err := rd.Read(packet[:])
if err != nil {
if err != io.EOF && err != io.ErrClosedPipe {
ferr = err
}
return
}
c.cond[st].L.Lock()
c.postbuf[st] = append(c.postbuf[st], packet[:n]...)
c.cond[st].L.Unlock()
ctx.Wake(id)
}
}(st, wr)
}
return c
}
destroy := func(c *tconn, id int) error {
for st := 0; st < 2; st++ {
if rd, ok := c.rd[st].(io.Closer); ok {
rd.Close()
}
if wr, ok := c.wr[st].(io.Closer); ok {
wr.Close()
}
c.cond[st].L.Lock()
c.closed[st] = true
c.cond[st].Broadcast()
c.cond[st].L.Unlock()
}
mu.Lock()
delete(idc, id)
mu.Unlock()
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
tevents.Serving = func(ctxin Server) (action Action) {
ctx = ctxin
if events.Serving != nil {
action = events.Serving(ctx)
}
return
}
tevents.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
if should != nil && !should(id, info) {
if events.Opened != nil {
out, opts, action = events.Opened(id, info)
}
return
}
c := create(id)
if events.Opened != nil {
out, opts, c.action = events.Opened(id, info)
if len(out) > 0 {
c.write(1, out)
out = nil
ctx.Wake(id)
}
}
return
}
tevents.Closed = func(id int, err error) (action Action) {
c := get(id)
if c != nil {
ferr := destroy(c, id)
if err == nil {
err = ferr
}
}
if events.Closed != nil {
action = events.Closed(id, err)
}
return
}
tevents.Data = func(id int, in []byte) (out []byte, action Action) {
c := get(id)
if c == nil {
if events.Data != nil {
out, action = events.Data(id, in)
}
return
}
if in == nil {
// wake up
out = c.read(1)
if len(out) > 0 {
ctx.Wake(id)
return
}
if c.action != None {
return nil, c.action
}
in = c.read(0)
if len(in) > 0 {
if events.Data != nil {
out, c.action = events.Data(id, in)
if len(out) > 0 {
c.write(1, out)
out = nil
}
ctx.Wake(id)
}
return
}
} else if len(in) > 0 {
if c.action != None {
return nil, c.action
}
// accept new input data
c.write(0, in)
in = nil
}
return
}
return tevents
}

490
evio_unix.go Normal file
View File

@ -0,0 +1,490 @@
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
// +build darwin netbsd freebsd openbsd dragonfly linux
package evio
import (
"net"
"os"
"runtime"
"sync"
"sync/atomic"
"syscall"
"time"
reuseport "github.com/kavu/go_reuseport"
"github.com/tidwall/evio/internal"
)
type conn struct {
fd int // file descriptor
lnidx int // listener index in the server lns list
loopidx int // owner loop
out []byte // write buffer
sa syscall.Sockaddr // remote socket address
reuse bool // should reuse input buffer
opened bool // connection opened event fired
action Action // next user action
ctx interface{} // user-defined context
addrIndex int
localAddr net.Addr
remoteAddr net.Addr
}
func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) AddrIndex() int { return c.addrIndex }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr }
type server struct {
events Events // user events
loops []*loop // all the loops
lns []*listener // all the listeners
wg sync.WaitGroup // loop close waitgroup
cond *sync.Cond // shutdown signaler
balance LoadBalance // load balancing method
accepted uintptr // accept counter
tch chan time.Duration // ticker channel
//ticktm time.Time // next tick time
}
type loop struct {
idx int // loop index in the server loops list
poll *internal.Poll // epoll or kqueue
packet []byte // read packet buffer
fdconns map[int]*conn // loop connections fd -> conn
count int32 // connection count
}
// waitForShutdown waits for a signal to shutdown
func (s *server) waitForShutdown() {
s.cond.L.Lock()
s.cond.Wait()
s.cond.L.Unlock()
}
// signalShutdown signals a shutdown an begins server closing
func (s *server) signalShutdown() {
s.cond.L.Lock()
s.cond.Signal()
s.cond.L.Unlock()
}
func serve(events Events, listeners []*listener) error {
// figure out the correct number of loops/goroutines to use.
numLoops := events.NumLoops
if numLoops <= 0 {
if numLoops == 0 {
numLoops = 1
} else {
numLoops = runtime.NumCPU()
}
}
s := &server{}
s.events = events
s.lns = listeners
s.cond = sync.NewCond(&sync.Mutex{})
s.balance = events.LoadBalance
s.tch = make(chan time.Duration)
//println("-- server starting")
if s.events.Serving != nil {
var svr Server
svr.NumLoops = numLoops
svr.Addrs = make([]net.Addr, len(listeners))
for i, ln := range listeners {
svr.Addrs[i] = ln.lnaddr
}
action := s.events.Serving(svr)
switch action {
case None:
case Shutdown:
return nil
}
}
defer func() {
// wait on a signal for shutdown
s.waitForShutdown()
// notify all loops to close by closing all listeners
for _, l := range s.loops {
l.poll.Trigger(errClosing)
}
// wait on all loops to complete reading events
s.wg.Wait()
// close loops and all outstanding connections
for _, l := range s.loops {
for _, c := range l.fdconns {
loopCloseConn(s, l, c, nil)
}
l.poll.Close()
}
//println("-- server stopped")
}()
// create loops locally and bind the listeners.
for i := 0; i < numLoops; i++ {
l := &loop{
idx: i,
poll: internal.OpenPoll(),
packet: make([]byte, 0xFFFF),
fdconns: make(map[int]*conn),
}
for _, ln := range listeners {
l.poll.AddRead(ln.fd)
}
s.loops = append(s.loops, l)
}
// start loops in background
s.wg.Add(len(s.loops))
for _, l := range s.loops {
go loopRun(s, l)
}
return nil
}
func loopCloseConn(s *server, l *loop, c *conn, err error) error {
atomic.AddInt32(&l.count, -1)
delete(l.fdconns, c.fd)
syscall.Close(c.fd)
if s.events.Closed != nil {
switch s.events.Closed(c, err) {
case None:
case Shutdown:
return errClosing
}
}
return nil
}
func loopDetachConn(s *server, l *loop, c *conn, err error) error {
if s.events.Detached == nil {
return loopCloseConn(s, l, c, err)
}
l.poll.ModDetach(c.fd)
atomic.AddInt32(&l.count, -1)
delete(l.fdconns, c.fd)
if err := syscall.SetNonblock(c.fd, false); err != nil {
return err
}
switch s.events.Detached(c, &detachedConn{fd: c.fd}) {
case None:
case Shutdown:
return errClosing
}
return nil
}
func loopNote(s *server, l *loop, note interface{}) error {
var err error
switch v := note.(type) {
case time.Duration:
delay, action := s.events.Tick()
switch action {
case None:
case Shutdown:
err = errClosing
}
s.tch <- delay
case error: // shutdown
err = v
}
return err
}
func loopRun(s *server, l *loop) {
defer func() {
//fmt.Println("-- loop stopped --", l.idx)
s.signalShutdown()
s.wg.Done()
}()
if l.idx == 0 && s.events.Tick != nil {
go loopTicker(s, l)
}
//fmt.Println("-- loop started --", l.idx)
l.poll.Wait(func(fd int, note interface{}) error {
if fd == 0 {
return loopNote(s, l, note)
}
c := l.fdconns[fd]
switch {
case c == nil:
return loopAccept(s, l, fd)
case !c.opened:
return loopOpened(s, l, c)
case len(c.out) > 0:
return loopWrite(s, l, c)
case c.action != None:
return loopAction(s, l, c)
default:
return loopRead(s, l, c)
}
})
}
func loopTicker(s *server, l *loop) {
for {
if err := l.poll.Trigger(time.Duration(0)); err != nil {
break
}
time.Sleep(<-s.tch)
}
}
func loopAccept(s *server, l *loop, fd int) error {
for i, ln := range s.lns {
if ln.fd == fd {
if len(s.loops) > 1 {
switch s.balance {
case LeastConnections:
n := atomic.LoadInt32(&l.count)
for _, lp := range s.loops {
if lp.idx != l.idx {
if atomic.LoadInt32(&lp.count) < n {
return nil // do not accept
}
}
}
case RoundRobin:
if int(atomic.LoadUintptr(&s.accepted))%len(s.loops) != l.idx {
return nil // do not accept
}
atomic.AddUintptr(&s.accepted, 1)
}
}
if ln.pconn != nil {
return loopUDPRead(s, l, i, fd)
}
nfd, sa, err := syscall.Accept(fd)
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return err
}
if err := syscall.SetNonblock(nfd, true); err != nil {
return err
}
c := &conn{fd: nfd, sa: sa, lnidx: i}
l.fdconns[c.fd] = c
l.poll.AddReadWrite(c.fd)
atomic.AddInt32(&l.count, 1)
break
}
}
return nil
}
func loopUDPRead(s *server, l *loop, lnidx, fd int) error {
n, sa, err := syscall.Recvfrom(fd, l.packet, 0)
if err != nil || n == 0 {
return nil
}
if s.events.Data != nil {
var sa6 syscall.SockaddrInet6
switch sa := sa.(type) {
case *syscall.SockaddrInet4:
sa6.ZoneId = 0
sa6.Port = sa.Port
for i := 0; i < 12; i++ {
sa6.Addr[i] = 0
}
sa6.Addr[12] = sa.Addr[0]
sa6.Addr[13] = sa.Addr[1]
sa6.Addr[14] = sa.Addr[2]
sa6.Addr[15] = sa.Addr[3]
case *syscall.SockaddrInet6:
sa6 = *sa
}
c := &conn{}
c.addrIndex = lnidx
c.localAddr = s.lns[lnidx].lnaddr
c.remoteAddr = internal.SockaddrToAddr(&sa6)
in := append([]byte{}, l.packet[:n]...)
out, action := s.events.Data(c, in)
if len(out) > 0 {
syscall.Sendto(fd, out, 0, sa)
}
switch action {
case Shutdown:
return errClosing
}
}
return nil
}
func loopOpened(s *server, l *loop, c *conn) error {
c.opened = true
c.addrIndex = c.lnidx
c.localAddr = s.lns[c.lnidx].lnaddr
c.remoteAddr = internal.SockaddrToAddr(c.sa)
if s.events.Opened != nil {
out, opts, action := s.events.Opened(c)
if len(out) > 0 {
c.out = append([]byte{}, out...)
}
c.action = action
c.reuse = opts.ReuseInputBuffer
if opts.TCPKeepAlive > 0 {
if _, ok := s.lns[c.lnidx].ln.(*net.TCPListener); ok {
internal.SetKeepAlive(c.fd, int(opts.TCPKeepAlive/time.Second))
}
}
}
if len(c.out) == 0 && c.action == None {
l.poll.ModRead(c.fd)
}
return nil
}
func loopWrite(s *server, l *loop, c *conn) error {
n, err := syscall.Write(c.fd, c.out)
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return loopCloseConn(s, l, c, err)
}
if n == len(c.out) {
c.out = nil
} else {
c.out = c.out[n:]
}
if len(c.out) == 0 && c.action == None {
l.poll.ModRead(c.fd)
}
return nil
}
func loopAction(s *server, l *loop, c *conn) error {
switch c.action {
default:
c.action = None
case Close:
return loopCloseConn(s, l, c, nil)
case Shutdown:
return errClosing
case Detach:
return loopDetachConn(s, l, c, nil)
}
if len(c.out) == 0 && c.action == None {
l.poll.ModRead(c.fd)
}
return nil
}
func loopRead(s *server, l *loop, c *conn) error {
var in []byte
n, err := syscall.Read(c.fd, l.packet)
if n == 0 || err != nil {
if err == syscall.EAGAIN {
return nil
}
return loopCloseConn(s, l, c, err)
}
in = l.packet[:n]
if !c.reuse {
in = append([]byte{}, in...)
}
if s.events.Data != nil {
out, action := s.events.Data(c, in)
c.action = action
if len(out) > 0 {
c.out = append([]byte{}, out...)
}
}
if len(c.out) != 0 || c.action != None {
l.poll.ModReadWrite(c.fd)
}
return nil
}
type detachedConn struct {
fd int
}
func (c *detachedConn) Close() error {
err := syscall.Close(c.fd)
if err != nil {
return err
}
c.fd = -1
return nil
}
func (c *detachedConn) Read(p []byte) (n int, err error) {
return syscall.Read(c.fd, p)
}
func (c *detachedConn) Write(p []byte) (n int, err error) {
n = len(p)
for len(p) > 0 {
nn, err := syscall.Write(c.fd, p)
if err != nil {
return n, err
}
p = p[nn:]
}
return n, nil
}
func (ln *listener) close() {
if ln.fd != 0 {
syscall.Close(ln.fd)
}
if ln.f != nil {
ln.f.Close()
}
if ln.ln != nil {
ln.ln.Close()
}
if ln.pconn != nil {
ln.pconn.Close()
}
if ln.network == "unix" {
os.RemoveAll(ln.addr)
}
}
// system takes the net listener and detaches it from it's parent
// event loop, grabs the file descriptor, and makes it non-blocking.
func (ln *listener) system() error {
var err error
switch netln := ln.ln.(type) {
case nil:
switch pconn := ln.pconn.(type) {
case *net.UDPConn:
ln.f, err = pconn.File()
}
case *net.TCPListener:
ln.f, err = netln.File()
case *net.UnixListener:
ln.f, err = netln.File()
}
if err != nil {
ln.close()
return err
}
ln.fd = int(ln.f.Fd())
return syscall.SetNonblock(ln.fd, true)
}
func reuseportListenPacket(proto, addr string) (l net.PacketConn, err error) {
return reuseport.ListenPacket(proto, addr)
}
func reuseportListen(proto, addr string) (l net.Listener, err error) {
return reuseport.Listen(proto, addr)
}

View File

@ -1,37 +0,0 @@
# `evio examples`
## echo-server
Runs on port 5000
```
go run examples/echo-server/main.go
```
Connect with telnet and start entering text.
```
telnet localhost 5000
```
## http-server
Runs on port 8080
```
go run examples/http-server/main.go
```
Browse to http://localhost:8080.
All requests print `Hello World!`.
## redis-server
Runs on port 6380
```
go run examples/redis-server/main.go
```
- `GET`, `SET`, `DEL`, `FLUSHDB`, `QUIT`, `PING`, `ECHO`, `SHUTDOWN` commands.
- Compatible with the [redis-cli](https://redis.io/topics/rediscli) and [redis clients](https://redis.io/clients).

View File

@ -15,32 +15,33 @@ import (
func main() {
var port int
var loops int
var udp bool
var trace bool
var reuseport bool
var stdlib bool
flag.IntVar(&port, "port", 5000, "server port")
flag.BoolVar(&udp, "udp", false, "listen on udp")
flag.BoolVar(&reuseport, "reuseport", false, "reuseport (SO_REUSEPORT)")
flag.BoolVar(&trace, "trace", false, "print packets to console")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
flag.Parse()
var events evio.Events
events.NumLoops = loops
events.Serving = func(srv evio.Server) (action evio.Action) {
log.Printf("echo server started on port %d", port)
log.Printf("echo server started on port %d (loops: %d)", port, srv.NumLoops)
if reuseport {
log.Printf("reuseport")
}
if stdlib {
log.Printf("stdlib")
}
return
}
events.Opened = func(id int, info evio.Info) (out []byte, opts evio.Options, action evio.Action) {
// log.Printf("opened: %d: %+v", id, info)
return
}
events.Closed = func(id int, err error) (action evio.Action) {
// log.Printf("closed: %d", id)
return
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
if trace {
log.Printf("%s", strings.TrimSpace(string(in)))
}
@ -51,5 +52,8 @@ func main() {
if udp {
scheme = "udp"
}
if stdlib {
scheme += "-net"
}
log.Fatal(evio.Serve(events, fmt.Sprintf("%s://:%d?reuseport=%t", scheme, port, reuseport)))
}

View File

@ -1,26 +0,0 @@
-----BEGIN CERTIFICATE-----
MIICojCCAiqgAwIBAgIJAL9ZO0MfICMyMAkGByqGSM49BAEwWTELMAkGA1UEBhMC
QVUxEzARBgNVBAgTClNvbWUtU3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdp
dHMgUHR5IEx0ZDESMBAGA1UEAxMJbG9jYWxob3N0MB4XDTE3MTAzMTE4NTU1OFoX
DTI3MTAyOTE4NTU1OFowWTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3Rh
dGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAxMJ
bG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EEACIDYgAE5F+nXSPlox5HPeLy2U0/
Two7iBkjJi8Ak4l7ci4p4c+fheQQo16IRJ+G4K/yGYuW92zapMBy9/t7vvd9lkN1
fy5q6918zsoy6Oke+qYMHFils7f+wjo+SqdzgQG//4DZo4G+MIG7MB0GA1UdDgQW
BBRiYEEXU/1olC+NtGerCkxOXDe6rTCBiwYDVR0jBIGDMIGAgBRiYEEXU/1olC+N
tGerCkxOXDe6raFdpFswWTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUtU3Rh
dGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDESMBAGA1UEAxMJ
bG9jYWxob3N0ggkAv1k7Qx8gIzIwDAYDVR0TBAUwAwEB/zAJBgcqhkjOPQQBA2cA
MGQCMEAowFSCgYBQrwXWvvqe784MBbWYUlUQ5A0+YUxutOTa8sO73/YXGtOqB83N
hLGslwIwK1AmRzgLRSSxuYgdWmJSDIfGnA9tF+/XQTm/5gIO8mSSQnxyKn7p9Tkl
MUU79C4P
-----END CERTIFICATE-----
-----BEGIN EC PARAMETERS-----
BgUrgQQAIg==
-----END EC PARAMETERS-----
-----BEGIN EC PRIVATE KEY-----
MIGkAgEBBDA7hpUYjCV8msn5MPK9k51/xHU+nMXHE5hc+wwYmgawiq0wsYTRJcIY
XWN7CONOneegBwYFK4EEACKhZANiAATkX6ddI+WjHkc94vLZTT9PCjuIGSMmLwCT
iXtyLinhz5+F5BCjXohEn4bgr/IZi5b3bNqkwHL3+3u+932WQ3V/Lmrr3XzOyjLo
6R76pgwcWKWzt/7COj5Kp3OBAb//gNk=
-----END EC PRIVATE KEY-----

View File

@ -6,10 +6,8 @@ package main
import (
"bytes"
"crypto/tls"
"flag"
"fmt"
"io"
"log"
"os"
"strconv"
@ -28,26 +26,20 @@ type request struct {
remoteAddr string
}
type conn struct {
info evio.Info
is evio.InputStream
}
func main() {
var port int
var tlsport int
var tlspem string
var loops int
var aaaa bool
var noparse bool
var unixsocket string
var stdlib bool
flag.StringVar(&unixsocket, "unixsocket", "", "unix socket")
flag.IntVar(&port, "port", 8080, "server port")
flag.IntVar(&tlsport, "tlsport", 4443, "tls port")
flag.StringVar(&tlspem, "tlscert", "", "tls pem cert/key file")
flag.BoolVar(&aaaa, "aaaa", false, "aaaaa....")
flag.BoolVar(&noparse, "noparse", true, "do not parse requests")
flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.Parse()
if os.Getenv("NOPARSE") == "1" {
@ -61,13 +53,9 @@ func main() {
}
var events evio.Events
var conns = make(map[int]*conn)
events.Serving = func(server evio.Server) (action evio.Action) {
log.Printf("http server started on port %d", port)
if tlspem != "" {
log.Printf("https server started on port %d", tlsport)
}
events.NumLoops = loops
events.Serving = func(srv evio.Server) (action evio.Action) {
log.Printf("http server started on port %d (loops: %d)", port, srv.NumLoops)
if unixsocket != "" {
log.Printf("http server started at %s", unixsocket)
}
@ -77,29 +65,23 @@ func main() {
return
}
events.Opened = func(id int, info evio.Info) (out []byte, opts evio.Options, action evio.Action) {
conns[id] = &conn{info: info}
log.Printf("opened: %d: laddr: %v: raddr: %v", id, info.LocalAddr, info.RemoteAddr)
// println(info.LocalAddr.(*net.TCPAddr).Zone)
// fmt.Printf("%#v\n", info.LocalAddr)
// fmt.Printf("%#v\n", (&net.TCPAddr{IP: make([]byte, 16)}))
events.Opened = func(c evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
c.SetContext(&evio.InputStream{})
//log.Printf("opened: laddr: %v: raddr: %v", c.LocalAddr(), c.RemoteAddr())
return
}
events.Closed = func(id int, err error) (action evio.Action) {
c := conns[id]
log.Printf("closed: %d: %s: %s", id, c.info.LocalAddr.String(), c.info.RemoteAddr.String())
delete(conns, id)
events.Closed = func(c evio.Conn, err error) (action evio.Action) {
//log.Printf("closed: %s: %s", c.LocalAddr().String(), c.RemoteAddr().String())
return
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
if in == nil {
return
}
c := conns[id]
data := c.is.Begin(in)
is := c.Context().(*evio.InputStream)
data := is.Begin(in)
if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {
// for testing minimal single packet request -> response.
out = appendresp(nil, "200 OK", "", res)
@ -119,11 +101,11 @@ func main() {
break
}
// handle the request
req.remoteAddr = c.info.RemoteAddr.String()
req.remoteAddr = c.RemoteAddr().String()
out = appendhandle(out, &req)
data = leftover
}
c.is.End(data)
is.End(data)
return
}
var ssuf string
@ -132,31 +114,6 @@ func main() {
}
// We at least want the single http address.
addrs := []string{fmt.Sprintf("tcp"+ssuf+"://:%d", port)}
if tlspem != "" {
// load the cert and key pair from the concat'd pem file.
cer, err := tls.LoadX509KeyPair(tlspem, tlspem)
if err != nil {
log.Fatal(err)
}
config := &tls.Config{Certificates: []tls.Certificate{cer}}
// Update the address list to include https.
addrs = append(addrs, fmt.Sprintf("tcp"+ssuf+"://:%d", tlsport))
// TLS translate the events
events = evio.Translate(events,
func(id int, info evio.Info) bool {
// only translate for the second address.
return info.AddrIndex == 1
},
func(id int, rw io.ReadWriter) io.ReadWriter {
// Use the standard Go crypto/tls package and create a tls.Conn
// from the provided io.ReadWriter. Here we use the handy
// evio.NopConn utility to create a barebone net.Conn in order
// for the tls.Server to accept the connection.
return tls.Server(evio.NopConn(rw), config)
},
)
}
if unixsocket != "" {
addrs = append(addrs, fmt.Sprintf("unix"+ssuf+"://%s", unixsocket))
}

View File

@ -8,9 +8,8 @@ import (
"flag"
"fmt"
"log"
"strconv"
"strings"
"time"
"sync"
"github.com/tidwall/evio"
"github.com/tidwall/redcon"
@ -19,73 +18,54 @@ import (
type conn struct {
is evio.InputStream
addr string
wget bool
}
func main() {
var port int
var unixsocket string
var stdlib bool
var loops int
var balance string
flag.IntVar(&port, "port", 6380, "server port")
flag.IntVar(&loops, "loops", 0, "num loops")
flag.StringVar(&unixsocket, "unixsocket", "socket", "unix socket")
flag.StringVar(&balance, "balance", "random", "random, round-robin, least-connections")
flag.BoolVar(&stdlib, "stdlib", false, "use stdlib")
flag.Parse()
var srv evio.Server
var conns = make(map[int]*conn)
var mu sync.RWMutex
var keys = make(map[string]string)
var events evio.Events
events.Serving = func(srvin evio.Server) (action evio.Action) {
srv = srvin
log.Printf("redis server started on port %d", port)
switch balance {
default:
log.Fatalf("invalid -balance flag: '%v'", balance)
case "random":
events.LoadBalance = evio.Random
case "round-robin":
events.LoadBalance = evio.RoundRobin
case "least-connections":
events.LoadBalance = evio.LeastConnections
}
events.NumLoops = loops
events.Serving = func(srv evio.Server) (action evio.Action) {
log.Printf("redis server started on port %d (loops: %d)", port, srv.NumLoops)
if unixsocket != "" {
log.Printf("redis server started at %s", unixsocket)
log.Printf("redis server started at %s (loops: %d)", unixsocket, srv.NumLoops)
}
if stdlib {
log.Printf("stdlib")
}
return
}
wgetids := make(map[int]time.Time)
events.Opened = func(id int, info evio.Info) (out []byte, opts evio.Options, action evio.Action) {
c := &conn{}
if !wgetids[id].IsZero() {
delete(wgetids, id)
c.wget = true
}
conns[id] = c
if c.wget {
log.Printf("opened: %d, wget: %t, laddr: %v, laddr: %v", id, c.wget, info.LocalAddr, info.RemoteAddr)
}
if c.wget {
out = []byte("GET / HTTP/1.0\r\n\r\n")
}
events.Opened = func(ec evio.Conn) (out []byte, opts evio.Options, action evio.Action) {
ec.SetContext(&conn{})
return
}
events.Tick = func() (delay time.Duration, action evio.Action) {
now := time.Now()
for id, t := range wgetids {
if now.Sub(t) > time.Second {
srv.Wake(id)
}
}
delay = time.Second
events.Closed = func(ec evio.Conn, err error) (action evio.Action) {
return
}
events.Closed = func(id int, err error) (action evio.Action) {
c := conns[id]
if c.wget {
fmt.Printf("closed %d %v\n", id, err)
}
delete(conns, id)
return
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
c := conns[id]
if c.wget {
print(string(in))
return
}
events.Data = func(ec evio.Conn, in []byte) (out []byte, action evio.Action) {
c := ec.Context().(*conn)
data := c.is.Begin(in)
var n int
var complete bool
@ -106,19 +86,6 @@ func main() {
switch strings.ToUpper(string(args[0])) {
default:
out = redcon.AppendError(out, "ERR unknown command '"+string(args[0])+"'")
case "WGET":
if len(args) != 3 {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
} else {
n, _ := strconv.ParseInt(string(args[2]), 10, 63)
cid := srv.Dial("tcp://"+string(args[1]), time.Duration(n)*time.Second)
if cid == 0 {
out = redcon.AppendError(out, "failed to dial")
} else {
wgetids[cid] = time.Now()
out = redcon.AppendOK(out)
}
}
case "PING":
if len(args) > 2 {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
@ -136,7 +103,6 @@ func main() {
case "SHUTDOWN":
out = redcon.AppendString(out, "OK")
action = evio.Shutdown
case "QUIT":
out = redcon.AppendString(out, "OK")
action = evio.Close
@ -144,7 +110,10 @@ func main() {
if len(args) != 2 {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
} else {
val, ok := keys[string(args[1])]
key := string(args[1])
mu.Lock()
val, ok := keys[key]
mu.Unlock()
if !ok {
out = redcon.AppendNull(out)
} else {
@ -155,7 +124,10 @@ func main() {
if len(args) != 3 {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
} else {
keys[string(args[1])] = string(args[2])
key, val := string(args[1]), string(args[2])
mu.Lock()
keys[key] = val
mu.Unlock()
out = redcon.AppendString(out, "OK")
}
case "DEL":
@ -163,16 +135,20 @@ func main() {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
} else {
var n int
mu.Lock()
for i := 1; i < len(args); i++ {
if _, ok := keys[string(args[1])]; ok {
if _, ok := keys[string(args[i])]; ok {
n++
delete(keys, string(args[1]))
delete(keys, string(args[i]))
}
}
mu.Unlock()
out = redcon.AppendInt(out, int64(n))
}
case "FLUSHDB":
mu.Lock()
keys = make(map[string]string)
mu.Unlock()
out = redcon.AppendString(out, "OK")
}
}

View File

@ -8,76 +8,118 @@ package internal
import (
"syscall"
"time"
)
func AddRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if *readon {
return nil
}
*readon = true
}
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ}},
nil, nil)
return err
// Poll ...
type Poll struct {
fd int
changes []syscall.Kevent_t
notes noteQueue
}
func DelRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if !*readon {
return nil
}
*readon = false
// OpenPoll ...
func OpenPoll() *Poll {
l := new(Poll)
p, err := syscall.Kqueue()
if err != nil {
panic(err)
}
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ}},
nil, nil)
l.fd = p
_, err = syscall.Kevent(l.fd, []syscall.Kevent_t{{
Ident: 0,
Filter: syscall.EVFILT_USER,
Flags: syscall.EV_ADD | syscall.EV_CLEAR,
}}, nil, nil)
if err != nil {
panic(err)
}
return l
}
// Close ...
func (p *Poll) Close() error {
return syscall.Close(p.fd)
}
// Trigger ...
func (p *Poll) Trigger(note interface{}) error {
p.notes.Add(note)
_, err := syscall.Kevent(p.fd, []syscall.Kevent_t{{
Ident: 0,
Filter: syscall.EVFILT_USER,
Fflags: syscall.NOTE_TRIGGER,
}}, nil, nil)
return err
}
func AddWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if *writeon {
return nil
// Wait ...
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
events := make([]syscall.Kevent_t, 128)
for {
n, err := syscall.Kevent(p.fd, p.changes, events, nil)
if err != nil && err != syscall.EINTR {
return err
}
*writeon = true
}
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE}},
nil, nil)
return err
}
func DelWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if !*writeon {
return nil
p.changes = p.changes[:0]
if err := p.notes.ForEach(func(note interface{}) error {
return iter(0, note)
}); err != nil {
return err
}
for i := 0; i < n; i++ {
if fd := int(events[i].Ident); fd != 0 {
if err := iter(fd, nil); err != nil {
return err
}
}
}
*writeon = false
}
_, err := syscall.Kevent(p,
[]syscall.Kevent_t{{Ident: uint64(fd),
Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE}},
nil, nil)
return err
}
func MakePoll() (p int, err error) {
return syscall.Kqueue()
// AddRead ...
func (p *Poll) AddRead(fd int) {
p.changes = append(p.changes,
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,
},
)
}
func MakeEvents(n int) interface{} {
return make([]syscall.Kevent_t, n)
// AddReadWrite ...
func (p *Poll) AddReadWrite(fd int) {
p.changes = append(p.changes,
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_READ,
},
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE,
},
)
}
func Wait(p int, evs interface{}, timeout time.Duration) (n int, err error) {
if timeout < 0 {
timeout = 0
}
ts := syscall.NsecToTimespec(int64(timeout))
return syscall.Kevent(p, nil, evs.([]syscall.Kevent_t), &ts)
// ModRead ...
func (p *Poll) ModRead(fd int) {
p.changes = append(p.changes, syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE,
})
}
func GetFD(evs interface{}, i int) int {
return int(evs.([]syscall.Kevent_t)[i].Ident)
// ModReadWrite ...
func (p *Poll) ModReadWrite(fd int) {
p.changes = append(p.changes, syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_ADD, Filter: syscall.EVFILT_WRITE,
})
}
// ModDetach ...
func (p *Poll) ModDetach(fd int) {
p.changes = append(p.changes,
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_READ,
},
syscall.Kevent_t{
Ident: uint64(fd), Flags: syscall.EV_DELETE, Filter: syscall.EVFILT_WRITE,
},
)
}

View File

@ -6,95 +6,124 @@ package internal
import (
"syscall"
"time"
)
func AddRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if *readon {
return nil
}
*readon = true
}
if writeon == nil || !*writeon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_ADD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
})
}
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN | syscall.EPOLLOUT,
})
}
func DelRead(p, fd int, readon, writeon *bool) error {
if readon != nil {
if !*readon {
return nil
}
*readon = false
}
if writeon == nil || !*writeon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_DEL, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
})
}
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLOUT,
})
// Poll ...
type Poll struct {
fd int // epoll fd
wfd int // wake fd
notes noteQueue
}
func AddWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if *writeon {
return nil
// OpenPoll ...
func OpenPoll() *Poll {
l := new(Poll)
p, err := syscall.EpollCreate1(0)
if err != nil {
panic(err)
}
l.fd = p
r0, _, e0 := syscall.Syscall(syscall.SYS_EVENTFD, 0, 0, 0)
if e0 != 0 {
syscall.Close(p)
panic(err)
}
l.wfd = int(r0)
l.AddRead(l.wfd)
return l
}
// Close ...
func (p *Poll) Close() error {
if err := syscall.Close(p.wfd); err != nil {
return err
}
return syscall.Close(p.fd)
}
// Trigger ...
func (p *Poll) Trigger(note interface{}) error {
p.notes.Add(note)
_, err := syscall.Write(p.wfd, []byte{0, 0, 0, 0, 0, 0, 0, 1})
return err
}
// Wait ...
func (p *Poll) Wait(iter func(fd int, note interface{}) error) error {
events := make([]syscall.EpollEvent, 64)
for {
n, err := syscall.EpollWait(p.fd, events, -1)
if err != nil && err != syscall.EINTR {
return err
}
if err := p.notes.ForEach(func(note interface{}) error {
return iter(0, note)
}); err != nil {
return err
}
for i := 0; i < n; i++ {
if fd := int(events[i].Fd); fd != p.wfd {
if err := iter(fd, nil); err != nil {
return err
}
} else {
}
}
*writeon = true
}
if readon == nil || !*readon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_ADD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLOUT,
})
}
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
}
// AddReadWrite ...
func (p *Poll) AddReadWrite(fd int) {
if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_ADD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN | syscall.EPOLLOUT,
})
},
); err != nil {
panic(err)
}
}
func DelWrite(p, fd int, readon, writeon *bool) error {
if writeon != nil {
if !*writeon {
return nil
}
*writeon = false
}
if readon == nil || !*readon {
return syscall.EpollCtl(p, syscall.EPOLL_CTL_DEL, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLOUT,
})
}
return syscall.EpollCtl(p, syscall.EPOLL_CTL_MOD, fd,
// AddRead ...
func (p *Poll) AddRead(fd int) {
if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_ADD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
})
}
func MakePoll() (p int, err error) {
return syscall.EpollCreate1(0)
}
func MakeEvents(n int) interface{} {
return make([]syscall.EpollEvent, n)
}
func Wait(p int, evs interface{}, timeout time.Duration) (n int, err error) {
if timeout < 0 {
timeout = 0
},
); err != nil {
panic(err)
}
ts := int(timeout / time.Millisecond)
return syscall.EpollWait(p, evs.([]syscall.EpollEvent), ts)
}
func GetFD(evs interface{}, i int) int {
return int(evs.([]syscall.EpollEvent)[i].Fd)
// ModRead ...
func (p *Poll) ModRead(fd int) {
if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN,
},
); err != nil {
panic(err)
}
}
// ModReadWrite ...
func (p *Poll) ModReadWrite(fd int) {
if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_MOD, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN | syscall.EPOLLOUT,
},
); err != nil {
panic(err)
}
}
// ModDetach ...
func (p *Poll) ModDetach(fd int) {
if err := syscall.EpollCtl(p.fd, syscall.EPOLL_CTL_DEL, fd,
&syscall.EpollEvent{Fd: int32(fd),
Events: syscall.EPOLLIN | syscall.EPOLLOUT,
},
); err != nil {
panic(err)
}
}

View File

@ -1,34 +0,0 @@
package internal
import (
"fmt"
"testing"
"time"
)
type queueItem struct {
timeout time.Time
}
func (item *queueItem) Timeout() time.Time {
return item.timeout
}
func TestQueue(t *testing.T) {
q := NewTimeoutQueue()
item := &queueItem{timeout: time.Unix(0, 5)}
q.Push(item)
q.Push(&queueItem{timeout: time.Unix(0, 3)})
q.Push(&queueItem{timeout: time.Unix(0, 20)})
q.Push(&queueItem{timeout: time.Unix(0, 13)})
var out string
for q.Len() > 0 {
pitem := q.Peek()
item := q.Pop()
out += fmt.Sprintf("(%v:%v) ", pitem.Timeout().UnixNano(), item.Timeout().UnixNano())
}
exp := "(3:3) (5:5) (13:13) (20:20) "
if out != exp {
t.Fatalf("expected '%v', got '%v'", exp, out)
}
}

53
internal/notequeue.go Normal file
View File

@ -0,0 +1,53 @@
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package internal
import (
"runtime"
"sync/atomic"
)
// this is a good candiate for a lock-free structure.
type spinlock struct{ lock uintptr }
func (l *spinlock) Lock() {
for !atomic.CompareAndSwapUintptr(&l.lock, 0, 1) {
runtime.Gosched()
}
}
func (l *spinlock) Unlock() {
atomic.StoreUintptr(&l.lock, 0)
}
type noteQueue struct {
mu spinlock
notes []interface{}
}
func (q *noteQueue) Add(note interface{}) (one bool) {
q.mu.Lock()
q.notes = append(q.notes, note)
n := len(q.notes)
q.mu.Unlock()
return n == 1
}
func (q *noteQueue) ForEach(iter func(note interface{}) error) error {
q.mu.Lock()
if len(q.notes) == 0 {
q.mu.Unlock()
return nil
}
notes := q.notes
q.notes = nil
q.mu.Unlock()
for _, note := range notes {
if err := iter(note); err != nil {
return err
}
}
return nil
}

39
internal/socktoaddr.go Normal file
View File

@ -0,0 +1,39 @@
// Copyright 2018 Joshua J Baker. All rights reserved.
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package internal
import (
"net"
"syscall"
)
// SockaddrToAddr returns a go/net friendly address
func SockaddrToAddr(sa syscall.Sockaddr) net.Addr {
var a net.Addr
switch sa := sa.(type) {
case *syscall.SockaddrInet4:
a = &net.TCPAddr{
IP: append([]byte{}, sa.Addr[:]...),
Port: sa.Port,
}
case *syscall.SockaddrInet6:
var zone string
if sa.ZoneId != 0 {
if ifi, err := net.InterfaceByIndex(int(sa.ZoneId)); err == nil {
zone = ifi.Name
}
}
if zone == "" && sa.ZoneId != 0 {
}
a = &net.TCPAddr{
IP: append([]byte{}, sa.Addr[:]...),
Port: sa.Port,
Zone: zone,
}
case *syscall.SockaddrUnix:
a = &net.UnixAddr{Net: "unix", Name: sa.Name}
}
return a
}

View File

@ -1,70 +0,0 @@
package internal
import (
"container/heap"
"time"
)
// TimeoutQueueItem is an item for TimeoutQueue
type TimeoutQueueItem interface {
Timeout() time.Time
}
type timeoutPriorityQueue []TimeoutQueueItem
func (pq timeoutPriorityQueue) Len() int { return len(pq) }
func (pq timeoutPriorityQueue) Less(i, j int) bool {
return pq[i].Timeout().Before(pq[j].Timeout())
}
func (pq timeoutPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *timeoutPriorityQueue) Push(x interface{}) {
*pq = append(*pq, x.(TimeoutQueueItem))
}
func (pq *timeoutPriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
// TimeoutQueue is a priority queue ordere be ascending time.Time.
type TimeoutQueue struct {
pq timeoutPriorityQueue
}
// NewTimeoutQueue returns a new TimeoutQueue.
func NewTimeoutQueue() *TimeoutQueue {
q := &TimeoutQueue{}
heap.Init(&q.pq)
return q
}
// Push adds a new item.
func (q *TimeoutQueue) Push(x TimeoutQueueItem) {
heap.Push(&q.pq, x)
}
// Pop removes and returns the items with the smallest value.
func (q *TimeoutQueue) Pop() TimeoutQueueItem {
return heap.Pop(&q.pq).(TimeoutQueueItem)
}
// Peek returns the items with the smallest value, but does not remove it.
func (q *TimeoutQueue) Peek() TimeoutQueueItem {
if q.Len() > 0 {
return q.pq[0]
}
return nil
}
// Len returns the number of items in the queue
func (q *TimeoutQueue) Len() int {
return q.pq.Len()
}

2
vendor/.stub vendored
View File

@ -1 +1 @@
stub
// DO NOT REMOVE