diff --git a/append.go b/append.go index 5a27cf9..2ba04ba 100644 --- a/append.go +++ b/append.go @@ -18,9 +18,10 @@ const ( ) var errInvalidMessage = &errProtocol{"invalid message"} +var errIncompletePacket = &errProtocol{"incomplete packet"} // ReadNextCommand reads the next command from the provided packet. It's -// possibel that the packet contains multiple commands, or zero commands +// possible that the packet contains multiple commands, or zero commands // when the packet is incomplete. // 'args' is an optional reusable buffer and it can be nil. // 'argsout' are the output arguments for the command. 'kind' is the type of @@ -31,6 +32,7 @@ var errInvalidMessage = &errProtocol{"invalid message"} func ReadNextCommand(packet []byte, args [][]byte) ( leftover []byte, argsout [][]byte, kind Kind, stop bool, err error, ) { + args = args[:0] if len(packet) > 0 { if packet[0] != '*' { if packet[0] == '$' { @@ -38,43 +40,50 @@ func ReadNextCommand(packet []byte, args [][]byte) ( } return readTelnetCommand(packet, args) } - for i := 1; i < len(packet); i++ { + // standard redis command + for s, i := 1, 1; i < len(packet); i++ { if packet[i] == '\n' { if packet[i-1] != '\r' { - return packet, args, Redis, true, errInvalidMultiBulkLength + println("here", i) + return packet, args[:0], Redis, true, errInvalidMultiBulkLength } - count, ok := parseInt(packet[1 : i-1]) - if !ok || count <= 0 { - return packet, args, Redis, true, errInvalidMultiBulkLength + count, ok := parseInt(packet[s : i-1]) + if !ok || count < 0 { + println("there", i, count, ok, "["+string(packet[s:i-1])+"]") + return packet, args[:0], Redis, true, errInvalidMultiBulkLength } i++ + if count == 0 { + return packet[i:], args[:0], Redis, i == len(packet), nil + } nextArg: for j := 0; j < count; j++ { if i == len(packet) { break } if packet[i] != '$' { - return packet, args, Redis, true, + return packet, args[:0], Redis, true, &errProtocol{"expected '$', got '" + string(packet[i]) + "'"} } for s := i + 1; i < len(packet); i++ { if packet[i] == '\n' { if packet[i-1] != '\r' { - return packet, args, Redis, true, errInvalidBulkLength + return packet, args[:0], Redis, true, errInvalidBulkLength } n, ok := parseInt(packet[s : i-1]) if !ok || count <= 0 { - return packet, args, Redis, true, errInvalidBulkLength + return packet, args[:0], Redis, true, errInvalidBulkLength } i++ if len(packet)-i >= n+2 { if packet[i+n] != '\r' || packet[i+n+1] != '\n' { - return packet, args, Redis, true, errInvalidBulkLength + return packet, args[:0], Redis, true, errInvalidBulkLength } args = append(args, packet[i:i+n]) i += n + 2 if j == count-1 { + // done reading return packet[i:], args, Redis, i == len(packet), nil } continue nextArg @@ -88,7 +97,7 @@ func ReadNextCommand(packet []byte, args [][]byte) ( } } } - return packet, args, Redis, true, nil + return packet, args[:0], Redis, true, errIncompletePacket } func readTile38Command(b []byte, argsbuf [][]byte) ( @@ -98,12 +107,12 @@ func readTile38Command(b []byte, argsbuf [][]byte) ( if b[i] == ' ' { n, ok := parseInt(b[1:i]) if !ok || n < 0 { - return b, args, Tile38, true, errInvalidMessage + return b, args[:0], Tile38, true, errInvalidMessage } i++ if len(b) >= i+n+2 { if b[i+n] != '\r' || b[i+n+1] != '\n' { - return b, args, Tile38, true, errInvalidMessage + return b, args[:0], Tile38, true, errInvalidMessage } line := b[i : i+n] reading: @@ -143,7 +152,7 @@ func readTile38Command(b []byte, argsbuf [][]byte) ( break } } - return b, args, Tile38, true, nil + return b, args[:0], Tile38, true, errIncompletePacket } func readTelnetCommand(b []byte, argsbuf [][]byte) ( leftover []byte, args [][]byte, kind Kind, stop bool, err error, @@ -175,7 +184,7 @@ func readTelnetCommand(b []byte, argsbuf [][]byte) ( } if c == '"' || c == '\'' { if i != 0 { - return b, args, Telnet, true, errUnbalancedQuotes + return b, args[:0], Telnet, true, errUnbalancedQuotes } quotech = c quote = true @@ -199,7 +208,7 @@ func readTelnetCommand(b []byte, argsbuf [][]byte) ( args = append(args, nline) line = line[i+1:] if len(line) > 0 && line[0] != ' ' { - return b, args, Telnet, true, errUnbalancedQuotes + return b, args[:0], Telnet, true, errUnbalancedQuotes } continue outer } else if c == '\\' { @@ -210,7 +219,7 @@ func readTelnetCommand(b []byte, argsbuf [][]byte) ( nline = append(nline, c) } if quote { - return b, args, Telnet, true, errUnbalancedQuotes + return b, args[:0], Telnet, true, errUnbalancedQuotes } if len(line) > 0 { args = append(args, line) @@ -220,7 +229,7 @@ func readTelnetCommand(b []byte, argsbuf [][]byte) ( return b[i+1:], args, Telnet, i == len(b), nil } } - return b, args, Telnet, true, nil + return b, args[:0], Telnet, true, errIncompletePacket } // AppendUint appends a Redis protocol uint64 to the input bytes. diff --git a/append_test.go b/append_test.go new file mode 100644 index 0000000..b942d1a --- /dev/null +++ b/append_test.go @@ -0,0 +1,96 @@ +package redcon + +import ( + "bytes" + "math/rand" + "testing" + "time" +) + +func TestNextCommand(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + start := time.Now() + for time.Since(start) < time.Second { + // keep copy of pipeline args for final compare + var plargs [][][]byte + + // create a pipeline of random number of commands with random data. + N := rand.Int() % 10000 + var data []byte + for i := 0; i < N; i++ { + nargs := rand.Int() % 10 + data = AppendArray(data, nargs) + var args [][]byte + for j := 0; j < nargs; j++ { + arg := make([]byte, rand.Int()%100) + if _, err := rand.Read(arg); err != nil { + t.Fatal(err) + } + data = AppendBulk(data, arg) + args = append(args, arg) + } + plargs = append(plargs, args) + } + + // break data into random number of chunks + chunkn := rand.Int() % 100 + if chunkn == 0 { + chunkn = 1 + } + if len(data) < chunkn { + continue + } + var chunks [][]byte + var chunksz int + for i := 0; i < len(data); i += chunksz { + chunksz = rand.Int() % (len(data) / chunkn) + var chunk []byte + if i+chunksz < len(data) { + chunk = data[i : i+chunksz] + } else { + chunk = data[i:] + } + chunks = append(chunks, chunk) + } + + // process chunks + var rbuf []byte + var fargs [][][]byte + for _, chunk := range chunks { + var data []byte + if len(rbuf) > 0 { + data = append(rbuf, chunk...) + } else { + data = chunk + } + for { + leftover, args, _, stop, err := ReadNextCommand(data, nil) + data = leftover + if err != nil && err != errIncompletePacket { + t.Fatal(err) + } + if err != errIncompletePacket { + fargs = append(fargs, args) + } + if stop { + break + } + } + rbuf = append(rbuf[:0], data...) + } + // compare final args to original + if len(plargs) != len(fargs) { + t.Fatalf("not equal size: %v != %v", len(plargs), len(fargs)) + } + for i := 0; i < len(plargs); i++ { + if len(plargs[i]) != len(fargs[i]) { + t.Fatalf("not equal size for item %v: %v != %v", i, len(plargs[i]), len(fargs[i])) + } + for j := 0; j < len(plargs[i]); j++ { + if !bytes.Equal(plargs[i][j], plargs[i][j]) { + t.Fatalf("not equal for item %v:%v: %v != %v", i, j, len(plargs[i][j]), len(fargs[i][j])) + } + } + } + } +}