Merge pull request #439 from typeless/add-unlock-notify
Add support for sqlite3_unlock_notify
This commit is contained in:
commit
f3aa5ce899
|
@ -12,7 +12,7 @@ env:
|
||||||
matrix:
|
matrix:
|
||||||
- GOTAGS=
|
- GOTAGS=
|
||||||
- GOTAGS=libsqlite3
|
- GOTAGS=libsqlite3
|
||||||
- GOTAGS="sqlite_allow_uri_authority sqlite_app_armor sqlite_foreign_keys sqlite_fts5 sqlite_icu sqlite_introspect sqlite_json sqlite_secure_delete sqlite_see sqlite_stat4 sqlite_trace sqlite_userauth sqlite_vacuum_incr sqlite_vtable"
|
- GOTAGS="sqlite_allow_uri_authority sqlite_app_armor sqlite_foreign_keys sqlite_fts5 sqlite_icu sqlite_introspect sqlite_json sqlite_secure_delete sqlite_see sqlite_stat4 sqlite_trace sqlite_userauth sqlite_vacuum_incr sqlite_vtable sqlite_unlock_notify"
|
||||||
- GOTAGS=sqlite_vacuum_full
|
- GOTAGS=sqlite_vacuum_full
|
||||||
|
|
||||||
go:
|
go:
|
||||||
|
|
45
sqlite3.go
45
sqlite3.go
|
@ -78,8 +78,38 @@ _sqlite3_exec(sqlite3* db, const char* pcmd, long long* rowid, long long* change
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef SQLITE_ENABLE_UNLOCK_NOTIFY
|
||||||
|
extern int _sqlite3_step_blocking(sqlite3_stmt *stmt);
|
||||||
|
extern int _sqlite3_step_row_blocking(sqlite3_stmt* stmt, long long* rowid, long long* changes);
|
||||||
|
extern int _sqlite3_prepare_v2_blocking(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail);
|
||||||
|
|
||||||
static int
|
static int
|
||||||
_sqlite3_step(sqlite3_stmt* stmt, long long* rowid, long long* changes)
|
_sqlite3_step_internal(sqlite3_stmt *stmt)
|
||||||
|
{
|
||||||
|
return _sqlite3_step_blocking(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
_sqlite3_step_row_internal(sqlite3_stmt* stmt, long long* rowid, long long* changes)
|
||||||
|
{
|
||||||
|
return _sqlite3_step_row_blocking(stmt, rowid, changes);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
_sqlite3_prepare_v2_internal(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail)
|
||||||
|
{
|
||||||
|
return _sqlite3_prepare_v2_blocking(db, zSql, nBytes, ppStmt, pzTail);
|
||||||
|
}
|
||||||
|
|
||||||
|
#else
|
||||||
|
static int
|
||||||
|
_sqlite3_step_internal(sqlite3_stmt *stmt)
|
||||||
|
{
|
||||||
|
return sqlite3_step(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
_sqlite3_step_row_internal(sqlite3_stmt* stmt, long long* rowid, long long* changes)
|
||||||
{
|
{
|
||||||
int rv = sqlite3_step(stmt);
|
int rv = sqlite3_step(stmt);
|
||||||
sqlite3* db = sqlite3_db_handle(stmt);
|
sqlite3* db = sqlite3_db_handle(stmt);
|
||||||
|
@ -88,6 +118,13 @@ _sqlite3_step(sqlite3_stmt* stmt, long long* rowid, long long* changes)
|
||||||
return rv;
|
return rv;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
_sqlite3_prepare_v2_internal(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail)
|
||||||
|
{
|
||||||
|
return sqlite3_prepare_v2(db, zSql, nBytes, ppStmt, pzTail);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void _sqlite3_result_text(sqlite3_context* ctx, const char* s) {
|
void _sqlite3_result_text(sqlite3_context* ctx, const char* s) {
|
||||||
sqlite3_result_text(ctx, s, -1, &free);
|
sqlite3_result_text(ctx, s, -1, &free);
|
||||||
}
|
}
|
||||||
|
@ -1637,7 +1674,7 @@ func (c *SQLiteConn) prepare(ctx context.Context, query string) (driver.Stmt, er
|
||||||
defer C.free(unsafe.Pointer(pquery))
|
defer C.free(unsafe.Pointer(pquery))
|
||||||
var s *C.sqlite3_stmt
|
var s *C.sqlite3_stmt
|
||||||
var tail *C.char
|
var tail *C.char
|
||||||
rv := C.sqlite3_prepare_v2(c.db, pquery, -1, &s, &tail)
|
rv := C._sqlite3_prepare_v2_internal(c.db, pquery, -1, &s, &tail)
|
||||||
if rv != C.SQLITE_OK {
|
if rv != C.SQLITE_OK {
|
||||||
return nil, c.lastError()
|
return nil, c.lastError()
|
||||||
}
|
}
|
||||||
|
@ -1871,7 +1908,7 @@ func (s *SQLiteStmt) exec(ctx context.Context, args []namedValue) (driver.Result
|
||||||
}
|
}
|
||||||
|
|
||||||
var rowid, changes C.longlong
|
var rowid, changes C.longlong
|
||||||
rv := C._sqlite3_step(s.s, &rowid, &changes)
|
rv := C._sqlite3_step_row_internal(s.s, &rowid, &changes)
|
||||||
if rv != C.SQLITE_ROW && rv != C.SQLITE_OK && rv != C.SQLITE_DONE {
|
if rv != C.SQLITE_ROW && rv != C.SQLITE_OK && rv != C.SQLITE_DONE {
|
||||||
err := s.c.lastError()
|
err := s.c.lastError()
|
||||||
C.sqlite3_reset(s.s)
|
C.sqlite3_reset(s.s)
|
||||||
|
@ -1943,7 +1980,7 @@ func (rc *SQLiteRows) Next(dest []driver.Value) error {
|
||||||
if rc.s.closed {
|
if rc.s.closed {
|
||||||
return io.EOF
|
return io.EOF
|
||||||
}
|
}
|
||||||
rv := C.sqlite3_step(rc.s.s)
|
rv := C._sqlite3_step_internal(rc.s.s)
|
||||||
if rv == C.SQLITE_DONE {
|
if rv == C.SQLITE_DONE {
|
||||||
return io.EOF
|
return io.EOF
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
// Copyright (C) 2018 Yasuhiro Matsumoto <mattn.jp@gmail.com>.
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by an MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
#ifdef SQLITE_ENABLE_UNLOCK_NOTIFY
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <sqlite3-binding.h>
|
||||||
|
|
||||||
|
extern int unlock_notify_wait(sqlite3 *db);
|
||||||
|
|
||||||
|
int
|
||||||
|
_sqlite3_step_blocking(sqlite3_stmt *stmt)
|
||||||
|
{
|
||||||
|
int rv;
|
||||||
|
sqlite3* db;
|
||||||
|
|
||||||
|
db = sqlite3_db_handle(stmt);
|
||||||
|
for (;;) {
|
||||||
|
rv = sqlite3_step(stmt);
|
||||||
|
if (rv != SQLITE_LOCKED) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
rv = unlock_notify_wait(db);
|
||||||
|
if (rv != SQLITE_OK) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sqlite3_reset(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
_sqlite3_step_row_blocking(sqlite3_stmt* stmt, long long* rowid, long long* changes)
|
||||||
|
{
|
||||||
|
int rv;
|
||||||
|
sqlite3* db;
|
||||||
|
|
||||||
|
db = sqlite3_db_handle(stmt);
|
||||||
|
for (;;) {
|
||||||
|
rv = sqlite3_step(stmt);
|
||||||
|
if (rv!=SQLITE_LOCKED) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
rv = unlock_notify_wait(db);
|
||||||
|
if (rv != SQLITE_OK) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sqlite3_reset(stmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
*rowid = (long long) sqlite3_last_insert_rowid(db);
|
||||||
|
*changes = (long long) sqlite3_changes(db);
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
_sqlite3_prepare_v2_blocking(sqlite3 *db, const char *zSql, int nBytes, sqlite3_stmt **ppStmt, const char **pzTail)
|
||||||
|
{
|
||||||
|
int rv;
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
rv = sqlite3_prepare_v2(db, zSql, nBytes, ppStmt, pzTail);
|
||||||
|
if (rv!=SQLITE_LOCKED) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (sqlite3_extended_errcode(db) != SQLITE_LOCKED_SHAREDCACHE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
rv = unlock_notify_wait(db);
|
||||||
|
if (rv != SQLITE_OK) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rv;
|
||||||
|
}
|
||||||
|
#endif
|
|
@ -0,0 +1,92 @@
|
||||||
|
// Copyright (C) 2018 Yasuhiro Matsumoto <mattn.jp@gmail.com>.
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by an MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// +build cgo
|
||||||
|
// +build sqlite_unlock_notify
|
||||||
|
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
/*
|
||||||
|
#cgo CFLAGS: -DSQLITE_ENABLE_UNLOCK_NOTIFY
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <sqlite3-binding.h>
|
||||||
|
|
||||||
|
extern void unlock_notify_callback(void *arg, int argc);
|
||||||
|
*/
|
||||||
|
import "C"
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
type unlock_notify_table struct {
|
||||||
|
sync.Mutex
|
||||||
|
seqnum uint
|
||||||
|
table map[uint]chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var unt unlock_notify_table = unlock_notify_table{table: make(map[uint]chan struct{})}
|
||||||
|
|
||||||
|
func (t *unlock_notify_table) add(c chan struct{}) uint {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
h := t.seqnum
|
||||||
|
t.table[h] = c
|
||||||
|
t.seqnum++
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *unlock_notify_table) remove(h uint) {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
delete(t.table, h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *unlock_notify_table) get(h uint) chan struct{} {
|
||||||
|
t.Lock()
|
||||||
|
defer t.Unlock()
|
||||||
|
c, ok := t.table[h]
|
||||||
|
if !ok {
|
||||||
|
panic(fmt.Sprintf("Non-existent key for unlcok-notify channel: %d", h))
|
||||||
|
}
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
//export unlock_notify_callback
|
||||||
|
func unlock_notify_callback(argv unsafe.Pointer, argc C.int) {
|
||||||
|
for i := 0; i < int(argc); i++ {
|
||||||
|
parg := ((*(*[1 << 30]*[1]uint)(argv))[i])
|
||||||
|
arg := *parg
|
||||||
|
h := arg[0]
|
||||||
|
c := unt.get(h)
|
||||||
|
c <- struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//export unlock_notify_wait
|
||||||
|
func unlock_notify_wait(db *C.sqlite3) C.int {
|
||||||
|
// It has to be a bufferred channel to not block in sqlite_unlock_notify
|
||||||
|
// as sqlite_unlock_notify could invoke the callback before it returns.
|
||||||
|
c := make(chan struct{}, 1)
|
||||||
|
defer close(c)
|
||||||
|
|
||||||
|
h := unt.add(c)
|
||||||
|
defer unt.remove(h)
|
||||||
|
|
||||||
|
pargv := C.malloc(C.sizeof_uint)
|
||||||
|
defer C.free(pargv)
|
||||||
|
|
||||||
|
argv := (*[1]uint)(pargv)
|
||||||
|
argv[0] = h
|
||||||
|
if rv := C.sqlite3_unlock_notify(db, (*[0]byte)(C.unlock_notify_callback), unsafe.Pointer(pargv)); rv != C.SQLITE_OK {
|
||||||
|
return rv
|
||||||
|
}
|
||||||
|
|
||||||
|
<-c
|
||||||
|
|
||||||
|
return C.SQLITE_OK
|
||||||
|
}
|
|
@ -0,0 +1,222 @@
|
||||||
|
// Copyright (C) 2018 Yasuhiro Matsumoto <mattn.jp@gmail.com>.
|
||||||
|
//
|
||||||
|
// Use of this source code is governed by an MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// +build sqlite_unlock_notify
|
||||||
|
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestUnlockNotify(t *testing.T) {
|
||||||
|
tempFilename := TempFilename(t)
|
||||||
|
defer os.Remove(tempFilename)
|
||||||
|
dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500)
|
||||||
|
db, err := sql.Open("sqlite3", dsn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to open database:", err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
_, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to create table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to begin transaction:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to insert null:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to update table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
timer := time.NewTimer(500 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
<-timer.C
|
||||||
|
err := tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to commit transaction:", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
rows, err := db.Query("SELECT count(*) from foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to query foo table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rows.Next() {
|
||||||
|
var count int
|
||||||
|
if err := rows.Scan(&count); err != nil {
|
||||||
|
t.Fatal("Failed to Scan rows", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
t.Fatal("Failed at the call to Next:", err)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnlockNotifyMany(t *testing.T) {
|
||||||
|
tempFilename := TempFilename(t)
|
||||||
|
defer os.Remove(tempFilename)
|
||||||
|
dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500)
|
||||||
|
db, err := sql.Open("sqlite3", dsn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to open database:", err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
_, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to create table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to begin transaction:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to insert null:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to update table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
timer := time.NewTimer(500 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
<-timer.C
|
||||||
|
err := tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to commit transaction:", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
const concurrentQueries = 1000
|
||||||
|
wg.Add(concurrentQueries)
|
||||||
|
for i := 0; i < concurrentQueries; i++ {
|
||||||
|
go func() {
|
||||||
|
rows, err := db.Query("SELECT count(*) from foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to query foo table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rows.Next() {
|
||||||
|
var count int
|
||||||
|
if err := rows.Scan(&count); err != nil {
|
||||||
|
t.Fatal("Failed to Scan rows", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
t.Fatal("Failed at the call to Next:", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUnlockNotifyDeadlock(t *testing.T) {
|
||||||
|
tempFilename := TempFilename(t)
|
||||||
|
defer os.Remove(tempFilename)
|
||||||
|
dsn := fmt.Sprintf("file:%s?cache=shared&mode=rwc&_busy_timeout=%d", tempFilename, 500)
|
||||||
|
db, err := sql.Open("sqlite3", dsn)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to open database:", err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
_, err = db.Exec("CREATE TABLE foo(id INTEGER, status INTEGER)")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to create table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to begin transaction:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("INSERT INTO foo(id, status) VALUES(1, 100)")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to insert null:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec("UPDATE foo SET status = 200 WHERE id = 1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to update table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(1)
|
||||||
|
timer := time.NewTimer(500 * time.Millisecond)
|
||||||
|
go func() {
|
||||||
|
<-timer.C
|
||||||
|
err := tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to commit transaction:", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
tx2, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to begin transaction:", err)
|
||||||
|
}
|
||||||
|
defer tx2.Rollback()
|
||||||
|
|
||||||
|
_, err = tx2.Exec("DELETE FROM foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to delete table:", err)
|
||||||
|
}
|
||||||
|
err = tx2.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Failed to commit transaction:", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
rows, err := tx.Query("SELECT count(*) from foo")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Unable to query foo table:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if rows.Next() {
|
||||||
|
var count int
|
||||||
|
if err := rows.Scan(&count); err != nil {
|
||||||
|
t.Fatal("Failed to Scan rows", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := rows.Err(); err != nil {
|
||||||
|
t.Fatal("Failed at the call to Next:", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
Loading…
Reference in New Issue