diff --git a/redcon.go b/redcon.go index 9c069bc..f91fd2a 100644 --- a/redcon.go +++ b/redcon.go @@ -105,6 +105,8 @@ type Conn interface { // } // }() Detach() DetachedConn + // Hijack returns a connection that's only partially managed by redcon + Hijack(handler func(conn HijackedConn) error) error // ReadPipeline returns all commands in current pipeline, if any // The commands are removed from the pipeline. ReadPipeline() []Command @@ -401,6 +403,17 @@ func handle(s *Server, c *conn) { }() }() + if c.connHandler != nil { + err = c.connHandler(c) + if err, ok := err.(*errProtocol); ok { + // All protocol errors should attempt a response to + // the client. Ignore write errors. + c.wr.WriteError("ERR " + err.Error()) + c.wr.Flush() + } + return + } + err = func() error { // read commands and feed back to the client for { @@ -444,15 +457,16 @@ func handle(s *Server, c *conn) { // conn represents a client connection type conn struct { - conn net.Conn - wr *Writer - rd *Reader - addr string - ctx interface{} - detached bool - closed bool - cmds []Command - idleClose time.Duration + conn net.Conn + wr *Writer + rd *Reader + addr string + ctx interface{} + detached bool + closed bool + cmds []Command + idleClose time.Duration + connHandler func(c HijackedConn) error } func (c *conn) Close() error { @@ -486,6 +500,15 @@ func (c *conn) PeekPipeline() []Command { func (c *conn) NetConn() net.Conn { return c.conn } +func (c *conn) ReadCommand() (Command, error) { + return c.rd.ReadCommand() +} +func (c *conn) ReadCommands() ([]Command, error) { + return c.rd.readCommands(nil) +} +func (c *conn) Flush() error { + return c.wr.Flush() +} // BaseWriter returns the underlying connection writer, if any func BaseWriter(c Conn) *Writer { @@ -505,6 +528,18 @@ type DetachedConn interface { Flush() error } +// HijackedConn represents a connection that is hijacked from the server to allow for custom command read functionality +type HijackedConn interface { + // DetachedConn is the original connection + DetachedConn + // ReadCommand reads the next client command. + ReadCommand() (Command, error) + // ReadCommands reads the next pipeline commands. + ReadCommands() ([]Command, error) + // Flush flushes any writes to the network. + Flush() error +} + // Detach removes the current connection from the server loop and returns // a detached connection. This is useful for operations such as PubSub. // The detached connection must be closed by calling Close() when done. @@ -545,6 +580,21 @@ func (dc *detachedConn) ReadCommand() (Command, error) { return cmd, nil } +func (c *conn) Hijack(handler func(c HijackedConn) error) error { + c.cmds = nil + c.connHandler = handler + return nil +} + +type hijackedConn struct { + *conn +} + +// ReadCommands reads the next pipeline commands +func (c *hijackedConn) ReadCommands() ([]Command, error) { + return c.rd.readCommands(nil) +} + // Command represent a command type Command struct { // Raw is a encoded RESP message. @@ -602,9 +652,9 @@ func (w *Writer) WriteNull() { // sub-responses to the client to complete the response. // For example to write two strings: // -// c.WriteArray(2) -// c.WriteBulkString("item 1") -// c.WriteBulkString("item 2") +// c.WriteArray(2) +// c.WriteBulkString("item 1") +// c.WriteBulkString("item 2") func (w *Writer) WriteArray(count int) { if w.err != nil { return @@ -709,17 +759,18 @@ func (w *Writer) WriteRaw(data []byte) { } // WriteAny writes any type to client. -// nil -> null -// error -> error (adds "ERR " when first word is not uppercase) -// string -> bulk-string -// numbers -> bulk-string -// []byte -> bulk-string -// bool -> bulk-string ("0" or "1") -// slice -> array -// map -> array with key/value pairs -// SimpleString -> string -// SimpleInt -> integer -// everything-else -> bulk-string representation using fmt.Sprint() +// +// nil -> null +// error -> error (adds "ERR " when first word is not uppercase) +// string -> bulk-string +// numbers -> bulk-string +// []byte -> bulk-string +// bool -> bulk-string ("0" or "1") +// slice -> array +// map -> array with key/value pairs +// SimpleString -> string +// SimpleInt -> integer +// everything-else -> bulk-string representation using fmt.Sprint() func (w *Writer) WriteAny(v interface{}) { if w.err != nil { return