forked from mirror/go-sqlite3
Implement support for aggregation functions implemented in Go.
This commit is contained in:
parent
b037a61690
commit
26917df7a6
Binary file not shown.
|
@ -0,0 +1,133 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"math/rand"
|
||||
|
||||
sqlite "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// Computes x^y
|
||||
func pow(x, y int64) int64 {
|
||||
return int64(math.Pow(float64(x), float64(y)))
|
||||
}
|
||||
|
||||
// Computes the bitwise exclusive-or of all its arguments
|
||||
func xor(xs ...int64) int64 {
|
||||
var ret int64
|
||||
for _, x := range xs {
|
||||
ret ^= x
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
// Returns a random number. It's actually deterministic here because
|
||||
// we don't seed the RNG, but it's an example of a non-pure function
|
||||
// from SQLite's POV.
|
||||
func getrand() int64 {
|
||||
return rand.Int63()
|
||||
}
|
||||
|
||||
// Computes the standard deviation of a GROUPed BY set of values
|
||||
type stddev struct {
|
||||
xs []int64
|
||||
// Running average calculation
|
||||
sum int64
|
||||
n int64
|
||||
}
|
||||
|
||||
func newStddev() *stddev { return &stddev{} }
|
||||
|
||||
func (s *stddev) Step(x int64) {
|
||||
s.xs = append(s.xs, x)
|
||||
s.sum += x
|
||||
s.n++
|
||||
}
|
||||
|
||||
func (s *stddev) Done() float64 {
|
||||
mean := float64(s.sum) / float64(s.n)
|
||||
var sqDiff []float64
|
||||
for _, x := range s.xs {
|
||||
sqDiff = append(sqDiff, math.Pow(float64(x)-mean, 2))
|
||||
}
|
||||
var dev float64
|
||||
for _, x := range sqDiff {
|
||||
dev += x
|
||||
}
|
||||
dev /= float64(len(sqDiff))
|
||||
return math.Sqrt(dev)
|
||||
}
|
||||
|
||||
func main() {
|
||||
sql.Register("sqlite3_custom", &sqlite.SQLiteDriver{
|
||||
ConnectHook: func(conn *sqlite.SQLiteConn) error {
|
||||
if err := conn.RegisterFunc("pow", pow, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := conn.RegisterFunc("xor", xor, true); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := conn.RegisterFunc("rand", getrand, false); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := conn.RegisterAggregator("stddev", newStddev, true); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
|
||||
db, err := sql.Open("sqlite3_custom", ":memory:")
|
||||
if err != nil {
|
||||
log.Fatal("Failed to open database:", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
var i int64
|
||||
err = db.QueryRow("SELECT pow(2,3)").Scan(&i)
|
||||
if err != nil {
|
||||
log.Fatal("POW query error:", err)
|
||||
}
|
||||
fmt.Println("pow(2,3) =", i) // 8
|
||||
|
||||
err = db.QueryRow("SELECT xor(1,2,3,4,5,6)").Scan(&i)
|
||||
if err != nil {
|
||||
log.Fatal("XOR query error:", err)
|
||||
}
|
||||
fmt.Println("xor(1,2,3,4,5) =", i) // 7
|
||||
|
||||
err = db.QueryRow("SELECT rand()").Scan(&i)
|
||||
if err != nil {
|
||||
log.Fatal("RAND query error:", err)
|
||||
}
|
||||
fmt.Println("rand() =", i) // pseudorandom
|
||||
|
||||
_, err = db.Exec("create table foo (department integer, profits integer)")
|
||||
if err != nil {
|
||||
log.Fatal("Failed to create table:", err)
|
||||
}
|
||||
_, err = db.Exec("insert into foo values (1, 10), (1, 20), (1, 45), (2, 42), (2, 115)")
|
||||
if err != nil {
|
||||
log.Fatal("Failed to insert records:", err)
|
||||
}
|
||||
|
||||
rows, err := db.Query("select department, stddev(profits) from foo group by department")
|
||||
if err != nil {
|
||||
log.Fatal("STDDEV query error:", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
var dept int64
|
||||
var dev float64
|
||||
if err := rows.Scan(&dept, &dev); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
fmt.Printf("dept=%d stddev=%f\n", dept, dev)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
47
callback.go
47
callback.go
|
@ -12,6 +12,7 @@ package sqlite3
|
|||
|
||||
/*
|
||||
#include <sqlite3-binding.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
void _sqlite3_result_text(sqlite3_context* ctx, const char* s);
|
||||
void _sqlite3_result_blob(sqlite3_context* ctx, const void* b, int l);
|
||||
|
@ -32,6 +33,19 @@ func callbackTrampoline(ctx *C.sqlite3_context, argc int, argv **C.sqlite3_value
|
|||
fi.Call(ctx, args)
|
||||
}
|
||||
|
||||
//export stepTrampoline
|
||||
func stepTrampoline(ctx *C.sqlite3_context, argc int, argv **C.sqlite3_value) {
|
||||
args := (*[1 << 30]*C.sqlite3_value)(unsafe.Pointer(argv))[:argc:argc]
|
||||
ai := (*aggInfo)(unsafe.Pointer(C.sqlite3_user_data(ctx)))
|
||||
ai.Step(ctx, args)
|
||||
}
|
||||
|
||||
//export doneTrampoline
|
||||
func doneTrampoline(ctx *C.sqlite3_context) {
|
||||
ai := (*aggInfo)(unsafe.Pointer(C.sqlite3_user_data(ctx)))
|
||||
ai.Done(ctx)
|
||||
}
|
||||
|
||||
// This is only here so that tests can refer to it.
|
||||
type callbackArgRaw C.sqlite3_value
|
||||
|
||||
|
@ -158,6 +172,33 @@ func callbackArg(typ reflect.Type) (callbackArgConverter, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func callbackConvertArgs(argv []*C.sqlite3_value, converters []callbackArgConverter, variadic callbackArgConverter) ([]reflect.Value, error) {
|
||||
var args []reflect.Value
|
||||
|
||||
if len(argv) < len(converters) {
|
||||
return nil, fmt.Errorf("function requires at least %d arguments", len(converters))
|
||||
}
|
||||
|
||||
for i, arg := range argv[:len(converters)] {
|
||||
v, err := converters[i](arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args = append(args, v)
|
||||
}
|
||||
|
||||
if variadic != nil {
|
||||
for _, arg := range argv[len(converters):] {
|
||||
v, err := variadic(arg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
args = append(args, v)
|
||||
}
|
||||
}
|
||||
return args, nil
|
||||
}
|
||||
|
||||
type callbackRetConverter func(*C.sqlite3_context, reflect.Value) error
|
||||
|
||||
func callbackRetInteger(ctx *C.sqlite3_context, v reflect.Value) error {
|
||||
|
@ -233,6 +274,12 @@ func callbackRet(typ reflect.Type) (callbackRetConverter, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func callbackError(ctx *C.sqlite3_context, err error) {
|
||||
cstr := C.CString(err.Error())
|
||||
defer C.free(unsafe.Pointer(cstr))
|
||||
C.sqlite3_result_error(ctx, cstr, -1)
|
||||
}
|
||||
|
||||
// Test support code. Tests are not allowed to import "C", so we can't
|
||||
// declare any functions that use C.sqlite3_value.
|
||||
func callbackSyntheticForTests(v reflect.Value, err error) callbackArgConverter {
|
||||
|
|
253
sqlite3.go
253
sqlite3.go
|
@ -75,6 +75,8 @@ void _sqlite3_result_blob(sqlite3_context* ctx, const void* b, int l) {
|
|||
}
|
||||
|
||||
void callbackTrampoline(sqlite3_context*, int, sqlite3_value**);
|
||||
void stepTrampoline(sqlite3_context*, int, sqlite3_value**);
|
||||
void doneTrampoline(sqlite3_context*);
|
||||
*/
|
||||
import "C"
|
||||
import (
|
||||
|
@ -127,10 +129,11 @@ type SQLiteDriver struct {
|
|||
|
||||
// Conn struct.
|
||||
type SQLiteConn struct {
|
||||
db *C.sqlite3
|
||||
loc *time.Location
|
||||
txlock string
|
||||
funcs []*functionInfo
|
||||
db *C.sqlite3
|
||||
loc *time.Location
|
||||
txlock string
|
||||
funcs []*functionInfo
|
||||
aggregators []*aggInfo
|
||||
}
|
||||
|
||||
// Tx struct.
|
||||
|
@ -171,49 +174,96 @@ type functionInfo struct {
|
|||
retConverter callbackRetConverter
|
||||
}
|
||||
|
||||
func (fi *functionInfo) error(ctx *C.sqlite3_context, err error) {
|
||||
cstr := C.CString(err.Error())
|
||||
defer C.free(unsafe.Pointer(cstr))
|
||||
C.sqlite3_result_error(ctx, cstr, -1)
|
||||
}
|
||||
|
||||
func (fi *functionInfo) Call(ctx *C.sqlite3_context, argv []*C.sqlite3_value) {
|
||||
var args []reflect.Value
|
||||
|
||||
if len(argv) < len(fi.argConverters) {
|
||||
fi.error(ctx, fmt.Errorf("function requires at least %d arguments", len(fi.argConverters)))
|
||||
}
|
||||
|
||||
for i, arg := range argv[:len(fi.argConverters)] {
|
||||
v, err := fi.argConverters[i](arg)
|
||||
if err != nil {
|
||||
fi.error(ctx, err)
|
||||
return
|
||||
}
|
||||
args = append(args, v)
|
||||
}
|
||||
|
||||
if fi.variadicConverter != nil {
|
||||
for _, arg := range argv[len(fi.argConverters):] {
|
||||
v, err := fi.variadicConverter(arg)
|
||||
if err != nil {
|
||||
fi.error(ctx, err)
|
||||
return
|
||||
}
|
||||
args = append(args, v)
|
||||
}
|
||||
args, err := callbackConvertArgs(argv, fi.argConverters, fi.variadicConverter)
|
||||
if err != nil {
|
||||
callbackError(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
ret := fi.f.Call(args)
|
||||
|
||||
if len(ret) == 2 && ret[1].Interface() != nil {
|
||||
fi.error(ctx, ret[1].Interface().(error))
|
||||
callbackError(ctx, ret[1].Interface().(error))
|
||||
return
|
||||
}
|
||||
|
||||
err := fi.retConverter(ctx, ret[0])
|
||||
err = fi.retConverter(ctx, ret[0])
|
||||
if err != nil {
|
||||
fi.error(ctx, err)
|
||||
callbackError(ctx, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type aggInfo struct {
|
||||
constructor reflect.Value
|
||||
|
||||
// Active aggregator objects for aggregations in flight. The
|
||||
// aggregators are indexed by a counter stored in the aggregation
|
||||
// user data space provided by sqlite.
|
||||
active map[int64]reflect.Value
|
||||
next int64
|
||||
|
||||
stepArgConverters []callbackArgConverter
|
||||
stepVariadicConverter callbackArgConverter
|
||||
|
||||
doneRetConverter callbackRetConverter
|
||||
}
|
||||
|
||||
func (ai *aggInfo) agg(ctx *C.sqlite3_context) (int64, reflect.Value, error) {
|
||||
aggIdx := (*int64)(C.sqlite3_aggregate_context(ctx, C.int(8)))
|
||||
if *aggIdx == 0 {
|
||||
*aggIdx = ai.next
|
||||
ret := ai.constructor.Call(nil)
|
||||
if len(ret) == 2 && ret[1].Interface() != nil {
|
||||
return 0, reflect.Value{}, ret[1].Interface().(error)
|
||||
}
|
||||
if ret[0].IsNil() {
|
||||
return 0, reflect.Value{}, errors.New("aggregator constructor returned nil state")
|
||||
}
|
||||
ai.next++
|
||||
ai.active[*aggIdx] = ret[0]
|
||||
}
|
||||
return *aggIdx, ai.active[*aggIdx], nil
|
||||
}
|
||||
|
||||
func (ai *aggInfo) Step(ctx *C.sqlite3_context, argv []*C.sqlite3_value) {
|
||||
_, agg, err := ai.agg(ctx)
|
||||
if err != nil {
|
||||
callbackError(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
args, err := callbackConvertArgs(argv, ai.stepArgConverters, ai.stepVariadicConverter)
|
||||
if err != nil {
|
||||
callbackError(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
ret := agg.MethodByName("Step").Call(args)
|
||||
if len(ret) == 1 && ret[0].Interface() != nil {
|
||||
callbackError(ctx, ret[0].Interface().(error))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ai *aggInfo) Done(ctx *C.sqlite3_context) {
|
||||
idx, agg, err := ai.agg(ctx)
|
||||
if err != nil {
|
||||
callbackError(ctx, err)
|
||||
return
|
||||
}
|
||||
defer func() { delete(ai.active, idx) }()
|
||||
|
||||
ret := agg.MethodByName("Done").Call(nil)
|
||||
if len(ret) == 2 && ret[1].Interface() != nil {
|
||||
callbackError(ctx, ret[1].Interface().(error))
|
||||
return
|
||||
}
|
||||
|
||||
err = ai.doneRetConverter(ctx, ret[0])
|
||||
if err != nil {
|
||||
callbackError(ctx, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -244,6 +294,8 @@ func (tx *SQLiteTx) Rollback() error {
|
|||
// If pure is true. SQLite will assume that the function's return
|
||||
// value depends only on its inputs, and make more aggressive
|
||||
// optimizations in its queries.
|
||||
//
|
||||
// See _example/go_custom_funcs for a detailed example.
|
||||
func (c *SQLiteConn) RegisterFunc(name string, impl interface{}, pure bool) error {
|
||||
var fi functionInfo
|
||||
fi.f = reflect.ValueOf(impl)
|
||||
|
@ -298,7 +350,132 @@ func (c *SQLiteConn) RegisterFunc(name string, impl interface{}, pure bool) erro
|
|||
if pure {
|
||||
opts |= C.SQLITE_DETERMINISTIC
|
||||
}
|
||||
rv := C.sqlite3_create_function_v2(c.db, cname, C.int(numArgs), C.int(opts), unsafe.Pointer(&fi), (*[0]byte)(unsafe.Pointer(C.callbackTrampoline)), nil, nil, nil)
|
||||
rv := C.sqlite3_create_function(c.db, cname, C.int(numArgs), C.int(opts), unsafe.Pointer(&fi), (*[0]byte)(unsafe.Pointer(C.callbackTrampoline)), nil, nil)
|
||||
if rv != C.SQLITE_OK {
|
||||
return c.lastError()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterAggregator makes a Go type available as a SQLite aggregation function.
|
||||
//
|
||||
// Because aggregation is incremental, it's implemented in Go with a
|
||||
// type that has 2 methods: func Step(values) accumulates one row of
|
||||
// data into the accumulator, and func Done() ret finalizes and
|
||||
// returns the aggregate value. "values" and "ret" may be any type
|
||||
// supported by RegisterFunc.
|
||||
//
|
||||
// RegisterAggregator takes as implementation a constructor function
|
||||
// that constructs an instance of the aggregator type each time an
|
||||
// aggregation begins. The constructor must return a pointer to a
|
||||
// type, or an interface that implements Step() and Done().
|
||||
//
|
||||
// The constructor function and the Step/Done methods may optionally
|
||||
// return an error in addition to their other return values.
|
||||
//
|
||||
// See _example/go_custom_funcs for a detailed example.
|
||||
func (c *SQLiteConn) RegisterAggregator(name string, impl interface{}, pure bool) error {
|
||||
var ai aggInfo
|
||||
ai.constructor = reflect.ValueOf(impl)
|
||||
t := ai.constructor.Type()
|
||||
if t.Kind() != reflect.Func {
|
||||
return errors.New("non-function passed to RegisterAggregator")
|
||||
}
|
||||
if t.NumOut() != 1 && t.NumOut() != 2 {
|
||||
return errors.New("SQLite aggregator constructors must return 1 or 2 values")
|
||||
}
|
||||
if t.NumOut() == 2 && !t.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
|
||||
return errors.New("Second return value of SQLite function must be error")
|
||||
}
|
||||
if t.NumIn() != 0 {
|
||||
return errors.New("SQLite aggregator constructors must not have arguments")
|
||||
}
|
||||
|
||||
agg := t.Out(0)
|
||||
switch agg.Kind() {
|
||||
case reflect.Ptr, reflect.Interface:
|
||||
default:
|
||||
return errors.New("SQlite aggregator constructor must return a pointer object")
|
||||
}
|
||||
stepFn, found := agg.MethodByName("Step")
|
||||
if !found {
|
||||
return errors.New("SQlite aggregator doesn't have a Step() function")
|
||||
}
|
||||
step := stepFn.Type
|
||||
if step.NumOut() != 0 && step.NumOut() != 1 {
|
||||
return errors.New("SQlite aggregator Step() function must return 0 or 1 values")
|
||||
}
|
||||
if step.NumOut() == 1 && !step.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
|
||||
return errors.New("type of SQlite aggregator Step() return value must be error")
|
||||
}
|
||||
|
||||
stepNArgs := step.NumIn()
|
||||
start := 0
|
||||
if agg.Kind() == reflect.Ptr {
|
||||
// Skip over the method receiver
|
||||
stepNArgs--
|
||||
start++
|
||||
}
|
||||
if step.IsVariadic() {
|
||||
stepNArgs--
|
||||
}
|
||||
for i := start; i < start+stepNArgs; i++ {
|
||||
conv, err := callbackArg(step.In(i))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ai.stepArgConverters = append(ai.stepArgConverters, conv)
|
||||
}
|
||||
if step.IsVariadic() {
|
||||
conv, err := callbackArg(t.In(start + stepNArgs).Elem())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ai.stepVariadicConverter = conv
|
||||
// Pass -1 to sqlite so that it allows any number of
|
||||
// arguments. The call helper verifies that the minimum number
|
||||
// of arguments is present for variadic functions.
|
||||
stepNArgs = -1
|
||||
}
|
||||
|
||||
doneFn, found := agg.MethodByName("Done")
|
||||
if !found {
|
||||
return errors.New("SQlite aggregator doesn't have a Done() function")
|
||||
}
|
||||
done := doneFn.Type
|
||||
doneNArgs := done.NumIn()
|
||||
if agg.Kind() == reflect.Ptr {
|
||||
// Skip over the method receiver
|
||||
doneNArgs--
|
||||
}
|
||||
if doneNArgs != 0 {
|
||||
return errors.New("SQlite aggregator Done() function must have no arguments")
|
||||
}
|
||||
if done.NumOut() != 1 && done.NumOut() != 2 {
|
||||
return errors.New("SQLite aggregator Done() function must return 1 or 2 values")
|
||||
}
|
||||
if done.NumOut() == 2 && !done.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) {
|
||||
return errors.New("second return value of SQLite aggregator Done() function must be error")
|
||||
}
|
||||
|
||||
conv, err := callbackRet(done.Out(0))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ai.doneRetConverter = conv
|
||||
ai.active = make(map[int64]reflect.Value)
|
||||
ai.next = 1
|
||||
|
||||
// ai must outlast the database connection, or we'll have dangling pointers.
|
||||
c.aggregators = append(c.aggregators, &ai)
|
||||
|
||||
cname := C.CString(name)
|
||||
defer C.free(unsafe.Pointer(cname))
|
||||
opts := C.SQLITE_UTF8
|
||||
if pure {
|
||||
opts |= C.SQLITE_DETERMINISTIC
|
||||
}
|
||||
rv := C.sqlite3_create_function(c.db, cname, C.int(stepNArgs), C.int(opts), unsafe.Pointer(&ai), nil, (*[0]byte)(unsafe.Pointer(C.stepTrampoline)), (*[0]byte)(unsafe.Pointer(C.doneTrampoline)))
|
||||
if rv != C.SQLITE_OK {
|
||||
return c.lastError()
|
||||
}
|
||||
|
|
|
@ -1175,6 +1175,65 @@ func TestFunctionRegistration(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type sumAggregator int64
|
||||
|
||||
func (s *sumAggregator) Step(x int64) {
|
||||
*s += sumAggregator(x)
|
||||
}
|
||||
|
||||
func (s *sumAggregator) Done() int64 {
|
||||
return int64(*s)
|
||||
}
|
||||
|
||||
func TestAggregatorRegistration(t *testing.T) {
|
||||
customSum := func() *sumAggregator {
|
||||
var ret sumAggregator
|
||||
return &ret
|
||||
}
|
||||
|
||||
sql.Register("sqlite3_AggregatorRegistration", &SQLiteDriver{
|
||||
ConnectHook: func(conn *SQLiteConn) error {
|
||||
if err := conn.RegisterAggregator("customSum", customSum, true); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
})
|
||||
db, err := sql.Open("sqlite3_AggregatorRegistration", ":memory:")
|
||||
if err != nil {
|
||||
t.Fatal("Failed to open database:", err)
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
_, err = db.Exec("create table foo (department integer, profits integer)")
|
||||
if err != nil {
|
||||
t.Fatal("Failed to create table:", err)
|
||||
}
|
||||
|
||||
_, err = db.Exec("insert into foo values (1, 10), (1, 20), (2, 42)")
|
||||
if err != nil {
|
||||
t.Fatal("Failed to insert records:", err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
dept, sum int64
|
||||
}{
|
||||
{1, 30},
|
||||
{2, 42},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
var ret int64
|
||||
err = db.QueryRow("select customSum(profits) from foo where department = $1 group by department", test.dept).Scan(&ret)
|
||||
if err != nil {
|
||||
t.Fatal("Query failed:", err)
|
||||
}
|
||||
if ret != test.sum {
|
||||
t.Fatalf("Custom sum returned wrong value, got %d, want %d", ret, test.sum)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var customFunctionOnce sync.Once
|
||||
|
||||
func BenchmarkCustomFunctions(b *testing.B) {
|
||||
|
|
Loading…
Reference in New Issue