|  | @@ -1,958 +0,0 @@
 | 
	
		
			
				|  |  | -package sqlite
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -import (
 | 
	
		
			
				|  |  | -	"context"
 | 
	
		
			
				|  |  | -	"database/sql"
 | 
	
		
			
				|  |  | -	"fmt"
 | 
	
		
			
				|  |  | -	"os"
 | 
	
		
			
				|  |  | -	"path/filepath"
 | 
	
		
			
				|  |  | -	"regexp"
 | 
	
		
			
				|  |  | -	"strings"
 | 
	
		
			
				|  |  | -	"sync"
 | 
	
		
			
				|  |  | -	"time"
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -	"git.wecise.com/wecise/cgimport/sqlite/schema"
 | 
	
		
			
				|  |  | -	"github.com/spf13/cast"
 | 
	
		
			
				|  |  | -	"github.com/wecisecode/util/cmap"
 | 
	
		
			
				|  |  | -	"github.com/wecisecode/util/merrs"
 | 
	
		
			
				|  |  | -	"github.com/wecisecode/util/msgpack"
 | 
	
		
			
				|  |  | -	"github.com/wecisecode/util/rc"
 | 
	
		
			
				|  |  | -	"modernc.org/sqlite"
 | 
	
		
			
				|  |  | -	sqlite3 "modernc.org/sqlite/lib"
 | 
	
		
			
				|  |  | -)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -type Status int
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -const (
 | 
	
		
			
				|  |  | -	Uninited Status = iota // 未初始化,不可读,不可写
 | 
	
		
			
				|  |  | -	Initing                // 正在初始化,不可读,不可写
 | 
	
		
			
				|  |  | -	Inited                 // 初始化完成,可读,可写
 | 
	
		
			
				|  |  | -	Closed                 // 已关闭,不可读,不可写
 | 
	
		
			
				|  |  | -)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (status Status) String() string {
 | 
	
		
			
				|  |  | -	switch status {
 | 
	
		
			
				|  |  | -	case Uninited:
 | 
	
		
			
				|  |  | -		return "Uninited"
 | 
	
		
			
				|  |  | -	case Initing:
 | 
	
		
			
				|  |  | -		return "Initing"
 | 
	
		
			
				|  |  | -	case Inited:
 | 
	
		
			
				|  |  | -		return "Inited"
 | 
	
		
			
				|  |  | -	case Closed:
 | 
	
		
			
				|  |  | -		return "Closed"
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return "unkown"
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -var ReNonWordChars = regexp.MustCompile(`\W`)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// 一个 SQLDB
 | 
	
		
			
				|  |  | -// 对应一个读写表
 | 
	
		
			
				|  |  | -// 或
 | 
	
		
			
				|  |  | -// 对应一个视图,关联多个只读表
 | 
	
		
			
				|  |  | -type SQLDB struct {
 | 
	
		
			
				|  |  | -	db           *sql.DB
 | 
	
		
			
				|  |  | -	keyspace     string
 | 
	
		
			
				|  |  | -	dbname       string
 | 
	
		
			
				|  |  | -	dbsourcename string
 | 
	
		
			
				|  |  | -	//
 | 
	
		
			
				|  |  | -	bufferedCommandsMutex sync.Mutex
 | 
	
		
			
				|  |  | -	bufferedCommands      []*Command
 | 
	
		
			
				|  |  | -	chstatus              chan Status
 | 
	
		
			
				|  |  | -	chiniting             chan error
 | 
	
		
			
				|  |  | -	// table info
 | 
	
		
			
				|  |  | -	infomutex  sync.RWMutex
 | 
	
		
			
				|  |  | -	tableinfo  *schema.TableInfo
 | 
	
		
			
				|  |  | -	insertStmt *Stmt
 | 
	
		
			
				|  |  | -	upsertStmt *Stmt
 | 
	
		
			
				|  |  | -	datainfo   *DataInfo
 | 
	
		
			
				|  |  | -	statinfo   *StatInfo
 | 
	
		
			
				|  |  | -	//
 | 
	
		
			
				|  |  | -	reorg_proc ReorgProc
 | 
	
		
			
				|  |  | -	reorg_rc   *rc.RoutinesController
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -type ReorgProc struct {
 | 
	
		
			
				|  |  | -	sync.Mutex
 | 
	
		
			
				|  |  | -	statinfo_update func()
 | 
	
		
			
				|  |  | -	datainfo_update func()
 | 
	
		
			
				|  |  | -	expire_clear    func()
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -var dbs = cmap.New[string, *SQLDB]()
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func NewSQLDB(keyspace string, dbname string, inmemory bool) (sdb *SQLDB, err error) {
 | 
	
		
			
				|  |  | -	dbkey := fmt.Sprint("K:", keyspace, "N:", dbname, "M:", inmemory)
 | 
	
		
			
				|  |  | -	sdb, err = dbs.GetWithNew(dbkey, func() (*SQLDB, error) {
 | 
	
		
			
				|  |  | -		return openSQLDB(keyspace, dbname, inmemory, false)
 | 
	
		
			
				|  |  | -	})
 | 
	
		
			
				|  |  | -	if err != nil {
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if sdb == nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError("NewSQLDB " + dbkey + " error")
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return sdb, nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -var DBPath = "/opt/matrix/var/cgimport/"
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func openSQLDB(keyspace string, dbname string, inmemory bool, autorebuilding bool) (sqldb *SQLDB, err error) {
 | 
	
		
			
				|  |  | -	var status Status = Uninited
 | 
	
		
			
				|  |  | -	dbsourcename := "file:"
 | 
	
		
			
				|  |  | -	if inmemory {
 | 
	
		
			
				|  |  | -		dbsourcename += dbname + ":memory:?mode=memory&cache=shared"
 | 
	
		
			
				|  |  | -	} else {
 | 
	
		
			
				|  |  | -		dbfname := filepath.Join(DBPath, keyspace, dbname)
 | 
	
		
			
				|  |  | -		e := os.MkdirAll(filepath.Dir(dbfname), os.ModePerm)
 | 
	
		
			
				|  |  | -		if e != nil {
 | 
	
		
			
				|  |  | -			err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -			return nil, err
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		fi, e := os.Stat(dbfname)
 | 
	
		
			
				|  |  | -		if e != nil && !os.IsNotExist(e) {
 | 
	
		
			
				|  |  | -			err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -			return nil, err
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		if fi != nil {
 | 
	
		
			
				|  |  | -			if autorebuilding {
 | 
	
		
			
				|  |  | -				e := os.Remove(dbfname)
 | 
	
		
			
				|  |  | -				if e != nil {
 | 
	
		
			
				|  |  | -					err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -					return nil, err
 | 
	
		
			
				|  |  | -				}
 | 
	
		
			
				|  |  | -				status = Uninited
 | 
	
		
			
				|  |  | -			} else {
 | 
	
		
			
				|  |  | -				status = Inited
 | 
	
		
			
				|  |  | -			}
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		dbsourcename += dbfname + "?cache=shared"
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	db, e := sql.Open("sqlite", dbsourcename)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -		return nil, err
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if !inmemory {
 | 
	
		
			
				|  |  | -		_, e = db.Exec("PRAGMA temp_store=memory") // 0=1=file 2=memory
 | 
	
		
			
				|  |  | -		if e != nil {
 | 
	
		
			
				|  |  | -			err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -			return nil, err
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		_, e = db.Exec("PRAGMA journal_mode=memory") // DELETE TRUNCATE PERSIST MEMORY OFF
 | 
	
		
			
				|  |  | -		if e != nil {
 | 
	
		
			
				|  |  | -			err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -			return nil, err
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		_, e = db.Exec("PRAGMA synchronous=off") // 0=off 1=normal 2=full
 | 
	
		
			
				|  |  | -		if e != nil {
 | 
	
		
			
				|  |  | -			err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -			return nil, err
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		// _, e = db.Exec("PRAGMA auto_vacuum=none") // 0=none 1=full
 | 
	
		
			
				|  |  | -		// if e != nil {
 | 
	
		
			
				|  |  | -		// 	err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -		// 	return nil
 | 
	
		
			
				|  |  | -		// }
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	// _, e = db.Exec("PRAGMA count_changes=1")
 | 
	
		
			
				|  |  | -	// if e != nil {
 | 
	
		
			
				|  |  | -	// 	err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -	// 	return nil
 | 
	
		
			
				|  |  | -	// }
 | 
	
		
			
				|  |  | -	_, e = db.Exec("PRAGMA case_sensitive_like=1")
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -		return nil, err
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	sdb := &SQLDB{
 | 
	
		
			
				|  |  | -		db:               db,
 | 
	
		
			
				|  |  | -		keyspace:         keyspace,
 | 
	
		
			
				|  |  | -		dbname:           dbname,
 | 
	
		
			
				|  |  | -		dbsourcename:     dbsourcename,
 | 
	
		
			
				|  |  | -		bufferedCommands: []*Command{},
 | 
	
		
			
				|  |  | -		chstatus:         make(chan Status, 1),
 | 
	
		
			
				|  |  | -		chiniting:        make(chan error, 1),
 | 
	
		
			
				|  |  | -		reorg_rc:         rc.NewRoutinesController("", 1),
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	sdb.chiniting <- nil
 | 
	
		
			
				|  |  | -	if status == Inited {
 | 
	
		
			
				|  |  | -		e := sdb.loadTableInfo()
 | 
	
		
			
				|  |  | -		if e != nil {
 | 
	
		
			
				|  |  | -			if !merrs.UninitedError.Contains(e) {
 | 
	
		
			
				|  |  | -				err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -				return nil, err
 | 
	
		
			
				|  |  | -			}
 | 
	
		
			
				|  |  | -			if !autorebuilding {
 | 
	
		
			
				|  |  | -				sdb.db.Close()
 | 
	
		
			
				|  |  | -				return openSQLDB(keyspace, dbname, inmemory, true)
 | 
	
		
			
				|  |  | -			}
 | 
	
		
			
				|  |  | -			status = Uninited
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	sdb.chstatus <- status
 | 
	
		
			
				|  |  | -	return sdb, nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) DBName() string {
 | 
	
		
			
				|  |  | -	return me.dbname
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) SourceName() string {
 | 
	
		
			
				|  |  | -	return me.dbsourcename
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Reset() {
 | 
	
		
			
				|  |  | -	status := <-me.chstatus
 | 
	
		
			
				|  |  | -	if status == Initing {
 | 
	
		
			
				|  |  | -		e := <-me.chiniting // 等待初始化完成
 | 
	
		
			
				|  |  | -		me.chiniting <- e
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.chstatus <- Uninited
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Close() error {
 | 
	
		
			
				|  |  | -	status := <-me.chstatus
 | 
	
		
			
				|  |  | -	if status == Initing {
 | 
	
		
			
				|  |  | -		e := <-me.chiniting // 等待初始化完成
 | 
	
		
			
				|  |  | -		me.chiniting <- e
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.chstatus <- Closed
 | 
	
		
			
				|  |  | -	if status == Closed {
 | 
	
		
			
				|  |  | -		return nil
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return me.db.Close()
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Status() Status {
 | 
	
		
			
				|  |  | -	select {
 | 
	
		
			
				|  |  | -	case status := <-me.chstatus:
 | 
	
		
			
				|  |  | -		me.chstatus <- status
 | 
	
		
			
				|  |  | -		return status
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) SetMaxAttached() (err error) {
 | 
	
		
			
				|  |  | -	// set SQLITE_MAX_ATTACHED = 125
 | 
	
		
			
				|  |  | -	conn, e := me.db.Conn(context.Background())
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return merrs.NormalError.NewCause(e)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	defer conn.Close()
 | 
	
		
			
				|  |  | -	sqlite.Limit(conn, sqlite3.SQLITE_MAX_ATTACHED, 125)
 | 
	
		
			
				|  |  | -	return nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) AttacheDB(clsdb *SQLDB, dbname string) error {
 | 
	
		
			
				|  |  | -	dbsourcename := clsdb.SourceName()
 | 
	
		
			
				|  |  | -	sql := "ATTACH DATABASE '" + dbsourcename + "' as " + dbname
 | 
	
		
			
				|  |  | -	logger.Trace(sql)
 | 
	
		
			
				|  |  | -	_, e := me.exec(sql)
 | 
	
		
			
				|  |  | -	return e
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) DBList() (dbs map[string]string, err error) {
 | 
	
		
			
				|  |  | -	sql := "PRAGMA database_list"
 | 
	
		
			
				|  |  | -	rows, e := me.queryMaps(sql)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return nil, merrs.NormalError.NewWith("", []error{e}, merrs.SSMaps{{"sql": sql}}, 1)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	dbs = map[string]string{}
 | 
	
		
			
				|  |  | -	for _, m := range rows {
 | 
	
		
			
				|  |  | -		dbs[cast.ToString(m["name"])] = cast.ToString(m["file"])
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) InitTable(tabledefine *schema.TableDefine, force bool) (err error) {
 | 
	
		
			
				|  |  | -	me.infomutex.Lock()
 | 
	
		
			
				|  |  | -	defer me.infomutex.Unlock()
 | 
	
		
			
				|  |  | -	status := <-me.chstatus
 | 
	
		
			
				|  |  | -	if status == Inited && !force {
 | 
	
		
			
				|  |  | -		// 初始化已完成,直接返回
 | 
	
		
			
				|  |  | -		me.chstatus <- status
 | 
	
		
			
				|  |  | -		return nil
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if status == Initing {
 | 
	
		
			
				|  |  | -		me.chstatus <- status
 | 
	
		
			
				|  |  | -		e := <-me.chiniting // 等待初始化完成
 | 
	
		
			
				|  |  | -		me.chiniting <- e
 | 
	
		
			
				|  |  | -		return e
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	<-me.chiniting
 | 
	
		
			
				|  |  | -	me.chstatus <- Initing
 | 
	
		
			
				|  |  | -	// 操作计数
 | 
	
		
			
				|  |  | -	// PushCount(time.Now(), 1, me.keyspace, "renew", tabledefine.TableName)
 | 
	
		
			
				|  |  | -	e := me.renewTable(tabledefine)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = e
 | 
	
		
			
				|  |  | -		me.chiniting <- err
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	logger.Debug("Init LocalDB", me.dbsourcename)
 | 
	
		
			
				|  |  | -	// 执行初始化期间产生的更新操作
 | 
	
		
			
				|  |  | -	_, e = me.execBufferedCommands()
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -		me.chiniting <- err
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.chiniting <- nil
 | 
	
		
			
				|  |  | -	status = <-me.chstatus
 | 
	
		
			
				|  |  | -	me.chstatus <- Inited
 | 
	
		
			
				|  |  | -	// 执行新产生的更新操作
 | 
	
		
			
				|  |  | -	_, e = me.execBufferedCommands()
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) loadTableInfo() (err error) {
 | 
	
		
			
				|  |  | -	rows, e := me.queryMaps("SELECT name FROM sqlite_master WHERE type='table'")
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return e
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if len(rows) < 2 {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("uninited")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -	// 通过 sqlite_master 和 PRAGMA table_info 获得的信息有限
 | 
	
		
			
				|  |  | -	// 通过自定义的 schema 表,可以获得自己保存的信息
 | 
	
		
			
				|  |  | -	rows, e = me.queryMaps("SELECT key,value FROM __table_info__")
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return e
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if len(rows) < 3 {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("uninited")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	mrows := map[string]any{}
 | 
	
		
			
				|  |  | -	for _, row := range rows {
 | 
	
		
			
				|  |  | -		mrows[row["key"].(string)] = row["value"]
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	// tableinfo
 | 
	
		
			
				|  |  | -	v_tableinfo := mrows["tableinfo"]
 | 
	
		
			
				|  |  | -	if v_tableinfo == nil {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("not found tableinfo")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	s_tableinfo, ok := v_tableinfo.(string)
 | 
	
		
			
				|  |  | -	if !ok {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("tableinfo type inconsistent") // 类型不一致
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	load_tableinfo := &schema.TableInfo{}
 | 
	
		
			
				|  |  | -	e = msgpack.Decode([]byte(s_tableinfo), load_tableinfo)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e, merrs.SSMap{"tablename": load_tableinfo.TableName})
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if load_tableinfo.Version != schema.TableInfoVersion {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("uninited load_tableinfo.version=" + load_tableinfo.Version)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.tableinfo = load_tableinfo
 | 
	
		
			
				|  |  | -	// datainfo
 | 
	
		
			
				|  |  | -	v_datainfo := mrows["datainfo"]
 | 
	
		
			
				|  |  | -	if v_datainfo == nil {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("not found datainfo")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	s_datainfo, ok := v_datainfo.(string)
 | 
	
		
			
				|  |  | -	if !ok {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("datainfo type inconsistent") // 类型不一致
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.datainfo = &DataInfo{}
 | 
	
		
			
				|  |  | -	msgpack.Decode([]byte(s_datainfo), me.datainfo)
 | 
	
		
			
				|  |  | -	// statinfo
 | 
	
		
			
				|  |  | -	v_statinfo := mrows["statinfo"]
 | 
	
		
			
				|  |  | -	if v_statinfo == nil {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("not found statinfo")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	s_statinfo, ok := v_statinfo.(string)
 | 
	
		
			
				|  |  | -	if !ok {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("statinfo type inconsistent") // 类型不一致
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.statinfo = &StatInfo{}
 | 
	
		
			
				|  |  | -	msgpack.Decode([]byte(s_statinfo), me.statinfo)
 | 
	
		
			
				|  |  | -	if me.statinfo.CreateTime.Equal(time.Time{}) {
 | 
	
		
			
				|  |  | -		return merrs.UninitedError.New("uninited")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	//
 | 
	
		
			
				|  |  | -	insertstmt, upsertstmt, e := me.initInsertStmt(false)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e, merrs.SSMap{"tablename": me.tableinfo.TableName})
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.insertStmt = insertstmt
 | 
	
		
			
				|  |  | -	me.upsertStmt = upsertstmt
 | 
	
		
			
				|  |  | -	logger.Debug("Load LocalDB", me.dbsourcename)
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) renewTable(tabledefine *schema.TableDefine) (err error) {
 | 
	
		
			
				|  |  | -	tableinfo, createfieldtypes := tabledefine.TableInfo()
 | 
	
		
			
				|  |  | -	if me.tableinfo != nil {
 | 
	
		
			
				|  |  | -		// 简化结构,一个 DB 里只保留一个表或视图,可通过视图关联多个表
 | 
	
		
			
				|  |  | -		err = me.dropTable(me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -		if err != nil {
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	e := me.newTable(tableinfo, createfieldtypes)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.tableinfo = tableinfo
 | 
	
		
			
				|  |  | -	insertstmt, upsertstmt, e := me.initInsertStmt(true)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.insertStmt = insertstmt
 | 
	
		
			
				|  |  | -	me.upsertStmt = upsertstmt
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) dropTable(tablename string) error {
 | 
	
		
			
				|  |  | -	dropSql := fmt.Sprintf(`DROP TABLE IF EXISTS %s`, tablename)
 | 
	
		
			
				|  |  | -	// logger.Tracef("Drop table %s sql: %s", tablename, dropSql)
 | 
	
		
			
				|  |  | -	_, e := me.exec(dropSql)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return merrs.NewError(e, merrs.SSMap{"tablename": tablename})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) newTable(tableinfo *schema.TableInfo,
 | 
	
		
			
				|  |  | -	createfieldtypes []string,
 | 
	
		
			
				|  |  | -) (err error) {
 | 
	
		
			
				|  |  | -	tablename := tableinfo.TableName
 | 
	
		
			
				|  |  | -	colums := strings.Join(createfieldtypes, `,`)
 | 
	
		
			
				|  |  | -	if len(tableinfo.IDKeys) > 0 {
 | 
	
		
			
				|  |  | -		colums = colums + `,` + `PRIMARY KEY("` + strings.Join(tableinfo.IDKeys, `","`) + `")`
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	createSql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s)`, tablename, colums)
 | 
	
		
			
				|  |  | -	// logger.Tracef("Create table %s sql: %s", tablename, createSql)
 | 
	
		
			
				|  |  | -	if _, e := me.exec(createSql); e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	for idxName, idxfields := range tableinfo.Indexes {
 | 
	
		
			
				|  |  | -		var indexSql string
 | 
	
		
			
				|  |  | -		if len(idxfields) > 1 {
 | 
	
		
			
				|  |  | -			indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, strings.Join(idxfields, `", "`))
 | 
	
		
			
				|  |  | -		} else {
 | 
	
		
			
				|  |  | -			indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, idxfields[0])
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		// logger.Tracef("Create table %s index sql: %s", tablename, indexSql)
 | 
	
		
			
				|  |  | -		if _, e := me.exec(indexSql); e != nil {
 | 
	
		
			
				|  |  | -			err = merrs.NewError(fmt.Sprintf("sqlite create table index error"), e)
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if _, e := me.exec(`CREATE TABLE IF NOT EXISTS __table_info__ (key TEXT PRIMARY KEY, value TEXT)`); e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if _, e := me.exec(`DELETE FROM __table_info__`); e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	tableinfobs, e := msgpack.Encode(tableinfo)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "tableinfo", string(tableinfobs)); e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.datainfo = &DataInfo{}
 | 
	
		
			
				|  |  | -	datainfobs, e := msgpack.Encode(me.datainfo)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "datainfo", string(datainfobs)); e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	me.statinfo = &StatInfo{
 | 
	
		
			
				|  |  | -		CreateTime: time.Now(),
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	statinfobs, e := msgpack.Encode(me.statinfo)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "statinfo", string(statinfobs)); e != nil {
 | 
	
		
			
				|  |  | -		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) initInsertStmt(initvaluetypes bool) (insertstmt, upsertstmt *Stmt, err error) {
 | 
	
		
			
				|  |  | -	qmarks := []string{}
 | 
	
		
			
				|  |  | -	for range me.tableinfo.FieldNames {
 | 
	
		
			
				|  |  | -		qmarks = append(qmarks, "?")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	upsertSql := fmt.Sprintf(`BEGIN TRANSACTION;
 | 
	
		
			
				|  |  | -DELETE FROM "`+me.tableinfo.TableName+`" WHERE "%s"=?;
 | 
	
		
			
				|  |  | -INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s);
 | 
	
		
			
				|  |  | -COMMIT;`,
 | 
	
		
			
				|  |  | -		strings.Join(me.tableinfo.IDKeys, `"=? and "`), strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
 | 
	
		
			
				|  |  | -	upsertstmt, err = me.prepare(upsertSql)
 | 
	
		
			
				|  |  | -	if err != nil {
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	insertSql := fmt.Sprintf(`INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s)`,
 | 
	
		
			
				|  |  | -		strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
 | 
	
		
			
				|  |  | -	insertstmt, err = me.prepare(insertSql)
 | 
	
		
			
				|  |  | -	if err != nil {
 | 
	
		
			
				|  |  | -		return
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if initvaluetypes {
 | 
	
		
			
				|  |  | -		if len(me.tableinfo.DefValues) != len(me.tableinfo.FieldNames) {
 | 
	
		
			
				|  |  | -			err = merrs.NewError(fmt.Sprintf("sqlite prepare insert table %s error", me.tableinfo.TableName))
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		params := []any{}
 | 
	
		
			
				|  |  | -		for _, k := range me.tableinfo.IDKeys {
 | 
	
		
			
				|  |  | -			for i, f := range me.tableinfo.FieldNames {
 | 
	
		
			
				|  |  | -				if f == k {
 | 
	
		
			
				|  |  | -					params = append(params, me.tableinfo.DefValues[i])
 | 
	
		
			
				|  |  | -				}
 | 
	
		
			
				|  |  | -			}
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		params = append(params, me.tableinfo.DefValues...)
 | 
	
		
			
				|  |  | -		// 首次插入数据的类型决定该字段的实际存储类型
 | 
	
		
			
				|  |  | -		_, err = insertstmt.Exec(params...)
 | 
	
		
			
				|  |  | -		if err != nil {
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		_, err = me.delete(nil)
 | 
	
		
			
				|  |  | -		if err != nil {
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) setLastQueryTime() {
 | 
	
		
			
				|  |  | -	me.statinfo.LastQueryTime = time.Now()
 | 
	
		
			
				|  |  | -	me.reorg_proc.Lock()
 | 
	
		
			
				|  |  | -	defer me.reorg_proc.Unlock()
 | 
	
		
			
				|  |  | -	me.reorg_proc.statinfo_update = me.updateStatInfo
 | 
	
		
			
				|  |  | -	me.reorg()
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) setLastUpdateTime() {
 | 
	
		
			
				|  |  | -	me.statinfo.LastUpdateTime = time.Now()
 | 
	
		
			
				|  |  | -	me.reorg_proc.Lock()
 | 
	
		
			
				|  |  | -	defer me.reorg_proc.Unlock()
 | 
	
		
			
				|  |  | -	me.reorg_proc.statinfo_update = me.updateStatInfo
 | 
	
		
			
				|  |  | -	me.reorg()
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) updateStatInfo() {
 | 
	
		
			
				|  |  | -	statinfobs, e := msgpack.Encode(me.statinfo)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		logger.Error("UpdateStatInfo Error", e)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	sql := "UPDATE __table_info__ SET value=? WHERE key=?"
 | 
	
		
			
				|  |  | -	_, e = me.exec(sql, "statinfo", string(statinfobs))
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		logger.Error("UpdateStatInfo Error", e)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) clearExpireData() {
 | 
	
		
			
				|  |  | -	// Format("2006-01-02 15:04:05")
 | 
	
		
			
				|  |  | -	deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE `+me.tableinfo.LatField+`<?`, me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -	n, e := me.exec(deleteSql, time.Now().Add(-me.tableinfo.Ttl).UnixMilli())
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		logger.Error("ClearExpireData Error", e)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	logger.Debug("ClearExpireData", n)
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) ClearExpireData() {
 | 
	
		
			
				|  |  | -	if me.tableinfo.Ttl > 0 {
 | 
	
		
			
				|  |  | -		me.reorg_proc.Lock()
 | 
	
		
			
				|  |  | -		defer me.reorg_proc.Unlock()
 | 
	
		
			
				|  |  | -		me.reorg_proc.expire_clear = me.clearExpireData
 | 
	
		
			
				|  |  | -		me.reorg()
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) reorg() {
 | 
	
		
			
				|  |  | -	me.reorg_rc.CallLast2Only(func() {
 | 
	
		
			
				|  |  | -		me.reorg_proc.Lock()
 | 
	
		
			
				|  |  | -		datainfo_update := me.reorg_proc.datainfo_update
 | 
	
		
			
				|  |  | -		statinfo_update := me.reorg_proc.statinfo_update
 | 
	
		
			
				|  |  | -		expire_clear := me.reorg_proc.expire_clear
 | 
	
		
			
				|  |  | -		me.reorg_proc.datainfo_update = nil
 | 
	
		
			
				|  |  | -		me.reorg_proc.statinfo_update = nil
 | 
	
		
			
				|  |  | -		me.reorg_proc.expire_clear = nil
 | 
	
		
			
				|  |  | -		me.reorg_proc.Unlock()
 | 
	
		
			
				|  |  | -		if datainfo_update != nil {
 | 
	
		
			
				|  |  | -			datainfo_update()
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		if statinfo_update != nil {
 | 
	
		
			
				|  |  | -			statinfo_update()
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		if expire_clear != nil {
 | 
	
		
			
				|  |  | -			expire_clear()
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		time.Sleep(1 * time.Second)
 | 
	
		
			
				|  |  | -	})
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) execBufferedCommands() (count int, err error) {
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -	for {
 | 
	
		
			
				|  |  | -		var uc *Command
 | 
	
		
			
				|  |  | -		me.bufferedCommandsMutex.Lock()
 | 
	
		
			
				|  |  | -		if len(me.bufferedCommands) > 0 {
 | 
	
		
			
				|  |  | -			uc = me.bufferedCommands[0]
 | 
	
		
			
				|  |  | -			me.bufferedCommands = me.bufferedCommands[1:]
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		me.bufferedCommandsMutex.Unlock()
 | 
	
		
			
				|  |  | -		if uc == nil {
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		n, e := me.execCommand(uc)
 | 
	
		
			
				|  |  | -		if e != nil {
 | 
	
		
			
				|  |  | -			err = e
 | 
	
		
			
				|  |  | -			if me.Status() != Inited {
 | 
	
		
			
				|  |  | -				// 重新初始化
 | 
	
		
			
				|  |  | -				me.bufferedCommandsMutex.Lock()
 | 
	
		
			
				|  |  | -				me.bufferedCommands = append(me.bufferedCommands, uc)
 | 
	
		
			
				|  |  | -				me.bufferedCommandsMutex.Unlock()
 | 
	
		
			
				|  |  | -				// err = nil
 | 
	
		
			
				|  |  | -				err = merrs.NewError(merrs.SSMaps{
 | 
	
		
			
				|  |  | -					{
 | 
	
		
			
				|  |  | -						"me.Status()": fmt.Sprint(me.Status()),
 | 
	
		
			
				|  |  | -					}}, e)
 | 
	
		
			
				|  |  | -				logger.Error(err)
 | 
	
		
			
				|  |  | -				err = nil
 | 
	
		
			
				|  |  | -			}
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		count += n
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) pushCommand(uc *Command) (n int, err error) {
 | 
	
		
			
				|  |  | -	select {
 | 
	
		
			
				|  |  | -	case status := <-me.chstatus:
 | 
	
		
			
				|  |  | -		me.chstatus <- status
 | 
	
		
			
				|  |  | -		switch status {
 | 
	
		
			
				|  |  | -		case Uninited, Initing:
 | 
	
		
			
				|  |  | -			me.bufferedCommandsMutex.Lock()
 | 
	
		
			
				|  |  | -			me.bufferedCommands = append(me.bufferedCommands, uc)
 | 
	
		
			
				|  |  | -			me.bufferedCommandsMutex.Unlock()
 | 
	
		
			
				|  |  | -			return 1, nil
 | 
	
		
			
				|  |  | -		case Inited:
 | 
	
		
			
				|  |  | -			me.bufferedCommandsMutex.Lock()
 | 
	
		
			
				|  |  | -			me.bufferedCommands = append(me.bufferedCommands, uc)
 | 
	
		
			
				|  |  | -			me.bufferedCommandsMutex.Unlock()
 | 
	
		
			
				|  |  | -			n, err = me.execBufferedCommands()
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		case Closed:
 | 
	
		
			
				|  |  | -			return
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) execCommand(uc *Command) (n int, err error) {
 | 
	
		
			
				|  |  | -	switch uc.CmdType {
 | 
	
		
			
				|  |  | -	case UCTInsert:
 | 
	
		
			
				|  |  | -		return me.insert(uc.Data, uc.OverwriteExists)
 | 
	
		
			
				|  |  | -	case UCTUpdate:
 | 
	
		
			
				|  |  | -		return me.update(uc.Data, uc.Conds)
 | 
	
		
			
				|  |  | -	case UCTDelete:
 | 
	
		
			
				|  |  | -		return me.delete(uc.Conds)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	panic(merrs.NewError(fmt.Sprint("unsupport UpdateCommandType ", uc.CmdType), nil))
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Prepare(sql string) (*Stmt, error) {
 | 
	
		
			
				|  |  | -	return me.prepare(sql)
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) prepare(sql string) (*Stmt, error) {
 | 
	
		
			
				|  |  | -	stmt, e := me.db.Prepare(sql)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return &Stmt{Stmt: stmt}, nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) exec(sql string, args ...any) (n int, err error) {
 | 
	
		
			
				|  |  | -	rslt, e := me.db.Exec(sql, args...)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if rslt == nil {
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError("no result", merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	rn, e := rslt.RowsAffected()
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	n = int(rn)
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Select(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
 | 
	
		
			
				|  |  | -	var status Status
 | 
	
		
			
				|  |  | -	select {
 | 
	
		
			
				|  |  | -	case status = <-me.chstatus:
 | 
	
		
			
				|  |  | -		me.chstatus <- status
 | 
	
		
			
				|  |  | -		if status == Closed {
 | 
	
		
			
				|  |  | -			return nil, merrs.ClosedError.New("closed")
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		if status == Uninited || status == Initing {
 | 
	
		
			
				|  |  | -			return nil, merrs.UninitedError.New("uninited")
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	iter, err = me.query(ctx, sql, args...)
 | 
	
		
			
				|  |  | -	me.setLastQueryTime()
 | 
	
		
			
				|  |  | -	// PushCount(time.Now(), 1, me.keyspace, "select", me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) queryMaps(sql string, args ...any) (rows []map[string]any, err error) {
 | 
	
		
			
				|  |  | -	iter, e := me.query(nil, sql, args...)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return nil, e
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return iter.AllMaps()
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) query(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
 | 
	
		
			
				|  |  | -	defer func() {
 | 
	
		
			
				|  |  | -		x := recover()
 | 
	
		
			
				|  |  | -		if x != nil {
 | 
	
		
			
				|  |  | -			switch xv := x.(type) {
 | 
	
		
			
				|  |  | -			case error:
 | 
	
		
			
				|  |  | -				err = xv
 | 
	
		
			
				|  |  | -			default:
 | 
	
		
			
				|  |  | -				err = merrs.NewError(fmt.Sprint(x))
 | 
	
		
			
				|  |  | -			}
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}()
 | 
	
		
			
				|  |  | -	if ctx == nil {
 | 
	
		
			
				|  |  | -		ctx = context.Background()
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	rows, e := me.db.QueryContext(ctx, sql, args...)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	coltypes, e := rows.ColumnTypes()
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	iter = &Iter{ctx: ctx, rows: rows, cols: coltypes}
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) ids(data map[string]any) (idmap map[string]any) {
 | 
	
		
			
				|  |  | -	idmap = map[string]any{}
 | 
	
		
			
				|  |  | -	for _, k := range me.tableinfo.IDKeys {
 | 
	
		
			
				|  |  | -		idmap[k] = data[k]
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) exists(conds map[string]any) (exists bool, err error) {
 | 
	
		
			
				|  |  | -	values := []any{}
 | 
	
		
			
				|  |  | -	existsSql := fmt.Sprintf(`SELECT * FROM %s`, me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -	if len(conds) > 0 {
 | 
	
		
			
				|  |  | -		existsSql += " WHERE "
 | 
	
		
			
				|  |  | -		keys := []string{}
 | 
	
		
			
				|  |  | -		for k, v := range conds {
 | 
	
		
			
				|  |  | -			keys = append(keys, k+"=?")
 | 
	
		
			
				|  |  | -			values = append(values, v)
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		existsSql += strings.Join(keys, " and ")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	existsSql += " LIMIT 1"
 | 
	
		
			
				|  |  | -	logger.Tracef("Exists sql: %s, params=%v", existsSql, values)
 | 
	
		
			
				|  |  | -	iter, e := me.Select(nil, existsSql, values...)
 | 
	
		
			
				|  |  | -	if e != nil {
 | 
	
		
			
				|  |  | -		return false, merrs.NewError(e)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	defer iter.Close()
 | 
	
		
			
				|  |  | -	if iter.rows.Next() {
 | 
	
		
			
				|  |  | -		return true, nil
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	return false, nil
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Insert(
 | 
	
		
			
				|  |  | -	data map[string]any,
 | 
	
		
			
				|  |  | -	overwriteexists bool,
 | 
	
		
			
				|  |  | -) (
 | 
	
		
			
				|  |  | -	n int,
 | 
	
		
			
				|  |  | -	err error,
 | 
	
		
			
				|  |  | -) {
 | 
	
		
			
				|  |  | -	return me.pushCommand(&Command{CmdType: UCTInsert, Data: data, OverwriteExists: overwriteexists})
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) insert(
 | 
	
		
			
				|  |  | -	data map[string]any,
 | 
	
		
			
				|  |  | -	overwriteexists bool,
 | 
	
		
			
				|  |  | -) (
 | 
	
		
			
				|  |  | -	n int,
 | 
	
		
			
				|  |  | -	err error,
 | 
	
		
			
				|  |  | -) {
 | 
	
		
			
				|  |  | -	encodedata := map[string]any{}
 | 
	
		
			
				|  |  | -	for _, field := range me.tableinfo.FieldNames {
 | 
	
		
			
				|  |  | -		ftype := me.tableinfo.MapNameFields[field].Type
 | 
	
		
			
				|  |  | -		v, _ := data[field]
 | 
	
		
			
				|  |  | -		if field == me.tableinfo.LatField {
 | 
	
		
			
				|  |  | -			v = time.Now().UnixMilli()
 | 
	
		
			
				|  |  | -		} else {
 | 
	
		
			
				|  |  | -			v = SQLValueEncode(ftype, v)
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		encodedata[field] = v
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	values := []interface{}{}
 | 
	
		
			
				|  |  | -	upsertStmt := me.insertStmt
 | 
	
		
			
				|  |  | -	if overwriteexists {
 | 
	
		
			
				|  |  | -		// 覆盖已有数据,需要指定主键值
 | 
	
		
			
				|  |  | -		upsertStmt = me.upsertStmt
 | 
	
		
			
				|  |  | -		for _, field := range me.tableinfo.IDKeys {
 | 
	
		
			
				|  |  | -			values = append(values, encodedata[field])
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	for _, field := range me.tableinfo.FieldNames {
 | 
	
		
			
				|  |  | -		values = append(values, encodedata[field])
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	logger.Tracef("Insert %s data: values=%v", me.tableinfo.TableName, values)
 | 
	
		
			
				|  |  | -	rslt, err := upsertStmt.Exec(values...)
 | 
	
		
			
				|  |  | -	if err != nil {
 | 
	
		
			
				|  |  | -		if strings.Index(err.Error(), "UNIQUE constraint") >= 0 {
 | 
	
		
			
				|  |  | -			return 0, merrs.ExistError.NewError(err,
 | 
	
		
			
				|  |  | -				merrs.SSMaps{
 | 
	
		
			
				|  |  | -					{"TableName": me.tableinfo.TableName},
 | 
	
		
			
				|  |  | -					{"Values": fmt.Sprint(values)},
 | 
	
		
			
				|  |  | -					{"Status": fmt.Sprint(me.Status())},
 | 
	
		
			
				|  |  | -				})
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError(err,
 | 
	
		
			
				|  |  | -			merrs.SSMaps{
 | 
	
		
			
				|  |  | -				{"TableName": me.tableinfo.TableName},
 | 
	
		
			
				|  |  | -				{"Values": fmt.Sprint(values)},
 | 
	
		
			
				|  |  | -				{"Status": fmt.Sprint(me.Status())},
 | 
	
		
			
				|  |  | -			})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if rslt == nil {
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError(fmt.Sprintf("sqlite insert table %s no result", me.tableinfo.TableName))
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	rn, err := rslt.RowsAffected()
 | 
	
		
			
				|  |  | -	if err != nil {
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError(err,
 | 
	
		
			
				|  |  | -			merrs.SSMaps{
 | 
	
		
			
				|  |  | -				{"TableName": me.tableinfo.TableName},
 | 
	
		
			
				|  |  | -				{"Values": fmt.Sprint(values)},
 | 
	
		
			
				|  |  | -				{"Status": fmt.Sprint(me.Status())},
 | 
	
		
			
				|  |  | -			})
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	n = int(rn)
 | 
	
		
			
				|  |  | -	if n > 0 {
 | 
	
		
			
				|  |  | -		me.setLastUpdateTime()
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	// PushCount(time.Now(), n, me.keyspace, "insert", me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Update(
 | 
	
		
			
				|  |  | -	data map[string]any,
 | 
	
		
			
				|  |  | -	conds map[string]any,
 | 
	
		
			
				|  |  | -) (
 | 
	
		
			
				|  |  | -	n int,
 | 
	
		
			
				|  |  | -	err error,
 | 
	
		
			
				|  |  | -) {
 | 
	
		
			
				|  |  | -	return me.pushCommand(&Command{CmdType: UCTUpdate, Data: data, Conds: conds})
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) update(
 | 
	
		
			
				|  |  | -	data map[string]any,
 | 
	
		
			
				|  |  | -	conds map[string]any,
 | 
	
		
			
				|  |  | -) (
 | 
	
		
			
				|  |  | -	n int,
 | 
	
		
			
				|  |  | -	err error,
 | 
	
		
			
				|  |  | -) {
 | 
	
		
			
				|  |  | -	updatefields := []string{}
 | 
	
		
			
				|  |  | -	condfields := []string{}
 | 
	
		
			
				|  |  | -	values := []interface{}{}
 | 
	
		
			
				|  |  | -	for k, v := range data {
 | 
	
		
			
				|  |  | -		updatefields = append(updatefields, k)
 | 
	
		
			
				|  |  | -		values = append(values, v)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	updatefields = append(updatefields, me.tableinfo.LatField)
 | 
	
		
			
				|  |  | -	values = append(values, time.Now().UnixMilli())
 | 
	
		
			
				|  |  | -	for k, v := range conds {
 | 
	
		
			
				|  |  | -		condfields = append(condfields, k)
 | 
	
		
			
				|  |  | -		values = append(values, v)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if len(values) == 0 {
 | 
	
		
			
				|  |  | -		return 0, nil
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	updateSql := fmt.Sprintf(`UPDATE %s SET "%s"=?`, me.tableinfo.TableName, strings.Join(updatefields, `"=?,"`))
 | 
	
		
			
				|  |  | -	if len(condfields) > 0 {
 | 
	
		
			
				|  |  | -		updateSql += fmt.Sprintf(` WHERE "%s"=?`, strings.Join(condfields, `"=? and "`))
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	logger.Tracef("Update sql: %s, params=%v", updateSql, values)
 | 
	
		
			
				|  |  | -	n, err = me.exec(updateSql, values...)
 | 
	
		
			
				|  |  | -	if err != nil {
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError(fmt.Sprintf("sqlite update table %s error", me.tableinfo.TableName), err)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if n > 0 {
 | 
	
		
			
				|  |  | -		me.setLastUpdateTime()
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	// PushCount(time.Now(), n, me.keyspace, "update", me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) Delete(
 | 
	
		
			
				|  |  | -	conds map[string]any,
 | 
	
		
			
				|  |  | -) (
 | 
	
		
			
				|  |  | -	n int,
 | 
	
		
			
				|  |  | -	err error,
 | 
	
		
			
				|  |  | -) {
 | 
	
		
			
				|  |  | -	return me.pushCommand(&Command{CmdType: UCTDelete, Conds: conds})
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -func (me *SQLDB) delete(
 | 
	
		
			
				|  |  | -	conds map[string]any,
 | 
	
		
			
				|  |  | -) (
 | 
	
		
			
				|  |  | -	n int,
 | 
	
		
			
				|  |  | -	err error,
 | 
	
		
			
				|  |  | -) {
 | 
	
		
			
				|  |  | -	values := []interface{}{}
 | 
	
		
			
				|  |  | -	deleteSql := fmt.Sprintf(`DELETE FROM %s`, me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -	if len(conds) > 0 {
 | 
	
		
			
				|  |  | -		fields := []string{}
 | 
	
		
			
				|  |  | -		for k, v := range conds {
 | 
	
		
			
				|  |  | -			fields = append(fields, `"`+k+`"=?`)
 | 
	
		
			
				|  |  | -			values = append(values, v)
 | 
	
		
			
				|  |  | -		}
 | 
	
		
			
				|  |  | -		deleteSql += " WHERE " + strings.Join(fields, " and ")
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	logger.Tracef("Delete sql: %s, params=%v", deleteSql, values)
 | 
	
		
			
				|  |  | -	n, err = me.exec(deleteSql, values...)
 | 
	
		
			
				|  |  | -	if err != nil {
 | 
	
		
			
				|  |  | -		return 0, merrs.NewError(fmt.Sprintf("sqlite delete table %s error", me.tableinfo.TableName), err)
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	if n > 0 {
 | 
	
		
			
				|  |  | -		me.setLastUpdateTime()
 | 
	
		
			
				|  |  | -	}
 | 
	
		
			
				|  |  | -	// PushCount(time.Now(), n, me.keyspace, "delete", me.tableinfo.TableName)
 | 
	
		
			
				|  |  | -	return
 | 
	
		
			
				|  |  | -}
 |