Dial outbound connection

It's not possible to dial an outbound connection which connects to the
event loop exactly like inbound connection.
This commit is contained in:
Josh Baker 2017-11-07 15:59:24 -07:00
parent 749915306d
commit 7da2f5a251
6 changed files with 300 additions and 219 deletions

View File

@ -114,17 +114,17 @@ events.Tick = func() (delay time.Duration, action Action){
### Wake up
A connection can be woken up using the `wake` function that is made available through the `Serving` event. This is useful for when you need to offload an operation to a background goroutine and then later notify the event loop that it's time to send some data.
A connection can be woken up using the `Wake` function that is made available through the `Serving` event. This is useful for when you need to offload an operation to a background goroutine and then later notify the event loop that it's time to send some data.
Example echo server that when encountering the line "exec" it waits 5 seconds before responding.
```go
var wake func(id int) bool
var srv evio.Server
var mu sync.Mutex
var execs = make(map[int]int)
events.Serving = func(wakefn func(id int) bool, addrs []net.Addr) (action evio.Action) {
wake = wakefn // hang on to the wake function
events.Serving = func(srvin evio.Server) (action evio.Action) {
srv = srvin // hang on to the server control, which has the Wake function
return
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
@ -143,7 +143,7 @@ events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
mu.Lock()
execs[id]++
mu.Unlock()
wake(id)
srv.Wake(id)
}()
} else {
out = in
@ -152,6 +152,41 @@ events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
}
```
### Dialing out
An outbound connection can created by using the `Dial` function that is
made available through the `Serving` event. Dialing a new connection will
return a new connection ID and attach that connection to the event loop in
the same manner as incoming connections. This operation is completely
non-blocking including any DNS resolution.
All new outbound connection attempts will immediately fire an `Opened`
event and end with a `Closed` event. A failed connection will send the
connection error through the `Closed` event.
```go
var srv evio.Server
var mu sync.Mutex
var execs = make(map[int]int)
events.Serving = func(srvin evio.Server) (action evio.Action) {
srv = srvin // hang on to the server control, which has the Dial function
return
}
events.Data = func(id int, in []byte) (out []byte, action evio.Action) {
if string(in) == "dial\r\n" {
id := srv.Dial("tcp://google.com:80")
// We now established an outbound connection to google.
// Treat it like you would incoming connection.
} else {
out = in
}
return
}
```
### Data translations
The `Translate` function wraps events and provides a `ReadWriter` that can be used to translate data off the wire from one format to another. This can be useful for transparently adding compression or encryption.

10
evio.go
View File

@ -58,10 +58,10 @@ type Server struct {
// server and returns a new connection id. The new connection is added
// to the event loop and is managed exactly the same way as all the
// other connections. This operation only fails if the server/loop has
// been shut down. When `ok` is true there will always be exactly one
// Opened and one Closed event following this call. Look for socket
// errors from the Closed event.
Dial func(addr string, timeout time.Duration) (id int, ok bool)
// been shut down. An `id` that is not zero means the operation succeeded
// and then there always be exactly one Opened and one Closed event
// following this call. Look for socket errors from the Closed event.
Dial func(addr string, timeout time.Duration) (id int)
}
// Events represents the server events for the Serve call.
@ -112,7 +112,7 @@ type Events struct {
// Addresses should use a scheme prefix and be formatted
// like `tcp://192.168.0.10:9851` or `unix://socket`.
// Valid network schemes:
// tcp - bind to both IPv4 and IPv6
// tcp - bind to both IPv4 and IPv6
// tcp4 - IPv4
// tcp6 - IPv6
// unix - Unix Domain Socket

View File

@ -137,11 +137,11 @@ func serve(events Events, lns []*listener) error {
idconn := make(map[int]*unixConn)
timeoutqueue := internal.NewTimeoutQueue()
var id int
dial := func(addr string, timeout time.Duration) (int, bool) {
dial := func(addr string, timeout time.Duration) int {
lock()
if done {
unlock()
return 0, false
return 0
}
id++
c := &unixConn{id: id, opening: true}
@ -207,7 +207,7 @@ func serve(events Events, lns []*listener) error {
}
}()
return id, true
return id
}
// wake wakes up a connection

View File

@ -63,11 +63,192 @@ func (c *netConn) Close() error {
// servenet uses the stdlib net package instead of syscalls.
func servenet(events Events, lns []*listener) error {
var id int64
var idc int64
var mu sync.Mutex
var cmu sync.Mutex
var idconn = make(map[int]*netConn)
var done bool
var done int64
var shutdown func(err error)
// connloop handles an individual connection
connloop := func(id int, conn net.Conn, lnidx int, ln net.Listener) {
var closed bool
defer func() {
if !closed {
conn.Close()
}
}()
var packet [0xFFFF]byte
var cout []byte
var caction Action
c := &netConn{id: id, conn: conn}
cmu.Lock()
idconn[id] = c
cmu.Unlock()
if events.Opened != nil {
var out []byte
var opts Options
var action Action
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
out, opts, action = events.Opened(id, Info{
AddrIndex: lnidx,
LocalAddr: conn.LocalAddr(),
RemoteAddr: conn.RemoteAddr(),
})
}
mu.Unlock()
if opts.TCPKeepAlive > 0 {
if conn, ok := conn.(*net.TCPConn); ok {
conn.SetKeepAlive(true)
conn.SetKeepAlivePeriod(opts.TCPKeepAlive)
}
}
if len(out) > 0 {
cout = append(cout, out...)
}
caction = action
}
for {
var n int
var err error
var out []byte
var action Action
if caction != None {
goto write
}
if len(cout) > 0 || atomic.LoadInt64(&c.wake) != 0 {
conn.SetReadDeadline(time.Now().Add(time.Microsecond))
} else {
conn.SetReadDeadline(time.Now().Add(time.Second))
}
n, err = c.Read(packet[:])
if err != nil && !istimeout(err) {
if err != io.EOF {
c.err = err
}
goto close
}
if n > 0 {
if events.Data != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
out, action = events.Data(id, append([]byte{}, packet[:n]...))
}
mu.Unlock()
}
} else if atomic.LoadInt64(&c.wake) != 0 {
atomic.StoreInt64(&c.wake, 0)
if events.Data != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
out, action = events.Data(id, nil)
}
mu.Unlock()
}
}
if len(out) > 0 {
cout = append(cout, out...)
}
caction = action
goto write
write:
if len(cout) > 0 {
if events.Prewrite != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
action = events.Prewrite(id, len(cout))
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
conn.SetWriteDeadline(time.Now().Add(time.Microsecond))
n, err := c.Write(cout)
if err != nil && !istimeout(err) {
if err != io.EOF {
c.err = err
}
goto close
}
cout = cout[n:]
if len(cout) == 0 {
cout = nil
}
if events.Postwrite != nil {
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
action = events.Postwrite(id, n, len(cout))
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
}
if caction == Shutdown {
goto close
}
if len(cout) == 0 {
if caction != None {
goto close
}
}
continue
close:
cmu.Lock()
delete(idconn, c.id)
cmu.Unlock()
mu.Lock()
if atomic.LoadInt64(&done) != 0 {
mu.Unlock()
return
}
mu.Unlock()
if caction == Detach {
if events.Detached != nil {
if len(cout) > 0 {
c.outbuf = cout
}
c.detached = true
conn.SetDeadline(time.Time{})
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
caction = events.Detached(c.id, c)
}
mu.Unlock()
closed = true
if caction == Shutdown {
goto fail
}
return
}
}
conn.Close()
if events.Closed != nil {
var action Action
mu.Lock()
if atomic.LoadInt64(&done) == 0 {
action = events.Closed(c.id, c.err)
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
closed = true
if caction == Shutdown {
goto fail
}
return
fail:
shutdown(nil)
return
}
}
ctx := Server{
Wake: func(id int) bool {
cmu.Lock()
@ -81,18 +262,57 @@ func servenet(events Events, lns []*listener) error {
c.conn.SetDeadline(time.Time{}.Add(1))
return true
},
Dial: func(addr string, timeout time.Duration) int {
if atomic.LoadInt64(&done) != 0 {
return 0
}
id := int(atomic.AddInt64(&idc, 1))
go func() {
network, address, _ := parseAddr(addr)
var conn net.Conn
var err error
if timeout > 0 {
conn, err = net.DialTimeout(network, address, timeout)
} else {
conn, err = net.Dial(network, address)
}
if err != nil {
if events.Opened != nil {
mu.Lock()
_, _, action := events.Opened(id, Info{Closing: true, AddrIndex: -1})
mu.Unlock()
if action == Shutdown {
shutdown(nil)
return
}
}
if events.Closed != nil {
mu.Lock()
action := events.Closed(id, err)
mu.Unlock()
if action == Shutdown {
shutdown(nil)
return
}
}
return
}
go connloop(id, conn, -1, nil)
}()
return id
},
}
var swg sync.WaitGroup
swg.Add(1)
var ferr error
shutdown := func(err error) {
shutdown = func(err error) {
mu.Lock()
if done {
if atomic.LoadInt64(&done) != 0 {
mu.Unlock()
return
}
defer swg.Done()
done = true
atomic.StoreInt64(&done, 1)
ferr = err
for _, ln := range lns {
ln.ln.Close()
@ -145,186 +365,15 @@ func servenet(events Events, lns []*listener) error {
}
return
}
id := int(atomic.AddInt64(&id, 1))
go func(id int, conn net.Conn) {
var closed bool
defer func() {
if !closed {
conn.Close()
}
}()
var packet [0xFFFF]byte
var cout []byte
var caction Action
c := &netConn{id: id, conn: conn}
cmu.Lock()
idconn[id] = c
cmu.Unlock()
if events.Opened != nil {
var out []byte
var opts Options
var action Action
mu.Lock()
if !done {
out, opts, action = events.Opened(id, Info{
AddrIndex: lnidx,
LocalAddr: conn.LocalAddr(),
RemoteAddr: conn.RemoteAddr(),
})
}
mu.Unlock()
if opts.TCPKeepAlive > 0 {
if conn, ok := conn.(*net.TCPConn); ok {
conn.SetKeepAlive(true)
conn.SetKeepAlivePeriod(opts.TCPKeepAlive)
}
}
if len(out) > 0 {
cout = append(cout, out...)
}
caction = action
}
for {
var n int
var out []byte
var action Action
if caction != None {
goto write
}
conn.SetReadDeadline(time.Now().Add(time.Microsecond))
n, err = c.Read(packet[:])
if err != nil && !istimeout(err) {
if err != io.EOF {
c.err = err
}
goto close
}
if n > 0 {
if events.Data != nil {
mu.Lock()
if !done {
out, action = events.Data(id, append([]byte{}, packet[:n]...))
}
mu.Unlock()
}
} else if atomic.LoadInt64(&c.wake) != 0 {
atomic.StoreInt64(&c.wake, 0)
if events.Data != nil {
mu.Lock()
if !done {
out, action = events.Data(id, nil)
}
mu.Unlock()
}
}
if len(out) > 0 {
cout = append(cout, out...)
}
caction = action
goto write
write:
if len(cout) > 0 {
if events.Prewrite != nil {
mu.Lock()
if !done {
action = events.Prewrite(id, len(cout))
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
conn.SetWriteDeadline(time.Now().Add(time.Microsecond))
n, err := c.Write(cout)
if err != nil && !istimeout(err) {
if err != io.EOF {
c.err = err
}
goto close
}
cout = cout[n:]
if len(cout) == 0 {
cout = nil
}
if events.Postwrite != nil {
mu.Lock()
if !done {
action = events.Postwrite(id, n, len(cout))
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
}
if caction == Shutdown {
goto close
}
if len(cout) == 0 {
if caction != None {
goto close
}
}
continue
close:
cmu.Lock()
delete(idconn, c.id)
cmu.Unlock()
mu.Lock()
if done {
mu.Unlock()
return
}
mu.Unlock()
if caction == Detach {
if events.Detached != nil {
if len(cout) > 0 {
c.outbuf = cout
}
c.detached = true
conn.SetDeadline(time.Time{})
mu.Lock()
if !done {
caction = events.Detached(c.id, c)
}
mu.Unlock()
closed = true
if caction == Shutdown {
goto fail
}
return
}
}
conn.Close()
if events.Closed != nil {
var action Action
mu.Lock()
if !done {
action = events.Closed(c.id, c.err)
}
mu.Unlock()
if action == Shutdown {
caction = Shutdown
}
}
closed = true
if caction == Shutdown {
goto fail
}
return
fail:
shutdown(nil)
return
}
}(id, conn)
id := int(atomic.AddInt64(&idc, 1))
go connloop(id, conn, lnidx, ln)
}
}(i, ln.ln)
}
go func() {
for {
mu.Lock()
if done {
if atomic.LoadInt64(&done) != 0 {
mu.Unlock()
break
}
@ -333,7 +382,7 @@ func servenet(events Events, lns []*listener) error {
var action Action
mu.Lock()
if events.Tick != nil {
if !done {
if atomic.LoadInt64(&done) == 0 {
delay, action = events.Tick()
}
} else {

View File

@ -55,10 +55,10 @@ func testServe(network, addr string, unix bool, nclients int) {
var disconnected int
var events Events
events.Serving = func(ctx Context) (action Action) {
events.Serving = func(srv Server) (action Action) {
return
}
events.Opened = func(id int, addr Addr) (out []byte, opts Options, action Action) {
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
connected++
out = []byte("sweetness\r\n")
opts.TCPKeepAlive = time.Minute * 5
@ -163,9 +163,9 @@ func TestWake(t *testing.T) {
}
func testWake(network, addr string, stdlib bool) {
var events Events
var ctx Context
events.Serving = func(ctxin Context) (action Action) {
ctx = ctxin
var srv Server
events.Serving = func(srvin Server) (action Action) {
srv = srvin
go func() {
conn, err := net.Dial(network, addr)
must(err)
@ -189,7 +189,7 @@ func testWake(network, addr string, stdlib bool) {
var cin []byte
var cclosed bool
var cond = sync.NewCond(&sync.Mutex{})
events.Opened = func(id int, addr Addr) (out []byte, opts Options, action Action) {
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
cid = id
return
}
@ -209,7 +209,7 @@ func testWake(network, addr string, stdlib bool) {
cin = nil
}
if len(cout) > 0 {
ctx.Wake(cid)
srv.Wake(cid)
}
cond.Wait()
}
@ -317,7 +317,7 @@ func testShutdown(network, addr string, stdlib bool) {
var count int
var clients int64
var N = 10
events.Opened = func(id int, addr Addr) (out []byte, opts Options, action Action) {
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
atomic.AddInt64(&clients, 1)
return
}
@ -419,7 +419,7 @@ func testDetach(network, addr string, stdlib bool) {
}()
return
}
events.Serving = func(ctx Context) (action Action) {
events.Serving = func(srv Server) (action Action) {
go func() {
// client connection
conn, err := net.Dial(network, addr)
@ -468,7 +468,7 @@ func testDetach(network, addr string, stdlib bool) {
func TestBadAddresses(t *testing.T) {
var events Events
events.Serving = func(ctx Context) (action Action) {
events.Serving = func(srv Server) (action Action) {
return Shutdown
}
if err := Serve(events, "tulip://howdy"); err == nil {
@ -528,21 +528,21 @@ func TestPrePostwrite(t *testing.T) {
func testPrePostwrite(network, addr string, stdlib bool) {
var events Events
var ctx Context
var srv Server
var packets int
var tout []byte
events.Opened = func(id int, addr Addr) (out []byte, opts Options, action Action) {
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
packets++
out = []byte(fmt.Sprintf("hello %d\r\n", packets))
tout = append(tout, out...)
ctx.Wake(id)
srv.Wake(id)
return
}
events.Data = func(id int, in []byte) (out []byte, action Action) {
packets++
out = []byte(fmt.Sprintf("hello %d\r\n", packets))
tout = append(tout, out...)
ctx.Wake(id)
srv.Wake(id)
return
}
events.Prewrite = func(id int, amount int) (action Action) {
@ -562,8 +562,8 @@ func testPrePostwrite(network, addr string, stdlib bool) {
action = Shutdown
return
}
events.Serving = func(ctxin Context) (action Action) {
ctx = ctxin
events.Serving = func(srvin Server) (action Action) {
srv = srvin
go func() {
conn, err := net.Dial(network, addr)
must(err)
@ -622,11 +622,11 @@ func testTranslate(network, addr string, kind string, stdlib bool) {
action = Shutdown
return
}
events.Opened = func(id int, addr Addr) (out []byte, opts Options, action Action) {
events.Opened = func(id int, info Info) (out []byte, opts Options, action Action) {
out = []byte("sweetness\r\n")
return
}
events.Serving = func(ctx Context) (action Action) {
events.Serving = func(srv Server) (action Action) {
go func() {
conn, err := net.Dial(network, addr)
must(err)
@ -663,7 +663,7 @@ func testTranslate(network, addr string, kind string, stdlib bool) {
}
tevents := Translate(events,
func(id int, addr Addr) bool {
func(id int, info Info) bool {
return true
},
func(id int, rw io.ReadWriter) io.ReadWriter {
@ -683,7 +683,7 @@ func testTranslate(network, addr string, kind string, stdlib bool) {
// test with no shoulds
tevents = Translate(events,
func(id int, addr Addr) bool {
func(id int, info Info) bool {
return false
},
func(id int, rw io.ReadWriter) io.ReadWriter {

View File

@ -73,7 +73,6 @@ func main() {
c := conns[id]
if c.wget {
print(string(in))
//action = evio.Close
return
}
data := c.is.Begin(in)
@ -100,13 +99,11 @@ func main() {
if len(args) != 3 {
out = redcon.AppendError(out, "ERR wrong number of arguments for '"+string(args[0])+"' command")
} else {
start := time.Now()
n, _ := strconv.ParseInt(string(args[2]), 10, 63)
cid, ok := srv.Dial("unix://"+string(args[1]), time.Duration(n)*time.Second)
if !ok {
cid := srv.Dial("tcp://"+string(args[1]), time.Duration(n)*time.Second)
if cid == 0 {
out = redcon.AppendError(out, "failed to dial")
} else {
println(cid, time.Since(start).String())
wgetids[cid] = time.Now()
out = redcon.AppendOK(out)
}
@ -172,7 +169,7 @@ func main() {
c.is.End(data)
return
}
addrs := []string{fmt.Sprintf("tcp://:%d", port)}
addrs := []string{fmt.Sprintf("tcp-net://:%d", port)}
if unixsocket != "" {
addrs = append(addrs, fmt.Sprintf("unix://%s", unixsocket))
}