package sqlite import ( "context" "database/sql" "fmt" "os" "path/filepath" "regexp" "strings" "sync" "time" "git.wecise.com/wecise/cgimport/sqlite/schema" "git.wecise.com/wecise/util/cmap" "git.wecise.com/wecise/util/msgpack" "git.wecise.com/wecise/util/rc" "github.com/spf13/cast" "github.com/wecisecode/util/merrs" "modernc.org/sqlite" sqlite3 "modernc.org/sqlite/lib" ) type Status int const ( Uninited Status = iota // 未初始化,不可读,不可写 Initing // 正在初始化,不可读,不可写 Inited // 初始化完成,可读,可写 Closed // 已关闭,不可读,不可写 ) func (status Status) String() string { switch status { case Uninited: return "Uninited" case Initing: return "Initing" case Inited: return "Inited" case Closed: return "Closed" } return "unkown" } var ReNonWordChars = regexp.MustCompile(`\W`) // 一个 SQLDB // 对应一个读写表 // 或 // 对应一个视图,关联多个只读表 type SQLDB struct { db *sql.DB keyspace string dbname string dbsourcename string // bufferedCommandsMutex sync.Mutex bufferedCommands []*Command chstatus chan Status chiniting chan error // table info infomutex sync.RWMutex tableinfo *schema.TableInfo insertStmt *Stmt upsertStmt *Stmt datainfo *DataInfo statinfo *StatInfo // reorg_proc ReorgProc reorg_rc *rc.RoutinesController } type ReorgProc struct { sync.Mutex statinfo_update func() datainfo_update func() expire_clear func() } var dbs = cmap.New[string, *SQLDB]() func NewSQLDB(keyspace string, dbname string, inmemory bool) (sdb *SQLDB, err error) { dbkey := fmt.Sprint("K:", keyspace, "N:", dbname, "M:", inmemory) sdb, err = dbs.GetWithNew(dbkey, func() (*SQLDB, error) { return openSQLDB(keyspace, dbname, inmemory, false) }) if err != nil { return } if sdb == nil { err = merrs.NewError("NewSQLDB " + dbkey + " error") return } return sdb, nil } var DBPath = "/opt/matrix/var/cgimport/" func openSQLDB(keyspace string, dbname string, inmemory bool, autorebuilding bool) (sqldb *SQLDB, err error) { var status Status = Uninited dbsourcename := "file:" if inmemory { dbsourcename += dbname + ":memory:?mode=memory&cache=shared" } else { dbfname := filepath.Join(DBPath, keyspace, dbname) e := os.MkdirAll(filepath.Dir(dbfname), os.ModePerm) if e != nil { err = merrs.NewError(e) return nil, err } fi, e := os.Stat(dbfname) if e != nil && !os.IsNotExist(e) { err = merrs.NewError(e) return nil, err } if fi != nil { if autorebuilding { e := os.Remove(dbfname) if e != nil { err = merrs.NewError(e) return nil, err } status = Uninited } else { status = Inited } } dbsourcename += dbfname + "?cache=shared" } db, e := sql.Open("sqlite", dbsourcename) if e != nil { err = merrs.NewError(e) return nil, err } if !inmemory { _, e = db.Exec("PRAGMA temp_store=memory") // 0=1=file 2=memory if e != nil { err = merrs.NewError(e) return nil, err } _, e = db.Exec("PRAGMA journal_mode=memory") // DELETE TRUNCATE PERSIST MEMORY OFF if e != nil { err = merrs.NewError(e) return nil, err } _, e = db.Exec("PRAGMA synchronous=off") // 0=off 1=normal 2=full if e != nil { err = merrs.NewError(e) return nil, err } // _, e = db.Exec("PRAGMA auto_vacuum=none") // 0=none 1=full // if e != nil { // err = merrs.NewError(e) // return nil // } } // _, e = db.Exec("PRAGMA count_changes=1") // if e != nil { // err = merrs.NewError(e) // return nil // } _, e = db.Exec("PRAGMA case_sensitive_like=1") if e != nil { err = merrs.NewError(e) return nil, err } sdb := &SQLDB{ db: db, keyspace: keyspace, dbname: dbname, dbsourcename: dbsourcename, bufferedCommands: []*Command{}, chstatus: make(chan Status, 1), chiniting: make(chan error, 1), reorg_rc: rc.NewRoutinesController("", 1), } sdb.chiniting <- nil if status == Inited { e := sdb.loadTableInfo() if e != nil { if !merrs.UninitedError.Contains(e) { err = merrs.NewError(e) return nil, err } if !autorebuilding { sdb.db.Close() return openSQLDB(keyspace, dbname, inmemory, true) } status = Uninited } } sdb.chstatus <- status return sdb, nil } func (me *SQLDB) DBName() string { return me.dbname } func (me *SQLDB) SourceName() string { return me.dbsourcename } func (me *SQLDB) Reset() { status := <-me.chstatus if status == Initing { e := <-me.chiniting // 等待初始化完成 me.chiniting <- e } me.chstatus <- Uninited } func (me *SQLDB) Close() error { status := <-me.chstatus if status == Initing { e := <-me.chiniting // 等待初始化完成 me.chiniting <- e } me.chstatus <- Closed if status == Closed { return nil } return me.db.Close() } func (me *SQLDB) Status() Status { select { case status := <-me.chstatus: me.chstatus <- status return status } } func (me *SQLDB) SetMaxAttached() (err error) { // set SQLITE_MAX_ATTACHED = 125 conn, e := me.db.Conn(context.Background()) if e != nil { return merrs.NormalError.NewCause(e) } defer conn.Close() sqlite.Limit(conn, sqlite3.SQLITE_MAX_ATTACHED, 125) return nil } func (me *SQLDB) AttacheDB(clsdb *SQLDB, dbname string) error { dbsourcename := clsdb.SourceName() sql := "ATTACH DATABASE '" + dbsourcename + "' as " + dbname logger.Trace(sql) _, e := me.exec(sql) return e } func (me *SQLDB) DBList() (dbs map[string]string, err error) { sql := "PRAGMA database_list" rows, e := me.queryMaps(sql) if e != nil { return nil, merrs.NormalError.NewWith("", []error{e}, merrs.SSMaps{{"sql": sql}}, 1) } dbs = map[string]string{} for _, m := range rows { dbs[cast.ToString(m["name"])] = cast.ToString(m["file"]) } return } func (me *SQLDB) InitTable(tabledefine *schema.TableDefine, force bool) (err error) { me.infomutex.Lock() defer me.infomutex.Unlock() status := <-me.chstatus if status == Inited && !force { // 初始化已完成,直接返回 me.chstatus <- status return nil } if status == Initing { me.chstatus <- status e := <-me.chiniting // 等待初始化完成 me.chiniting <- e return e } <-me.chiniting me.chstatus <- Initing // 操作计数 // PushCount(time.Now(), 1, me.keyspace, "renew", tabledefine.TableName) e := me.renewTable(tabledefine) if e != nil { err = e me.chiniting <- err return } logger.Debug("Init LocalDB", me.dbsourcename) // 执行初始化期间产生的更新操作 _, e = me.execBufferedCommands() if e != nil { err = merrs.NewError(e) me.chiniting <- err return } me.chiniting <- nil status = <-me.chstatus me.chstatus <- Inited // 执行新产生的更新操作 _, e = me.execBufferedCommands() if e != nil { err = merrs.NewError(e) return } return nil } func (me *SQLDB) loadTableInfo() (err error) { rows, e := me.queryMaps("SELECT name FROM sqlite_master WHERE type='table'") if e != nil { return e } if len(rows) < 2 { return merrs.UninitedError.New("uninited") } // 通过 sqlite_master 和 PRAGMA table_info 获得的信息有限 // 通过自定义的 schema 表,可以获得自己保存的信息 rows, e = me.queryMaps("SELECT key,value FROM __table_info__") if e != nil { return e } if len(rows) < 3 { return merrs.UninitedError.New("uninited") } mrows := map[string]any{} for _, row := range rows { mrows[row["key"].(string)] = row["value"] } // tableinfo v_tableinfo := mrows["tableinfo"] if v_tableinfo == nil { return merrs.UninitedError.New("not found tableinfo") } s_tableinfo, ok := v_tableinfo.(string) if !ok { return merrs.UninitedError.New("tableinfo type inconsistent") // 类型不一致 } load_tableinfo := &schema.TableInfo{} e = msgpack.Decode([]byte(s_tableinfo), load_tableinfo) if e != nil { err = merrs.NewError(e, merrs.SSMap{"tablename": load_tableinfo.TableName}) return } if load_tableinfo.Version != schema.TableInfoVersion { return merrs.UninitedError.New("uninited load_tableinfo.version=" + load_tableinfo.Version) } me.tableinfo = load_tableinfo // datainfo v_datainfo := mrows["datainfo"] if v_datainfo == nil { return merrs.UninitedError.New("not found datainfo") } s_datainfo, ok := v_datainfo.(string) if !ok { return merrs.UninitedError.New("datainfo type inconsistent") // 类型不一致 } me.datainfo = &DataInfo{} msgpack.Decode([]byte(s_datainfo), me.datainfo) // statinfo v_statinfo := mrows["statinfo"] if v_statinfo == nil { return merrs.UninitedError.New("not found statinfo") } s_statinfo, ok := v_statinfo.(string) if !ok { return merrs.UninitedError.New("statinfo type inconsistent") // 类型不一致 } me.statinfo = &StatInfo{} msgpack.Decode([]byte(s_statinfo), me.statinfo) if me.statinfo.CreateTime.Equal(time.Time{}) { return merrs.UninitedError.New("uninited") } // insertstmt, upsertstmt, e := me.initInsertStmt(false) if e != nil { err = merrs.NewError(e, merrs.SSMap{"tablename": me.tableinfo.TableName}) return } me.insertStmt = insertstmt me.upsertStmt = upsertstmt logger.Debug("Load LocalDB", me.dbsourcename) return } func (me *SQLDB) renewTable(tabledefine *schema.TableDefine) (err error) { tableinfo, createfieldtypes := tabledefine.TableInfo() if me.tableinfo != nil { // 简化结构,一个 DB 里只保留一个表或视图,可通过视图关联多个表 err = me.dropTable(me.tableinfo.TableName) if err != nil { return } } e := me.newTable(tableinfo, createfieldtypes) if e != nil { err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName}) return } me.tableinfo = tableinfo insertstmt, upsertstmt, e := me.initInsertStmt(true) if e != nil { err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName}) return } me.insertStmt = insertstmt me.upsertStmt = upsertstmt return } func (me *SQLDB) dropTable(tablename string) error { dropSql := fmt.Sprintf(`DROP TABLE IF EXISTS %s`, tablename) // logger.Tracef("Drop table %s sql: %s", tablename, dropSql) _, e := me.exec(dropSql) if e != nil { return merrs.NewError(e, merrs.SSMap{"tablename": tablename}) } return nil } func (me *SQLDB) newTable(tableinfo *schema.TableInfo, createfieldtypes []string, ) (err error) { tablename := tableinfo.TableName colums := strings.Join(createfieldtypes, `,`) if len(tableinfo.IDKeys) > 0 { colums = colums + `,` + `PRIMARY KEY("` + strings.Join(tableinfo.IDKeys, `","`) + `")` } createSql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s)`, tablename, colums) // logger.Tracef("Create table %s sql: %s", tablename, createSql) if _, e := me.exec(createSql); e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } for idxName, idxfields := range tableinfo.Indexes { var indexSql string if len(idxfields) > 1 { indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, strings.Join(idxfields, `", "`)) } else { indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, idxfields[0]) } // logger.Tracef("Create table %s index sql: %s", tablename, indexSql) if _, e := me.exec(indexSql); e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table index error"), e) return } } if _, e := me.exec(`CREATE TABLE IF NOT EXISTS __table_info__ (key TEXT PRIMARY KEY, value TEXT)`); e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } if _, e := me.exec(`DELETE FROM __table_info__`); e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } tableinfobs, e := msgpack.Encode(tableinfo) if e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "tableinfo", string(tableinfobs)); e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } me.datainfo = &DataInfo{} datainfobs, e := msgpack.Encode(me.datainfo) if e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "datainfo", string(datainfobs)); e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } me.statinfo = &StatInfo{ CreateTime: time.Now(), } statinfobs, e := msgpack.Encode(me.statinfo) if e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "statinfo", string(statinfobs)); e != nil { err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e) return } return nil } func (me *SQLDB) initInsertStmt(initvaluetypes bool) (insertstmt, upsertstmt *Stmt, err error) { qmarks := []string{} for range me.tableinfo.FieldNames { qmarks = append(qmarks, "?") } upsertSql := fmt.Sprintf(`BEGIN TRANSACTION; DELETE FROM "`+me.tableinfo.TableName+`" WHERE "%s"=?; INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s); COMMIT;`, strings.Join(me.tableinfo.IDKeys, `"=? and "`), strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ",")) upsertstmt, err = me.prepare(upsertSql) if err != nil { return } insertSql := fmt.Sprintf(`INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s)`, strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ",")) insertstmt, err = me.prepare(insertSql) if err != nil { return } if initvaluetypes { if len(me.tableinfo.DefValues) != len(me.tableinfo.FieldNames) { err = merrs.NewError(fmt.Sprintf("sqlite prepare insert table %s error", me.tableinfo.TableName)) return } params := []any{} for _, k := range me.tableinfo.IDKeys { for i, f := range me.tableinfo.FieldNames { if f == k { params = append(params, me.tableinfo.DefValues[i]) } } } params = append(params, me.tableinfo.DefValues...) // 首次插入数据的类型决定该字段的实际存储类型 _, err = insertstmt.Exec(params...) if err != nil { return } _, err = me.delete(nil) if err != nil { return } } return } func (me *SQLDB) setLastQueryTime() { me.statinfo.LastQueryTime = time.Now() me.reorg_proc.Lock() defer me.reorg_proc.Unlock() me.reorg_proc.statinfo_update = me.updateStatInfo me.reorg() } func (me *SQLDB) setLastUpdateTime() { me.statinfo.LastUpdateTime = time.Now() me.reorg_proc.Lock() defer me.reorg_proc.Unlock() me.reorg_proc.statinfo_update = me.updateStatInfo me.reorg() } func (me *SQLDB) updateStatInfo() { statinfobs, e := msgpack.Encode(me.statinfo) if e != nil { logger.Error("UpdateStatInfo Error", e) } sql := "UPDATE __table_info__ SET value=? WHERE key=?" _, e = me.exec(sql, "statinfo", string(statinfobs)) if e != nil { logger.Error("UpdateStatInfo Error", e) } } func (me *SQLDB) clearExpireData() { // Format("2006-01-02 15:04:05") deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE `+me.tableinfo.LatField+` 0 { me.reorg_proc.Lock() defer me.reorg_proc.Unlock() me.reorg_proc.expire_clear = me.clearExpireData me.reorg() } } func (me *SQLDB) reorg() { me.reorg_rc.CallLast2Only(func() { me.reorg_proc.Lock() datainfo_update := me.reorg_proc.datainfo_update statinfo_update := me.reorg_proc.statinfo_update expire_clear := me.reorg_proc.expire_clear me.reorg_proc.datainfo_update = nil me.reorg_proc.statinfo_update = nil me.reorg_proc.expire_clear = nil me.reorg_proc.Unlock() if datainfo_update != nil { datainfo_update() } if statinfo_update != nil { statinfo_update() } if expire_clear != nil { expire_clear() } time.Sleep(1 * time.Second) }) } func (me *SQLDB) execBufferedCommands() (count int, err error) { for { var uc *Command me.bufferedCommandsMutex.Lock() if len(me.bufferedCommands) > 0 { uc = me.bufferedCommands[0] me.bufferedCommands = me.bufferedCommands[1:] } me.bufferedCommandsMutex.Unlock() if uc == nil { return } n, e := me.execCommand(uc) if e != nil { err = e if me.Status() != Inited { // 重新初始化 me.bufferedCommandsMutex.Lock() me.bufferedCommands = append(me.bufferedCommands, uc) me.bufferedCommandsMutex.Unlock() // err = nil err = merrs.NewError(merrs.SSMaps{ { "me.Status()": fmt.Sprint(me.Status()), }}, e) logger.Error(err) err = nil } return } count += n } } func (me *SQLDB) pushCommand(uc *Command) (n int, err error) { select { case status := <-me.chstatus: me.chstatus <- status switch status { case Uninited, Initing: me.bufferedCommandsMutex.Lock() me.bufferedCommands = append(me.bufferedCommands, uc) me.bufferedCommandsMutex.Unlock() return 1, nil case Inited: me.bufferedCommandsMutex.Lock() me.bufferedCommands = append(me.bufferedCommands, uc) me.bufferedCommandsMutex.Unlock() n, err = me.execBufferedCommands() return case Closed: return } } return } func (me *SQLDB) execCommand(uc *Command) (n int, err error) { switch uc.CmdType { case UCTInsert: return me.insert(uc.Data, uc.OverwriteExists) case UCTUpdate: return me.update(uc.Data, uc.Conds) case UCTDelete: return me.delete(uc.Conds) } panic(merrs.NewError(fmt.Sprint("unsupport UpdateCommandType ", uc.CmdType), nil)) } func (me *SQLDB) Prepare(sql string) (*Stmt, error) { return me.prepare(sql) } func (me *SQLDB) prepare(sql string) (*Stmt, error) { stmt, e := me.db.Prepare(sql) if e != nil { return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}}) } return &Stmt{Stmt: stmt}, nil } func (me *SQLDB) exec(sql string, args ...any) (n int, err error) { rslt, e := me.db.Exec(sql, args...) if e != nil { return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}}) } if rslt == nil { return 0, merrs.NewError("no result", merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}}) } rn, e := rslt.RowsAffected() if e != nil { return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}}) } n = int(rn) return } func (me *SQLDB) Select(ctx context.Context, sql string, args ...any) (iter *Iter, err error) { var status Status select { case status = <-me.chstatus: me.chstatus <- status if status == Closed { return nil, merrs.ClosedError.New("closed") } if status == Uninited || status == Initing { return nil, merrs.UninitedError.New("uninited") } } iter, err = me.query(ctx, sql, args...) me.setLastQueryTime() // PushCount(time.Now(), 1, me.keyspace, "select", me.tableinfo.TableName) return } func (me *SQLDB) queryMaps(sql string, args ...any) (rows []map[string]any, err error) { iter, e := me.query(nil, sql, args...) if e != nil { return nil, e } return iter.AllMaps() } func (me *SQLDB) query(ctx context.Context, sql string, args ...any) (iter *Iter, err error) { defer func() { x := recover() if x != nil { switch xv := x.(type) { case error: err = xv default: err = merrs.NewError(fmt.Sprint(x)) } } }() if ctx == nil { ctx = context.Background() } rows, e := me.db.QueryContext(ctx, sql, args...) if e != nil { return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}}) } coltypes, e := rows.ColumnTypes() if e != nil { return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}}) } iter = &Iter{ctx: ctx, rows: rows, cols: coltypes} return } func (me *SQLDB) ids(data map[string]any) (idmap map[string]any) { idmap = map[string]any{} for _, k := range me.tableinfo.IDKeys { idmap[k] = data[k] } return } func (me *SQLDB) exists(conds map[string]any) (exists bool, err error) { values := []any{} existsSql := fmt.Sprintf(`SELECT * FROM %s`, me.tableinfo.TableName) if len(conds) > 0 { existsSql += " WHERE " keys := []string{} for k, v := range conds { keys = append(keys, k+"=?") values = append(values, v) } existsSql += strings.Join(keys, " and ") } existsSql += " LIMIT 1" logger.Tracef("Exists sql: %s, params=%v", existsSql, values) iter, e := me.Select(nil, existsSql, values...) if e != nil { return false, merrs.NewError(e) } defer iter.Close() if iter.rows.Next() { return true, nil } return false, nil } func (me *SQLDB) Insert( data map[string]any, overwriteexists bool, ) ( n int, err error, ) { return me.pushCommand(&Command{CmdType: UCTInsert, Data: data, OverwriteExists: overwriteexists}) } func (me *SQLDB) insert( data map[string]any, overwriteexists bool, ) ( n int, err error, ) { encodedata := map[string]any{} for _, field := range me.tableinfo.FieldNames { ftype := me.tableinfo.MapNameFields[field].Type v, _ := data[field] if field == me.tableinfo.LatField { v = time.Now().UnixMilli() } else { v = SQLValueEncode(ftype, v) } encodedata[field] = v } values := []interface{}{} upsertStmt := me.insertStmt if overwriteexists { // 覆盖已有数据,需要指定主键值 upsertStmt = me.upsertStmt for _, field := range me.tableinfo.IDKeys { values = append(values, encodedata[field]) } } for _, field := range me.tableinfo.FieldNames { values = append(values, encodedata[field]) } logger.Tracef("Insert %s data: values=%v", me.tableinfo.TableName, values) rslt, err := upsertStmt.Exec(values...) if err != nil { if strings.Index(err.Error(), "UNIQUE constraint") >= 0 { return 0, merrs.ExistError.NewError(err, merrs.SSMaps{ {"TableName": me.tableinfo.TableName}, {"Values": fmt.Sprint(values)}, {"Status": fmt.Sprint(me.Status())}, }) } return 0, merrs.NewError(err, merrs.SSMaps{ {"TableName": me.tableinfo.TableName}, {"Values": fmt.Sprint(values)}, {"Status": fmt.Sprint(me.Status())}, }) } if rslt == nil { return 0, merrs.NewError(fmt.Sprintf("sqlite insert table %s no result", me.tableinfo.TableName)) } rn, err := rslt.RowsAffected() if err != nil { return 0, merrs.NewError(err, merrs.SSMaps{ {"TableName": me.tableinfo.TableName}, {"Values": fmt.Sprint(values)}, {"Status": fmt.Sprint(me.Status())}, }) } n = int(rn) if n > 0 { me.setLastUpdateTime() } // PushCount(time.Now(), n, me.keyspace, "insert", me.tableinfo.TableName) return } func (me *SQLDB) Update( data map[string]any, conds map[string]any, ) ( n int, err error, ) { return me.pushCommand(&Command{CmdType: UCTUpdate, Data: data, Conds: conds}) } func (me *SQLDB) update( data map[string]any, conds map[string]any, ) ( n int, err error, ) { updatefields := []string{} condfields := []string{} values := []interface{}{} for k, v := range data { updatefields = append(updatefields, k) values = append(values, v) } updatefields = append(updatefields, me.tableinfo.LatField) values = append(values, time.Now().UnixMilli()) for k, v := range conds { condfields = append(condfields, k) values = append(values, v) } if len(values) == 0 { return 0, nil } updateSql := fmt.Sprintf(`UPDATE %s SET "%s"=?`, me.tableinfo.TableName, strings.Join(updatefields, `"=?,"`)) if len(condfields) > 0 { updateSql += fmt.Sprintf(` WHERE "%s"=?`, strings.Join(condfields, `"=? and "`)) } logger.Tracef("Update sql: %s, params=%v", updateSql, values) n, err = me.exec(updateSql, values...) if err != nil { return 0, merrs.NewError(fmt.Sprintf("sqlite update table %s error", me.tableinfo.TableName), err) } if n > 0 { me.setLastUpdateTime() } // PushCount(time.Now(), n, me.keyspace, "update", me.tableinfo.TableName) return } func (me *SQLDB) Delete( conds map[string]any, ) ( n int, err error, ) { return me.pushCommand(&Command{CmdType: UCTDelete, Conds: conds}) } func (me *SQLDB) delete( conds map[string]any, ) ( n int, err error, ) { values := []interface{}{} deleteSql := fmt.Sprintf(`DELETE FROM %s`, me.tableinfo.TableName) if len(conds) > 0 { fields := []string{} for k, v := range conds { fields = append(fields, `"`+k+`"=?`) values = append(values, v) } deleteSql += " WHERE " + strings.Join(fields, " and ") } logger.Tracef("Delete sql: %s, params=%v", deleteSql, values) n, err = me.exec(deleteSql, values...) if err != nil { return 0, merrs.NewError(fmt.Sprintf("sqlite delete table %s error", me.tableinfo.TableName), err) } if n > 0 { me.setLastUpdateTime() } // PushCount(time.Now(), n, me.keyspace, "delete", me.tableinfo.TableName) return }