123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958 |
- package sqlite
- import (
- "context"
- "database/sql"
- "fmt"
- "os"
- "path/filepath"
- "regexp"
- "strings"
- "sync"
- "time"
- "git.wecise.com/wecise/cgimport/sqlite/schema"
- "github.com/spf13/cast"
- "github.com/wecisecode/util/cmap"
- "github.com/wecisecode/util/merrs"
- "github.com/wecisecode/util/msgpack"
- "github.com/wecisecode/util/rc"
- "modernc.org/sqlite"
- sqlite3 "modernc.org/sqlite/lib"
- )
- type Status int
- const (
- Uninited Status = iota // 未初始化,不可读,不可写
- Initing // 正在初始化,不可读,不可写
- Inited // 初始化完成,可读,可写
- Closed // 已关闭,不可读,不可写
- )
- func (status Status) String() string {
- switch status {
- case Uninited:
- return "Uninited"
- case Initing:
- return "Initing"
- case Inited:
- return "Inited"
- case Closed:
- return "Closed"
- }
- return "unkown"
- }
- var ReNonWordChars = regexp.MustCompile(`\W`)
- // 一个 SQLDB
- // 对应一个读写表
- // 或
- // 对应一个视图,关联多个只读表
- type SQLDB struct {
- db *sql.DB
- keyspace string
- dbname string
- dbsourcename string
- //
- bufferedCommandsMutex sync.Mutex
- bufferedCommands []*Command
- chstatus chan Status
- chiniting chan error
- // table info
- infomutex sync.RWMutex
- tableinfo *schema.TableInfo
- insertStmt *Stmt
- upsertStmt *Stmt
- datainfo *DataInfo
- statinfo *StatInfo
- //
- reorg_proc ReorgProc
- reorg_rc *rc.RoutinesController
- }
- type ReorgProc struct {
- sync.Mutex
- statinfo_update func()
- datainfo_update func()
- expire_clear func()
- }
- var dbs = cmap.New[string, *SQLDB]()
- func NewSQLDB(keyspace string, dbname string, inmemory bool) (sdb *SQLDB, err error) {
- dbkey := fmt.Sprint("K:", keyspace, "N:", dbname, "M:", inmemory)
- sdb, err = dbs.GetWithNew(dbkey, func() (*SQLDB, error) {
- return openSQLDB(keyspace, dbname, inmemory, false)
- })
- if err != nil {
- return
- }
- if sdb == nil {
- err = merrs.NewError("NewSQLDB " + dbkey + " error")
- return
- }
- return sdb, nil
- }
- var DBPath = "/opt/matrix/var/cgimport/"
- func openSQLDB(keyspace string, dbname string, inmemory bool, autorebuilding bool) (sqldb *SQLDB, err error) {
- var status Status = Uninited
- dbsourcename := "file:"
- if inmemory {
- dbsourcename += dbname + ":memory:?mode=memory&cache=shared"
- } else {
- dbfname := filepath.Join(DBPath, keyspace, dbname)
- e := os.MkdirAll(filepath.Dir(dbfname), os.ModePerm)
- if e != nil {
- err = merrs.NewError(e)
- return nil, err
- }
- fi, e := os.Stat(dbfname)
- if e != nil && !os.IsNotExist(e) {
- err = merrs.NewError(e)
- return nil, err
- }
- if fi != nil {
- if autorebuilding {
- e := os.Remove(dbfname)
- if e != nil {
- err = merrs.NewError(e)
- return nil, err
- }
- status = Uninited
- } else {
- status = Inited
- }
- }
- dbsourcename += dbfname + "?cache=shared"
- }
- db, e := sql.Open("sqlite", dbsourcename)
- if e != nil {
- err = merrs.NewError(e)
- return nil, err
- }
- if !inmemory {
- _, e = db.Exec("PRAGMA temp_store=memory") // 0=1=file 2=memory
- if e != nil {
- err = merrs.NewError(e)
- return nil, err
- }
- _, e = db.Exec("PRAGMA journal_mode=memory") // DELETE TRUNCATE PERSIST MEMORY OFF
- if e != nil {
- err = merrs.NewError(e)
- return nil, err
- }
- _, e = db.Exec("PRAGMA synchronous=off") // 0=off 1=normal 2=full
- if e != nil {
- err = merrs.NewError(e)
- return nil, err
- }
- // _, e = db.Exec("PRAGMA auto_vacuum=none") // 0=none 1=full
- // if e != nil {
- // err = merrs.NewError(e)
- // return nil
- // }
- }
- // _, e = db.Exec("PRAGMA count_changes=1")
- // if e != nil {
- // err = merrs.NewError(e)
- // return nil
- // }
- _, e = db.Exec("PRAGMA case_sensitive_like=1")
- if e != nil {
- err = merrs.NewError(e)
- return nil, err
- }
- sdb := &SQLDB{
- db: db,
- keyspace: keyspace,
- dbname: dbname,
- dbsourcename: dbsourcename,
- bufferedCommands: []*Command{},
- chstatus: make(chan Status, 1),
- chiniting: make(chan error, 1),
- reorg_rc: rc.NewRoutinesController("", 1),
- }
- sdb.chiniting <- nil
- if status == Inited {
- e := sdb.loadTableInfo()
- if e != nil {
- if !merrs.UninitedError.Contains(e) {
- err = merrs.NewError(e)
- return nil, err
- }
- if !autorebuilding {
- sdb.db.Close()
- return openSQLDB(keyspace, dbname, inmemory, true)
- }
- status = Uninited
- }
- }
- sdb.chstatus <- status
- return sdb, nil
- }
- func (me *SQLDB) DBName() string {
- return me.dbname
- }
- func (me *SQLDB) SourceName() string {
- return me.dbsourcename
- }
- func (me *SQLDB) Reset() {
- status := <-me.chstatus
- if status == Initing {
- e := <-me.chiniting // 等待初始化完成
- me.chiniting <- e
- }
- me.chstatus <- Uninited
- }
- func (me *SQLDB) Close() error {
- status := <-me.chstatus
- if status == Initing {
- e := <-me.chiniting // 等待初始化完成
- me.chiniting <- e
- }
- me.chstatus <- Closed
- if status == Closed {
- return nil
- }
- return me.db.Close()
- }
- func (me *SQLDB) Status() Status {
- select {
- case status := <-me.chstatus:
- me.chstatus <- status
- return status
- }
- }
- func (me *SQLDB) SetMaxAttached() (err error) {
- // set SQLITE_MAX_ATTACHED = 125
- conn, e := me.db.Conn(context.Background())
- if e != nil {
- return merrs.NormalError.NewCause(e)
- }
- defer conn.Close()
- sqlite.Limit(conn, sqlite3.SQLITE_MAX_ATTACHED, 125)
- return nil
- }
- func (me *SQLDB) AttacheDB(clsdb *SQLDB, dbname string) error {
- dbsourcename := clsdb.SourceName()
- sql := "ATTACH DATABASE '" + dbsourcename + "' as " + dbname
- logger.Trace(sql)
- _, e := me.exec(sql)
- return e
- }
- func (me *SQLDB) DBList() (dbs map[string]string, err error) {
- sql := "PRAGMA database_list"
- rows, e := me.queryMaps(sql)
- if e != nil {
- return nil, merrs.NormalError.NewWith("", []error{e}, merrs.SSMaps{{"sql": sql}}, 1)
- }
- dbs = map[string]string{}
- for _, m := range rows {
- dbs[cast.ToString(m["name"])] = cast.ToString(m["file"])
- }
- return
- }
- func (me *SQLDB) InitTable(tabledefine *schema.TableDefine, force bool) (err error) {
- me.infomutex.Lock()
- defer me.infomutex.Unlock()
- status := <-me.chstatus
- if status == Inited && !force {
- // 初始化已完成,直接返回
- me.chstatus <- status
- return nil
- }
- if status == Initing {
- me.chstatus <- status
- e := <-me.chiniting // 等待初始化完成
- me.chiniting <- e
- return e
- }
- <-me.chiniting
- me.chstatus <- Initing
- // 操作计数
- // PushCount(time.Now(), 1, me.keyspace, "renew", tabledefine.TableName)
- e := me.renewTable(tabledefine)
- if e != nil {
- err = e
- me.chiniting <- err
- return
- }
- logger.Debug("Init LocalDB", me.dbsourcename)
- // 执行初始化期间产生的更新操作
- _, e = me.execBufferedCommands()
- if e != nil {
- err = merrs.NewError(e)
- me.chiniting <- err
- return
- }
- me.chiniting <- nil
- status = <-me.chstatus
- me.chstatus <- Inited
- // 执行新产生的更新操作
- _, e = me.execBufferedCommands()
- if e != nil {
- err = merrs.NewError(e)
- return
- }
- return nil
- }
- func (me *SQLDB) loadTableInfo() (err error) {
- rows, e := me.queryMaps("SELECT name FROM sqlite_master WHERE type='table'")
- if e != nil {
- return e
- }
- if len(rows) < 2 {
- return merrs.UninitedError.New("uninited")
- }
- // 通过 sqlite_master 和 PRAGMA table_info 获得的信息有限
- // 通过自定义的 schema 表,可以获得自己保存的信息
- rows, e = me.queryMaps("SELECT key,value FROM __table_info__")
- if e != nil {
- return e
- }
- if len(rows) < 3 {
- return merrs.UninitedError.New("uninited")
- }
- mrows := map[string]any{}
- for _, row := range rows {
- mrows[row["key"].(string)] = row["value"]
- }
- // tableinfo
- v_tableinfo := mrows["tableinfo"]
- if v_tableinfo == nil {
- return merrs.UninitedError.New("not found tableinfo")
- }
- s_tableinfo, ok := v_tableinfo.(string)
- if !ok {
- return merrs.UninitedError.New("tableinfo type inconsistent") // 类型不一致
- }
- load_tableinfo := &schema.TableInfo{}
- e = msgpack.Decode([]byte(s_tableinfo), load_tableinfo)
- if e != nil {
- err = merrs.NewError(e, merrs.SSMap{"tablename": load_tableinfo.TableName})
- return
- }
- if load_tableinfo.Version != schema.TableInfoVersion {
- return merrs.UninitedError.New("uninited load_tableinfo.version=" + load_tableinfo.Version)
- }
- me.tableinfo = load_tableinfo
- // datainfo
- v_datainfo := mrows["datainfo"]
- if v_datainfo == nil {
- return merrs.UninitedError.New("not found datainfo")
- }
- s_datainfo, ok := v_datainfo.(string)
- if !ok {
- return merrs.UninitedError.New("datainfo type inconsistent") // 类型不一致
- }
- me.datainfo = &DataInfo{}
- msgpack.Decode([]byte(s_datainfo), me.datainfo)
- // statinfo
- v_statinfo := mrows["statinfo"]
- if v_statinfo == nil {
- return merrs.UninitedError.New("not found statinfo")
- }
- s_statinfo, ok := v_statinfo.(string)
- if !ok {
- return merrs.UninitedError.New("statinfo type inconsistent") // 类型不一致
- }
- me.statinfo = &StatInfo{}
- msgpack.Decode([]byte(s_statinfo), me.statinfo)
- if me.statinfo.CreateTime.Equal(time.Time{}) {
- return merrs.UninitedError.New("uninited")
- }
- //
- insertstmt, upsertstmt, e := me.initInsertStmt(false)
- if e != nil {
- err = merrs.NewError(e, merrs.SSMap{"tablename": me.tableinfo.TableName})
- return
- }
- me.insertStmt = insertstmt
- me.upsertStmt = upsertstmt
- logger.Debug("Load LocalDB", me.dbsourcename)
- return
- }
- func (me *SQLDB) renewTable(tabledefine *schema.TableDefine) (err error) {
- tableinfo, createfieldtypes := tabledefine.TableInfo()
- if me.tableinfo != nil {
- // 简化结构,一个 DB 里只保留一个表或视图,可通过视图关联多个表
- err = me.dropTable(me.tableinfo.TableName)
- if err != nil {
- return
- }
- }
- e := me.newTable(tableinfo, createfieldtypes)
- if e != nil {
- err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
- return
- }
- me.tableinfo = tableinfo
- insertstmt, upsertstmt, e := me.initInsertStmt(true)
- if e != nil {
- err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
- return
- }
- me.insertStmt = insertstmt
- me.upsertStmt = upsertstmt
- return
- }
- func (me *SQLDB) dropTable(tablename string) error {
- dropSql := fmt.Sprintf(`DROP TABLE IF EXISTS %s`, tablename)
- // logger.Tracef("Drop table %s sql: %s", tablename, dropSql)
- _, e := me.exec(dropSql)
- if e != nil {
- return merrs.NewError(e, merrs.SSMap{"tablename": tablename})
- }
- return nil
- }
- func (me *SQLDB) newTable(tableinfo *schema.TableInfo,
- createfieldtypes []string,
- ) (err error) {
- tablename := tableinfo.TableName
- colums := strings.Join(createfieldtypes, `,`)
- if len(tableinfo.IDKeys) > 0 {
- colums = colums + `,` + `PRIMARY KEY("` + strings.Join(tableinfo.IDKeys, `","`) + `")`
- }
- createSql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s)`, tablename, colums)
- // logger.Tracef("Create table %s sql: %s", tablename, createSql)
- if _, e := me.exec(createSql); e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- for idxName, idxfields := range tableinfo.Indexes {
- var indexSql string
- if len(idxfields) > 1 {
- indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, strings.Join(idxfields, `", "`))
- } else {
- indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, idxfields[0])
- }
- // logger.Tracef("Create table %s index sql: %s", tablename, indexSql)
- if _, e := me.exec(indexSql); e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table index error"), e)
- return
- }
- }
- if _, e := me.exec(`CREATE TABLE IF NOT EXISTS __table_info__ (key TEXT PRIMARY KEY, value TEXT)`); e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- if _, e := me.exec(`DELETE FROM __table_info__`); e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- tableinfobs, e := msgpack.Encode(tableinfo)
- if e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "tableinfo", string(tableinfobs)); e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- me.datainfo = &DataInfo{}
- datainfobs, e := msgpack.Encode(me.datainfo)
- if e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "datainfo", string(datainfobs)); e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- me.statinfo = &StatInfo{
- CreateTime: time.Now(),
- }
- statinfobs, e := msgpack.Encode(me.statinfo)
- if e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "statinfo", string(statinfobs)); e != nil {
- err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
- return
- }
- return nil
- }
- func (me *SQLDB) initInsertStmt(initvaluetypes bool) (insertstmt, upsertstmt *Stmt, err error) {
- qmarks := []string{}
- for range me.tableinfo.FieldNames {
- qmarks = append(qmarks, "?")
- }
- upsertSql := fmt.Sprintf(`BEGIN TRANSACTION;
- DELETE FROM "`+me.tableinfo.TableName+`" WHERE "%s"=?;
- INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s);
- COMMIT;`,
- strings.Join(me.tableinfo.IDKeys, `"=? and "`), strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
- upsertstmt, err = me.prepare(upsertSql)
- if err != nil {
- return
- }
- insertSql := fmt.Sprintf(`INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s)`,
- strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
- insertstmt, err = me.prepare(insertSql)
- if err != nil {
- return
- }
- if initvaluetypes {
- if len(me.tableinfo.DefValues) != len(me.tableinfo.FieldNames) {
- err = merrs.NewError(fmt.Sprintf("sqlite prepare insert table %s error", me.tableinfo.TableName))
- return
- }
- params := []any{}
- for _, k := range me.tableinfo.IDKeys {
- for i, f := range me.tableinfo.FieldNames {
- if f == k {
- params = append(params, me.tableinfo.DefValues[i])
- }
- }
- }
- params = append(params, me.tableinfo.DefValues...)
- // 首次插入数据的类型决定该字段的实际存储类型
- _, err = insertstmt.Exec(params...)
- if err != nil {
- return
- }
- _, err = me.delete(nil)
- if err != nil {
- return
- }
- }
- return
- }
- func (me *SQLDB) setLastQueryTime() {
- me.statinfo.LastQueryTime = time.Now()
- me.reorg_proc.Lock()
- defer me.reorg_proc.Unlock()
- me.reorg_proc.statinfo_update = me.updateStatInfo
- me.reorg()
- }
- func (me *SQLDB) setLastUpdateTime() {
- me.statinfo.LastUpdateTime = time.Now()
- me.reorg_proc.Lock()
- defer me.reorg_proc.Unlock()
- me.reorg_proc.statinfo_update = me.updateStatInfo
- me.reorg()
- }
- func (me *SQLDB) updateStatInfo() {
- statinfobs, e := msgpack.Encode(me.statinfo)
- if e != nil {
- logger.Error("UpdateStatInfo Error", e)
- }
- sql := "UPDATE __table_info__ SET value=? WHERE key=?"
- _, e = me.exec(sql, "statinfo", string(statinfobs))
- if e != nil {
- logger.Error("UpdateStatInfo Error", e)
- }
- }
- func (me *SQLDB) clearExpireData() {
- // Format("2006-01-02 15:04:05")
- deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE `+me.tableinfo.LatField+`<?`, me.tableinfo.TableName)
- n, e := me.exec(deleteSql, time.Now().Add(-me.tableinfo.Ttl).UnixMilli())
- if e != nil {
- logger.Error("ClearExpireData Error", e)
- }
- logger.Debug("ClearExpireData", n)
- }
- func (me *SQLDB) ClearExpireData() {
- if me.tableinfo.Ttl > 0 {
- me.reorg_proc.Lock()
- defer me.reorg_proc.Unlock()
- me.reorg_proc.expire_clear = me.clearExpireData
- me.reorg()
- }
- }
- func (me *SQLDB) reorg() {
- me.reorg_rc.CallLast2Only(func() {
- me.reorg_proc.Lock()
- datainfo_update := me.reorg_proc.datainfo_update
- statinfo_update := me.reorg_proc.statinfo_update
- expire_clear := me.reorg_proc.expire_clear
- me.reorg_proc.datainfo_update = nil
- me.reorg_proc.statinfo_update = nil
- me.reorg_proc.expire_clear = nil
- me.reorg_proc.Unlock()
- if datainfo_update != nil {
- datainfo_update()
- }
- if statinfo_update != nil {
- statinfo_update()
- }
- if expire_clear != nil {
- expire_clear()
- }
- time.Sleep(1 * time.Second)
- })
- }
- func (me *SQLDB) execBufferedCommands() (count int, err error) {
- for {
- var uc *Command
- me.bufferedCommandsMutex.Lock()
- if len(me.bufferedCommands) > 0 {
- uc = me.bufferedCommands[0]
- me.bufferedCommands = me.bufferedCommands[1:]
- }
- me.bufferedCommandsMutex.Unlock()
- if uc == nil {
- return
- }
- n, e := me.execCommand(uc)
- if e != nil {
- err = e
- if me.Status() != Inited {
- // 重新初始化
- me.bufferedCommandsMutex.Lock()
- me.bufferedCommands = append(me.bufferedCommands, uc)
- me.bufferedCommandsMutex.Unlock()
- // err = nil
- err = merrs.NewError(merrs.SSMaps{
- {
- "me.Status()": fmt.Sprint(me.Status()),
- }}, e)
- logger.Error(err)
- err = nil
- }
- return
- }
- count += n
- }
- }
- func (me *SQLDB) pushCommand(uc *Command) (n int, err error) {
- select {
- case status := <-me.chstatus:
- me.chstatus <- status
- switch status {
- case Uninited, Initing:
- me.bufferedCommandsMutex.Lock()
- me.bufferedCommands = append(me.bufferedCommands, uc)
- me.bufferedCommandsMutex.Unlock()
- return 1, nil
- case Inited:
- me.bufferedCommandsMutex.Lock()
- me.bufferedCommands = append(me.bufferedCommands, uc)
- me.bufferedCommandsMutex.Unlock()
- n, err = me.execBufferedCommands()
- return
- case Closed:
- return
- }
- }
- return
- }
- func (me *SQLDB) execCommand(uc *Command) (n int, err error) {
- switch uc.CmdType {
- case UCTInsert:
- return me.insert(uc.Data, uc.OverwriteExists)
- case UCTUpdate:
- return me.update(uc.Data, uc.Conds)
- case UCTDelete:
- return me.delete(uc.Conds)
- }
- panic(merrs.NewError(fmt.Sprint("unsupport UpdateCommandType ", uc.CmdType), nil))
- }
- func (me *SQLDB) Prepare(sql string) (*Stmt, error) {
- return me.prepare(sql)
- }
- func (me *SQLDB) prepare(sql string) (*Stmt, error) {
- stmt, e := me.db.Prepare(sql)
- if e != nil {
- return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}})
- }
- return &Stmt{Stmt: stmt}, nil
- }
- func (me *SQLDB) exec(sql string, args ...any) (n int, err error) {
- rslt, e := me.db.Exec(sql, args...)
- if e != nil {
- return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
- }
- if rslt == nil {
- return 0, merrs.NewError("no result", merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
- }
- rn, e := rslt.RowsAffected()
- if e != nil {
- return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
- }
- n = int(rn)
- return
- }
- func (me *SQLDB) Select(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
- var status Status
- select {
- case status = <-me.chstatus:
- me.chstatus <- status
- if status == Closed {
- return nil, merrs.ClosedError.New("closed")
- }
- if status == Uninited || status == Initing {
- return nil, merrs.UninitedError.New("uninited")
- }
- }
- iter, err = me.query(ctx, sql, args...)
- me.setLastQueryTime()
- // PushCount(time.Now(), 1, me.keyspace, "select", me.tableinfo.TableName)
- return
- }
- func (me *SQLDB) queryMaps(sql string, args ...any) (rows []map[string]any, err error) {
- iter, e := me.query(nil, sql, args...)
- if e != nil {
- return nil, e
- }
- return iter.AllMaps()
- }
- func (me *SQLDB) query(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
- defer func() {
- x := recover()
- if x != nil {
- switch xv := x.(type) {
- case error:
- err = xv
- default:
- err = merrs.NewError(fmt.Sprint(x))
- }
- }
- }()
- if ctx == nil {
- ctx = context.Background()
- }
- rows, e := me.db.QueryContext(ctx, sql, args...)
- if e != nil {
- return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
- }
- coltypes, e := rows.ColumnTypes()
- if e != nil {
- return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
- }
- iter = &Iter{ctx: ctx, rows: rows, cols: coltypes}
- return
- }
- func (me *SQLDB) ids(data map[string]any) (idmap map[string]any) {
- idmap = map[string]any{}
- for _, k := range me.tableinfo.IDKeys {
- idmap[k] = data[k]
- }
- return
- }
- func (me *SQLDB) exists(conds map[string]any) (exists bool, err error) {
- values := []any{}
- existsSql := fmt.Sprintf(`SELECT * FROM %s`, me.tableinfo.TableName)
- if len(conds) > 0 {
- existsSql += " WHERE "
- keys := []string{}
- for k, v := range conds {
- keys = append(keys, k+"=?")
- values = append(values, v)
- }
- existsSql += strings.Join(keys, " and ")
- }
- existsSql += " LIMIT 1"
- logger.Tracef("Exists sql: %s, params=%v", existsSql, values)
- iter, e := me.Select(nil, existsSql, values...)
- if e != nil {
- return false, merrs.NewError(e)
- }
- defer iter.Close()
- if iter.rows.Next() {
- return true, nil
- }
- return false, nil
- }
- func (me *SQLDB) Insert(
- data map[string]any,
- overwriteexists bool,
- ) (
- n int,
- err error,
- ) {
- return me.pushCommand(&Command{CmdType: UCTInsert, Data: data, OverwriteExists: overwriteexists})
- }
- func (me *SQLDB) insert(
- data map[string]any,
- overwriteexists bool,
- ) (
- n int,
- err error,
- ) {
- encodedata := map[string]any{}
- for _, field := range me.tableinfo.FieldNames {
- ftype := me.tableinfo.MapNameFields[field].Type
- v, _ := data[field]
- if field == me.tableinfo.LatField {
- v = time.Now().UnixMilli()
- } else {
- v = SQLValueEncode(ftype, v)
- }
- encodedata[field] = v
- }
- values := []interface{}{}
- upsertStmt := me.insertStmt
- if overwriteexists {
- // 覆盖已有数据,需要指定主键值
- upsertStmt = me.upsertStmt
- for _, field := range me.tableinfo.IDKeys {
- values = append(values, encodedata[field])
- }
- }
- for _, field := range me.tableinfo.FieldNames {
- values = append(values, encodedata[field])
- }
- logger.Tracef("Insert %s data: values=%v", me.tableinfo.TableName, values)
- rslt, err := upsertStmt.Exec(values...)
- if err != nil {
- if strings.Index(err.Error(), "UNIQUE constraint") >= 0 {
- return 0, merrs.ExistError.NewError(err,
- merrs.SSMaps{
- {"TableName": me.tableinfo.TableName},
- {"Values": fmt.Sprint(values)},
- {"Status": fmt.Sprint(me.Status())},
- })
- }
- return 0, merrs.NewError(err,
- merrs.SSMaps{
- {"TableName": me.tableinfo.TableName},
- {"Values": fmt.Sprint(values)},
- {"Status": fmt.Sprint(me.Status())},
- })
- }
- if rslt == nil {
- return 0, merrs.NewError(fmt.Sprintf("sqlite insert table %s no result", me.tableinfo.TableName))
- }
- rn, err := rslt.RowsAffected()
- if err != nil {
- return 0, merrs.NewError(err,
- merrs.SSMaps{
- {"TableName": me.tableinfo.TableName},
- {"Values": fmt.Sprint(values)},
- {"Status": fmt.Sprint(me.Status())},
- })
- }
- n = int(rn)
- if n > 0 {
- me.setLastUpdateTime()
- }
- // PushCount(time.Now(), n, me.keyspace, "insert", me.tableinfo.TableName)
- return
- }
- func (me *SQLDB) Update(
- data map[string]any,
- conds map[string]any,
- ) (
- n int,
- err error,
- ) {
- return me.pushCommand(&Command{CmdType: UCTUpdate, Data: data, Conds: conds})
- }
- func (me *SQLDB) update(
- data map[string]any,
- conds map[string]any,
- ) (
- n int,
- err error,
- ) {
- updatefields := []string{}
- condfields := []string{}
- values := []interface{}{}
- for k, v := range data {
- updatefields = append(updatefields, k)
- values = append(values, v)
- }
- updatefields = append(updatefields, me.tableinfo.LatField)
- values = append(values, time.Now().UnixMilli())
- for k, v := range conds {
- condfields = append(condfields, k)
- values = append(values, v)
- }
- if len(values) == 0 {
- return 0, nil
- }
- updateSql := fmt.Sprintf(`UPDATE %s SET "%s"=?`, me.tableinfo.TableName, strings.Join(updatefields, `"=?,"`))
- if len(condfields) > 0 {
- updateSql += fmt.Sprintf(` WHERE "%s"=?`, strings.Join(condfields, `"=? and "`))
- }
- logger.Tracef("Update sql: %s, params=%v", updateSql, values)
- n, err = me.exec(updateSql, values...)
- if err != nil {
- return 0, merrs.NewError(fmt.Sprintf("sqlite update table %s error", me.tableinfo.TableName), err)
- }
- if n > 0 {
- me.setLastUpdateTime()
- }
- // PushCount(time.Now(), n, me.keyspace, "update", me.tableinfo.TableName)
- return
- }
- func (me *SQLDB) Delete(
- conds map[string]any,
- ) (
- n int,
- err error,
- ) {
- return me.pushCommand(&Command{CmdType: UCTDelete, Conds: conds})
- }
- func (me *SQLDB) delete(
- conds map[string]any,
- ) (
- n int,
- err error,
- ) {
- values := []interface{}{}
- deleteSql := fmt.Sprintf(`DELETE FROM %s`, me.tableinfo.TableName)
- if len(conds) > 0 {
- fields := []string{}
- for k, v := range conds {
- fields = append(fields, `"`+k+`"=?`)
- values = append(values, v)
- }
- deleteSql += " WHERE " + strings.Join(fields, " and ")
- }
- logger.Tracef("Delete sql: %s, params=%v", deleteSql, values)
- n, err = me.exec(deleteSql, values...)
- if err != nil {
- return 0, merrs.NewError(fmt.Sprintf("sqlite delete table %s error", me.tableinfo.TableName), err)
- }
- if n > 0 {
- me.setLastUpdateTime()
- }
- // PushCount(time.Now(), n, me.keyspace, "delete", me.tableinfo.TableName)
- return
- }
|