mirror of https://github.com/tidwall/evio.git
271 lines
7.9 KiB
Go
271 lines
7.9 KiB
Go
// Copyright 2017 Joshua J Baker. All rights reserved.
|
|
// Use of this source code is governed by an MIT-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package evio
|
|
|
|
import (
|
|
"io"
|
|
"net"
|
|
"os"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/kavu/go_reuseport"
|
|
)
|
|
|
|
// Action tells the server what should happen once an event handler has
// returned.
type Action int

const (
	// None means the event requires no follow-up action.
	None = Action(iota)
	// Detach requests that the client be detached. UDP connections cannot
	// be detached.
	Detach
	// Close closes the client.
	Close
	// Shutdown shuts the server down.
	Shutdown
)
|
|
|
|
// Options holds the per-connection settings that are applied when a client
// opens.
type Options struct {
	// TCPKeepAlive is the value for the (SO_KEEPALIVE) socket option.
	TCPKeepAlive time.Duration

	// ReuseInputBuffer forces the connection to share and reuse a single
	// input packet buffer with every other connection that also sets this
	// option.
	// It defaults to false, in which case all input data handed to the
	// Data event is a uniquely copied []byte slice.
	ReuseInputBuffer bool
}
|
|
|
|
// Info carries information about a connection.
type Info struct {
	// Closing reports that the connection is about to close; a Closed
	// event should be expected to fire soon.
	Closing bool

	// AddrIndex is the index of the server address, as passed to the Serve
	// call, that this connection belongs to.
	AddrIndex int

	// LocalAddr is the local socket address of the connection.
	LocalAddr net.Addr

	// RemoteAddr is the address of the connection's remote peer.
	RemoteAddr net.Addr
}
|
|
|
|
// Server is the server context handed to the Serving event. It exposes
// information about the running server along with control functions for
// managing its state.
type Server struct {
	// Addrs lists the listening addresses. The entries line up with the
	// addr strings that were passed to the Serve function.
	Addrs []net.Addr

	// Wake is a goroutine-safe function that triggers a Data event (with a
	// nil `in` parameter) for the connection with the given id.
	// Not available for UDP connections.
	Wake func(id int) (ok bool)

	// Dial is a goroutine-safe function that connects to an external
	// server and returns the id of the new connection. The connection is
	// added to the event loop and managed exactly like every other
	// connection. The operation only fails if the server/loop has been
	// shut down. A non-zero `id` means the operation succeeded, and there
	// will then always be exactly one Opened and one Closed event following
	// the call. Socket errors are reported through the Closed event.
	// Not available for UDP connections.
	Dial func(addr string, timeout time.Duration) (id int)
}
|
|
|
|
// Events represents the server events for the Serve call.
|
|
// Each event has an Action return value that is used manage the state
|
|
// of the connection and server.
|
|
type Events struct {
|
|
// Serving fires when the server can accept connections. The server
|
|
// parameter has information and various utilities.
|
|
Serving func(server Server) (action Action)
|
|
// Opened fires when a new connection has opened.
|
|
// The info parameter has information about the connection such as
|
|
// it's local and remote address.
|
|
// Use the out return value to write data to the connection.
|
|
// The opts return value is used to set connection options.
|
|
Opened func(id int, info Info) (out []byte, opts Options, action Action)
|
|
// Closed fires when a connection has closed.
|
|
// The err parameter is the last known connection error.
|
|
Closed func(id int, err error) (action Action)
|
|
// Detached fires when a connection has been previously detached.
|
|
// Once detached it's up to the receiver of this event to manage the
|
|
// state of the connection. The Closed event will not be called for
|
|
// this connection.
|
|
// The conn parameter is a ReadWriteCloser that represents the
|
|
// underlying socket connection. It can be freely used in goroutines
|
|
// and should be closed when it's no longer needed.
|
|
Detached func(id int, rwc io.ReadWriteCloser) (action Action)
|
|
// Data fires when a connection sends the server data.
|
|
// The in parameter is the incoming data.
|
|
// Use the out return value to write data to the connection.
|
|
Data func(id int, in []byte) (out []byte, action Action)
|
|
// Prewrite fires prior to every write attempt.
|
|
// The amount parameter is the number of bytes that will be attempted
|
|
// to be written to the connection.
|
|
Prewrite func(id int, amount int) (action Action)
|
|
// Postwrite fires immediately after every write attempt.
|
|
// The amount parameter is the number of bytes that was written to the
|
|
// connection.
|
|
// The remaining parameter is the number of bytes that still remain in
|
|
// the buffer scheduled to be written.
|
|
Postwrite func(id int, amount, remaining int) (action Action)
|
|
// Tick fires immediately after the server starts and will fire again
|
|
// following the duration specified by the delay return value.
|
|
Tick func() (delay time.Duration, action Action)
|
|
}
|
|
|
|
// Serve starts handling events for the specified addresses.
|
|
//
|
|
// Addresses should use a scheme prefix and be formatted
|
|
// like `tcp://192.168.0.10:9851` or `unix://socket`.
|
|
// Valid network schemes:
|
|
// tcp - bind to both IPv4 and IPv6
|
|
// tcp4 - IPv4
|
|
// tcp6 - IPv6
|
|
// udp - bind to both IPv4 and IPv6
|
|
// udp4 - IPv4
|
|
// udp6 - IPv6
|
|
// unix - Unix Domain Socket
|
|
//
|
|
// The "tcp" network scheme is assumed when one is not specified.
|
|
func Serve(events Events, addr ...string) error {
|
|
var lns []*listener
|
|
defer func() {
|
|
for _, ln := range lns {
|
|
ln.close()
|
|
}
|
|
}()
|
|
var stdlib bool
|
|
for _, addr := range addr {
|
|
var ln listener
|
|
var stdlibt bool
|
|
ln.network, ln.addr, ln.opts, stdlibt = parseAddr(addr)
|
|
if stdlibt {
|
|
stdlib = true
|
|
}
|
|
if ln.network == "unix" {
|
|
os.RemoveAll(ln.addr)
|
|
}
|
|
var err error
|
|
if ln.network == "udp" {
|
|
if ln.opts.reusePort() {
|
|
ln.pconn, err = reuseport.ListenPacket(ln.network, ln.addr)
|
|
} else {
|
|
ln.pconn, err = net.ListenPacket(ln.network, ln.addr)
|
|
}
|
|
} else {
|
|
if ln.opts.reusePort() {
|
|
ln.ln, err = reuseport.Listen(ln.network, ln.addr)
|
|
} else {
|
|
ln.ln, err = net.Listen(ln.network, ln.addr)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ln.pconn != nil {
|
|
ln.lnaddr = ln.pconn.LocalAddr()
|
|
} else {
|
|
ln.lnaddr = ln.ln.Addr()
|
|
}
|
|
if !stdlib {
|
|
if err := ln.system(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
lns = append(lns, &ln)
|
|
}
|
|
if stdlib {
|
|
return servenet(events, lns)
|
|
}
|
|
return serve(events, lns)
|
|
}
|
|
|
|
// InputStream is a helper type for managing input streams from within the
// Data event.
type InputStream struct {
	// b holds the bytes retained between calls to Begin and End.
	b []byte
}
|
|
|
|
// Begin accepts a new packet and returns a working sequence of
|
|
// unprocessed bytes.
|
|
func (is *InputStream) Begin(packet []byte) (data []byte) {
|
|
data = packet
|
|
if len(is.b) > 0 {
|
|
is.b = append(is.b, data...)
|
|
data = is.b
|
|
}
|
|
return data
|
|
}
|
|
|
|
// End shift the stream to match the unprocessed data.
|
|
func (is *InputStream) End(data []byte) {
|
|
if len(data) > 0 {
|
|
if len(data) != len(is.b) {
|
|
is.b = append(is.b[:0], data...)
|
|
}
|
|
} else if len(is.b) > 0 {
|
|
is.b = is.b[:0]
|
|
}
|
|
}
|
|
|
|
type listener struct {
|
|
ln net.Listener
|
|
lnaddr net.Addr
|
|
pconn net.PacketConn
|
|
opts addrOpts
|
|
f *os.File
|
|
fd int
|
|
network string
|
|
addr string
|
|
}
|
|
|
|
// addrOpts holds the key/value options that parseAddr extracts from the
// query part of an address string.
type addrOpts map[string]string
|
|
|
|
func (opts addrOpts) reusePort() bool {
|
|
switch opts["reuseport"] {
|
|
case "yes", "true", "1":
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (opts addrOpts) loadBalance() int {
|
|
n, err := strconv.ParseInt(opts["loadbalance"], 10, 16)
|
|
if err != nil {
|
|
return 1
|
|
}
|
|
if n <= 0 {
|
|
return runtime.NumCPU()
|
|
}
|
|
return int(n)
|
|
}
|
|
|
|
func parseAddr(addr string) (network, address string, opts addrOpts, stdlib bool) {
|
|
network = "tcp"
|
|
address = addr
|
|
opts = make(map[string]string)
|
|
if strings.Contains(address, "://") {
|
|
network = strings.Split(address, "://")[0]
|
|
address = strings.Split(address, "://")[1]
|
|
}
|
|
if strings.HasSuffix(network, "-net") {
|
|
stdlib = true
|
|
network = network[:len(network)-4]
|
|
}
|
|
q := strings.Index(address, "?")
|
|
if q != -1 {
|
|
for _, part := range strings.Split(address[q+1:], "&") {
|
|
kv := strings.Split(part, "=")
|
|
if len(kv) == 2 {
|
|
opts[kv[0]] = kv[1]
|
|
}
|
|
}
|
|
address = address[:q]
|
|
}
|
|
return
|
|
}
|