diff --git a/_example/go_custom_funcs/go_custom_funcs b/_example/go_custom_funcs/go_custom_funcs new file mode 100755 index 0000000..b6be764 Binary files /dev/null and b/_example/go_custom_funcs/go_custom_funcs differ diff --git a/_example/go_custom_funcs/main.go b/_example/go_custom_funcs/main.go new file mode 100644 index 0000000..85657e6 --- /dev/null +++ b/_example/go_custom_funcs/main.go @@ -0,0 +1,133 @@ +package main + +import ( + "database/sql" + "fmt" + "log" + "math" + "math/rand" + + sqlite "github.com/mattn/go-sqlite3" +) + +// Computes x^y +func pow(x, y int64) int64 { + return int64(math.Pow(float64(x), float64(y))) +} + +// Computes the bitwise exclusive-or of all its arguments +func xor(xs ...int64) int64 { + var ret int64 + for _, x := range xs { + ret ^= x + } + return ret +} + +// Returns a random number. It's actually deterministic here because +// we don't seed the RNG, but it's an example of a non-pure function +// from SQLite's POV. +func getrand() int64 { + return rand.Int63() +} + +// Computes the standard deviation of a GROUPed BY set of values +type stddev struct { + xs []int64 + // Running average calculation + sum int64 + n int64 +} + +func newStddev() *stddev { return &stddev{} } + +func (s *stddev) Step(x int64) { + s.xs = append(s.xs, x) + s.sum += x + s.n++ +} + +func (s *stddev) Done() float64 { + mean := float64(s.sum) / float64(s.n) + var sqDiff []float64 + for _, x := range s.xs { + sqDiff = append(sqDiff, math.Pow(float64(x)-mean, 2)) + } + var dev float64 + for _, x := range sqDiff { + dev += x + } + dev /= float64(len(sqDiff)) + return math.Sqrt(dev) +} + +func main() { + sql.Register("sqlite3_custom", &sqlite.SQLiteDriver{ + ConnectHook: func(conn *sqlite.SQLiteConn) error { + if err := conn.RegisterFunc("pow", pow, true); err != nil { + return err + } + if err := conn.RegisterFunc("xor", xor, true); err != nil { + return err + } + if err := conn.RegisterFunc("rand", getrand, false); err != nil { + return err + } + if err := conn.RegisterAggregator("stddev", newStddev, true); err != nil { + return err + } + return nil + }, + }) + + db, err := sql.Open("sqlite3_custom", ":memory:") + if err != nil { + log.Fatal("Failed to open database:", err) + } + defer db.Close() + + var i int64 + err = db.QueryRow("SELECT pow(2,3)").Scan(&i) + if err != nil { + log.Fatal("POW query error:", err) + } + fmt.Println("pow(2,3) =", i) // 8 + + err = db.QueryRow("SELECT xor(1,2,3,4,5,6)").Scan(&i) + if err != nil { + log.Fatal("XOR query error:", err) + } + fmt.Println("xor(1,2,3,4,5) =", i) // 7 + + err = db.QueryRow("SELECT rand()").Scan(&i) + if err != nil { + log.Fatal("RAND query error:", err) + } + fmt.Println("rand() =", i) // pseudorandom + + _, err = db.Exec("create table foo (department integer, profits integer)") + if err != nil { + log.Fatal("Failed to create table:", err) + } + _, err = db.Exec("insert into foo values (1, 10), (1, 20), (1, 45), (2, 42), (2, 115)") + if err != nil { + log.Fatal("Failed to insert records:", err) + } + + rows, err := db.Query("select department, stddev(profits) from foo group by department") + if err != nil { + log.Fatal("STDDEV query error:", err) + } + defer rows.Close() + for rows.Next() { + var dept int64 + var dev float64 + if err := rows.Scan(&dept, &dev); err != nil { + log.Fatal(err) + } + fmt.Printf("dept=%d stddev=%f\n", dept, dev) + } + if err := rows.Err(); err != nil { + log.Fatal(err) + } +} diff --git a/callback.go b/callback.go index b1704fe..61fc8d1 100644 --- a/callback.go +++ b/callback.go @@ -12,6 +12,7 @@ package sqlite3 /* #include +#include void _sqlite3_result_text(sqlite3_context* ctx, const char* s); void _sqlite3_result_blob(sqlite3_context* ctx, const void* b, int l); @@ -32,6 +33,19 @@ func callbackTrampoline(ctx *C.sqlite3_context, argc int, argv **C.sqlite3_value fi.Call(ctx, args) } +//export stepTrampoline +func stepTrampoline(ctx *C.sqlite3_context, argc int, argv **C.sqlite3_value) { + args := (*[1 << 30]*C.sqlite3_value)(unsafe.Pointer(argv))[:argc:argc] + ai := (*aggInfo)(unsafe.Pointer(C.sqlite3_user_data(ctx))) + ai.Step(ctx, args) +} + +//export doneTrampoline +func doneTrampoline(ctx *C.sqlite3_context) { + ai := (*aggInfo)(unsafe.Pointer(C.sqlite3_user_data(ctx))) + ai.Done(ctx) +} + // This is only here so that tests can refer to it. type callbackArgRaw C.sqlite3_value @@ -158,6 +172,33 @@ func callbackArg(typ reflect.Type) (callbackArgConverter, error) { } } +func callbackConvertArgs(argv []*C.sqlite3_value, converters []callbackArgConverter, variadic callbackArgConverter) ([]reflect.Value, error) { + var args []reflect.Value + + if len(argv) < len(converters) { + return nil, fmt.Errorf("function requires at least %d arguments", len(converters)) + } + + for i, arg := range argv[:len(converters)] { + v, err := converters[i](arg) + if err != nil { + return nil, err + } + args = append(args, v) + } + + if variadic != nil { + for _, arg := range argv[len(converters):] { + v, err := variadic(arg) + if err != nil { + return nil, err + } + args = append(args, v) + } + } + return args, nil +} + type callbackRetConverter func(*C.sqlite3_context, reflect.Value) error func callbackRetInteger(ctx *C.sqlite3_context, v reflect.Value) error { @@ -233,6 +274,12 @@ func callbackRet(typ reflect.Type) (callbackRetConverter, error) { } } +func callbackError(ctx *C.sqlite3_context, err error) { + cstr := C.CString(err.Error()) + defer C.free(unsafe.Pointer(cstr)) + C.sqlite3_result_error(ctx, cstr, -1) +} + // Test support code. Tests are not allowed to import "C", so we can't // declare any functions that use C.sqlite3_value. func callbackSyntheticForTests(v reflect.Value, err error) callbackArgConverter { diff --git a/sqlite3.go b/sqlite3.go index 73e67e3..8d2faca 100644 --- a/sqlite3.go +++ b/sqlite3.go @@ -75,6 +75,8 @@ void _sqlite3_result_blob(sqlite3_context* ctx, const void* b, int l) { } void callbackTrampoline(sqlite3_context*, int, sqlite3_value**); +void stepTrampoline(sqlite3_context*, int, sqlite3_value**); +void doneTrampoline(sqlite3_context*); */ import "C" import ( @@ -127,10 +129,11 @@ type SQLiteDriver struct { // Conn struct. type SQLiteConn struct { - db *C.sqlite3 - loc *time.Location - txlock string - funcs []*functionInfo + db *C.sqlite3 + loc *time.Location + txlock string + funcs []*functionInfo + aggregators []*aggInfo } // Tx struct. @@ -171,49 +174,96 @@ type functionInfo struct { retConverter callbackRetConverter } -func (fi *functionInfo) error(ctx *C.sqlite3_context, err error) { - cstr := C.CString(err.Error()) - defer C.free(unsafe.Pointer(cstr)) - C.sqlite3_result_error(ctx, cstr, -1) -} - func (fi *functionInfo) Call(ctx *C.sqlite3_context, argv []*C.sqlite3_value) { - var args []reflect.Value - - if len(argv) < len(fi.argConverters) { - fi.error(ctx, fmt.Errorf("function requires at least %d arguments", len(fi.argConverters))) - } - - for i, arg := range argv[:len(fi.argConverters)] { - v, err := fi.argConverters[i](arg) - if err != nil { - fi.error(ctx, err) - return - } - args = append(args, v) - } - - if fi.variadicConverter != nil { - for _, arg := range argv[len(fi.argConverters):] { - v, err := fi.variadicConverter(arg) - if err != nil { - fi.error(ctx, err) - return - } - args = append(args, v) - } + args, err := callbackConvertArgs(argv, fi.argConverters, fi.variadicConverter) + if err != nil { + callbackError(ctx, err) + return } ret := fi.f.Call(args) if len(ret) == 2 && ret[1].Interface() != nil { - fi.error(ctx, ret[1].Interface().(error)) + callbackError(ctx, ret[1].Interface().(error)) return } - err := fi.retConverter(ctx, ret[0]) + err = fi.retConverter(ctx, ret[0]) if err != nil { - fi.error(ctx, err) + callbackError(ctx, err) + return + } +} + +type aggInfo struct { + constructor reflect.Value + + // Active aggregator objects for aggregations in flight. The + // aggregators are indexed by a counter stored in the aggregation + // user data space provided by sqlite. + active map[int64]reflect.Value + next int64 + + stepArgConverters []callbackArgConverter + stepVariadicConverter callbackArgConverter + + doneRetConverter callbackRetConverter +} + +func (ai *aggInfo) agg(ctx *C.sqlite3_context) (int64, reflect.Value, error) { + aggIdx := (*int64)(C.sqlite3_aggregate_context(ctx, C.int(8))) + if *aggIdx == 0 { + *aggIdx = ai.next + ret := ai.constructor.Call(nil) + if len(ret) == 2 && ret[1].Interface() != nil { + return 0, reflect.Value{}, ret[1].Interface().(error) + } + if ret[0].IsNil() { + return 0, reflect.Value{}, errors.New("aggregator constructor returned nil state") + } + ai.next++ + ai.active[*aggIdx] = ret[0] + } + return *aggIdx, ai.active[*aggIdx], nil +} + +func (ai *aggInfo) Step(ctx *C.sqlite3_context, argv []*C.sqlite3_value) { + _, agg, err := ai.agg(ctx) + if err != nil { + callbackError(ctx, err) + return + } + + args, err := callbackConvertArgs(argv, ai.stepArgConverters, ai.stepVariadicConverter) + if err != nil { + callbackError(ctx, err) + return + } + + ret := agg.MethodByName("Step").Call(args) + if len(ret) == 1 && ret[0].Interface() != nil { + callbackError(ctx, ret[0].Interface().(error)) + return + } +} + +func (ai *aggInfo) Done(ctx *C.sqlite3_context) { + idx, agg, err := ai.agg(ctx) + if err != nil { + callbackError(ctx, err) + return + } + defer func() { delete(ai.active, idx) }() + + ret := agg.MethodByName("Done").Call(nil) + if len(ret) == 2 && ret[1].Interface() != nil { + callbackError(ctx, ret[1].Interface().(error)) + return + } + + err = ai.doneRetConverter(ctx, ret[0]) + if err != nil { + callbackError(ctx, err) return } } @@ -244,6 +294,8 @@ func (tx *SQLiteTx) Rollback() error { // If pure is true. SQLite will assume that the function's return // value depends only on its inputs, and make more aggressive // optimizations in its queries. +// +// See _example/go_custom_funcs for a detailed example. func (c *SQLiteConn) RegisterFunc(name string, impl interface{}, pure bool) error { var fi functionInfo fi.f = reflect.ValueOf(impl) @@ -298,7 +350,132 @@ func (c *SQLiteConn) RegisterFunc(name string, impl interface{}, pure bool) erro if pure { opts |= C.SQLITE_DETERMINISTIC } - rv := C.sqlite3_create_function_v2(c.db, cname, C.int(numArgs), C.int(opts), unsafe.Pointer(&fi), (*[0]byte)(unsafe.Pointer(C.callbackTrampoline)), nil, nil, nil) + rv := C.sqlite3_create_function(c.db, cname, C.int(numArgs), C.int(opts), unsafe.Pointer(&fi), (*[0]byte)(unsafe.Pointer(C.callbackTrampoline)), nil, nil) + if rv != C.SQLITE_OK { + return c.lastError() + } + return nil +} + +// RegisterAggregator makes a Go type available as a SQLite aggregation function. +// +// Because aggregation is incremental, it's implemented in Go with a +// type that has 2 methods: func Step(values) accumulates one row of +// data into the accumulator, and func Done() ret finalizes and +// returns the aggregate value. "values" and "ret" may be any type +// supported by RegisterFunc. +// +// RegisterAggregator takes as implementation a constructor function +// that constructs an instance of the aggregator type each time an +// aggregation begins. The constructor must return a pointer to a +// type, or an interface that implements Step() and Done(). +// +// The constructor function and the Step/Done methods may optionally +// return an error in addition to their other return values. +// +// See _example/go_custom_funcs for a detailed example. +func (c *SQLiteConn) RegisterAggregator(name string, impl interface{}, pure bool) error { + var ai aggInfo + ai.constructor = reflect.ValueOf(impl) + t := ai.constructor.Type() + if t.Kind() != reflect.Func { + return errors.New("non-function passed to RegisterAggregator") + } + if t.NumOut() != 1 && t.NumOut() != 2 { + return errors.New("SQLite aggregator constructors must return 1 or 2 values") + } + if t.NumOut() == 2 && !t.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return errors.New("Second return value of SQLite function must be error") + } + if t.NumIn() != 0 { + return errors.New("SQLite aggregator constructors must not have arguments") + } + + agg := t.Out(0) + switch agg.Kind() { + case reflect.Ptr, reflect.Interface: + default: + return errors.New("SQlite aggregator constructor must return a pointer object") + } + stepFn, found := agg.MethodByName("Step") + if !found { + return errors.New("SQlite aggregator doesn't have a Step() function") + } + step := stepFn.Type + if step.NumOut() != 0 && step.NumOut() != 1 { + return errors.New("SQlite aggregator Step() function must return 0 or 1 values") + } + if step.NumOut() == 1 && !step.Out(0).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return errors.New("type of SQlite aggregator Step() return value must be error") + } + + stepNArgs := step.NumIn() + start := 0 + if agg.Kind() == reflect.Ptr { + // Skip over the method receiver + stepNArgs-- + start++ + } + if step.IsVariadic() { + stepNArgs-- + } + for i := start; i < start+stepNArgs; i++ { + conv, err := callbackArg(step.In(i)) + if err != nil { + return err + } + ai.stepArgConverters = append(ai.stepArgConverters, conv) + } + if step.IsVariadic() { + conv, err := callbackArg(t.In(start + stepNArgs).Elem()) + if err != nil { + return err + } + ai.stepVariadicConverter = conv + // Pass -1 to sqlite so that it allows any number of + // arguments. The call helper verifies that the minimum number + // of arguments is present for variadic functions. + stepNArgs = -1 + } + + doneFn, found := agg.MethodByName("Done") + if !found { + return errors.New("SQlite aggregator doesn't have a Done() function") + } + done := doneFn.Type + doneNArgs := done.NumIn() + if agg.Kind() == reflect.Ptr { + // Skip over the method receiver + doneNArgs-- + } + if doneNArgs != 0 { + return errors.New("SQlite aggregator Done() function must have no arguments") + } + if done.NumOut() != 1 && done.NumOut() != 2 { + return errors.New("SQLite aggregator Done() function must return 1 or 2 values") + } + if done.NumOut() == 2 && !done.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return errors.New("second return value of SQLite aggregator Done() function must be error") + } + + conv, err := callbackRet(done.Out(0)) + if err != nil { + return err + } + ai.doneRetConverter = conv + ai.active = make(map[int64]reflect.Value) + ai.next = 1 + + // ai must outlast the database connection, or we'll have dangling pointers. + c.aggregators = append(c.aggregators, &ai) + + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + opts := C.SQLITE_UTF8 + if pure { + opts |= C.SQLITE_DETERMINISTIC + } + rv := C.sqlite3_create_function(c.db, cname, C.int(stepNArgs), C.int(opts), unsafe.Pointer(&ai), nil, (*[0]byte)(unsafe.Pointer(C.stepTrampoline)), (*[0]byte)(unsafe.Pointer(C.doneTrampoline))) if rv != C.SQLITE_OK { return c.lastError() } diff --git a/sqlite3_test.go b/sqlite3_test.go index 62db05b..74d3de1 100644 --- a/sqlite3_test.go +++ b/sqlite3_test.go @@ -1175,6 +1175,65 @@ func TestFunctionRegistration(t *testing.T) { } } +type sumAggregator int64 + +func (s *sumAggregator) Step(x int64) { + *s += sumAggregator(x) +} + +func (s *sumAggregator) Done() int64 { + return int64(*s) +} + +func TestAggregatorRegistration(t *testing.T) { + customSum := func() *sumAggregator { + var ret sumAggregator + return &ret + } + + sql.Register("sqlite3_AggregatorRegistration", &SQLiteDriver{ + ConnectHook: func(conn *SQLiteConn) error { + if err := conn.RegisterAggregator("customSum", customSum, true); err != nil { + return err + } + return nil + }, + }) + db, err := sql.Open("sqlite3_AggregatorRegistration", ":memory:") + if err != nil { + t.Fatal("Failed to open database:", err) + } + defer db.Close() + + _, err = db.Exec("create table foo (department integer, profits integer)") + if err != nil { + t.Fatal("Failed to create table:", err) + } + + _, err = db.Exec("insert into foo values (1, 10), (1, 20), (2, 42)") + if err != nil { + t.Fatal("Failed to insert records:", err) + } + + tests := []struct { + dept, sum int64 + }{ + {1, 30}, + {2, 42}, + } + + for _, test := range tests { + var ret int64 + err = db.QueryRow("select customSum(profits) from foo where department = $1 group by department", test.dept).Scan(&ret) + if err != nil { + t.Fatal("Query failed:", err) + } + if ret != test.sum { + t.Fatalf("Custom sum returned wrong value, got %d, want %d", ret, test.sum) + } + } +} + var customFunctionOnce sync.Once func BenchmarkCustomFunctions(b *testing.B) {