implement custom connection handler

This commit is contained in:
Fusl 2022-10-09 11:02:30 +00:00
parent bc9875b4b0
commit 05069c3c85
1 changed files with 74 additions and 23 deletions

View File

@ -105,6 +105,8 @@ type Conn interface {
// }
// }()
Detach() DetachedConn
// Hijack returns a connection that's only partially managed by redcon
Hijack(handler func(conn HijackedConn) error) error
// ReadPipeline returns all commands in current pipeline, if any
// The commands are removed from the pipeline.
ReadPipeline() []Command
@ -401,6 +403,17 @@ func handle(s *Server, c *conn) {
}()
}()
if c.connHandler != nil {
err = c.connHandler(c)
if err, ok := err.(*errProtocol); ok {
// All protocol errors should attempt a response to
// the client. Ignore write errors.
c.wr.WriteError("ERR " + err.Error())
c.wr.Flush()
}
return
}
err = func() error {
// read commands and feed back to the client
for {
@ -444,15 +457,16 @@ func handle(s *Server, c *conn) {
// conn represents a client connection
type conn struct {
conn net.Conn
wr *Writer
rd *Reader
addr string
ctx interface{}
detached bool
closed bool
cmds []Command
idleClose time.Duration
conn net.Conn
wr *Writer
rd *Reader
addr string
ctx interface{}
detached bool
closed bool
cmds []Command
idleClose time.Duration
connHandler func(c HijackedConn) error
}
func (c *conn) Close() error {
@ -486,6 +500,15 @@ func (c *conn) PeekPipeline() []Command {
func (c *conn) NetConn() net.Conn {
return c.conn
}
func (c *conn) ReadCommand() (Command, error) {
return c.rd.ReadCommand()
}
func (c *conn) ReadCommands() ([]Command, error) {
return c.rd.readCommands(nil)
}
func (c *conn) Flush() error {
return c.wr.Flush()
}
// BaseWriter returns the underlying connection writer, if any
func BaseWriter(c Conn) *Writer {
@ -505,6 +528,18 @@ type DetachedConn interface {
Flush() error
}
// HijackedConn represents a connection that is hijacked from the server to allow for custom command read functionality
type HijackedConn interface {
// DetachedConn is the original connection
DetachedConn
// ReadCommand reads the next client command.
ReadCommand() (Command, error)
// ReadCommands reads the next pipeline commands.
ReadCommands() ([]Command, error)
// Flush flushes any writes to the network.
Flush() error
}
// Detach removes the current connection from the server loop and returns
// a detached connection. This is useful for operations such as PubSub.
// The detached connection must be closed by calling Close() when done.
@ -545,6 +580,21 @@ func (dc *detachedConn) ReadCommand() (Command, error) {
return cmd, nil
}
func (c *conn) Hijack(handler func(c HijackedConn) error) error {
c.cmds = nil
c.connHandler = handler
return nil
}
type hijackedConn struct {
*conn
}
// ReadCommands reads the next pipeline commands
func (c *hijackedConn) ReadCommands() ([]Command, error) {
return c.rd.readCommands(nil)
}
// Command represent a command
type Command struct {
// Raw is a encoded RESP message.
@ -602,9 +652,9 @@ func (w *Writer) WriteNull() {
// sub-responses to the client to complete the response.
// For example to write two strings:
//
// c.WriteArray(2)
// c.WriteBulkString("item 1")
// c.WriteBulkString("item 2")
// c.WriteArray(2)
// c.WriteBulkString("item 1")
// c.WriteBulkString("item 2")
func (w *Writer) WriteArray(count int) {
if w.err != nil {
return
@ -709,17 +759,18 @@ func (w *Writer) WriteRaw(data []byte) {
}
// WriteAny writes any type to client.
// nil -> null
// error -> error (adds "ERR " when first word is not uppercase)
// string -> bulk-string
// numbers -> bulk-string
// []byte -> bulk-string
// bool -> bulk-string ("0" or "1")
// slice -> array
// map -> array with key/value pairs
// SimpleString -> string
// SimpleInt -> integer
// everything-else -> bulk-string representation using fmt.Sprint()
//
// nil -> null
// error -> error (adds "ERR " when first word is not uppercase)
// string -> bulk-string
// numbers -> bulk-string
// []byte -> bulk-string
// bool -> bulk-string ("0" or "1")
// slice -> array
// map -> array with key/value pairs
// SimpleString -> string
// SimpleInt -> integer
// everything-else -> bulk-string representation using fmt.Sprint()
func (w *Writer) WriteAny(v interface{}) {
if w.err != nil {
return