sqldb.go 24 KB


  1. package sqlite
  2. import (
  3. "context"
  4. "database/sql"
  5. "fmt"
  6. "os"
  7. "path/filepath"
  8. "regexp"
  9. "strings"
  10. "sync"
  11. "time"
  12. "git.wecise.com/wecise/cgimport/sqlite/schema"
  13. "git.wecise.com/wecise/util/cmap"
  14. "git.wecise.com/wecise/util/msgpack"
  15. "git.wecise.com/wecise/util/rc"
  16. "github.com/spf13/cast"
  17. "github.com/wecisecode/util/merrs"
  18. "modernc.org/sqlite"
  19. sqlite3 "modernc.org/sqlite/lib"
  20. )
  21. type Status int
  22. const (
  23. Uninited Status = iota // 未初始化,不可读,不可写
  24. Initing // 正在初始化,不可读,不可写
  25. Inited // 初始化完成,可读,可写
  26. Closed // 已关闭,不可读,不可写
  27. )
  28. func (status Status) String() string {
  29. switch status {
  30. case Uninited:
  31. return "Uninited"
  32. case Initing:
  33. return "Initing"
  34. case Inited:
  35. return "Inited"
  36. case Closed:
  37. return "Closed"
  38. }
  39. return "unkown"
  40. }
  41. var ReNonWordChars = regexp.MustCompile(`\W`)
  42. // 一个 SQLDB
  43. // 对应一个读写表
  44. // 或
  45. // 对应一个视图,关联多个只读表
  46. type SQLDB struct {
  47. db *sql.DB
  48. keyspace string
  49. dbname string
  50. dbsourcename string
  51. //
  52. bufferedCommandsMutex sync.Mutex
  53. bufferedCommands []*Command
  54. chstatus chan Status
  55. chiniting chan error
  56. // table info
  57. infomutex sync.RWMutex
  58. tableinfo *schema.TableInfo
  59. insertStmt *Stmt
  60. upsertStmt *Stmt
  61. datainfo *DataInfo
  62. statinfo *StatInfo
  63. //
  64. reorg_proc ReorgProc
  65. reorg_rc *rc.RoutinesController
  66. }
  67. type ReorgProc struct {
  68. sync.Mutex
  69. statinfo_update func()
  70. datainfo_update func()
  71. expire_clear func()
  72. }
  73. var dbs = cmap.New[string, *SQLDB]()
  74. func NewSQLDB(keyspace string, dbname string, inmemory bool) (sdb *SQLDB, err error) {
  75. dbkey := fmt.Sprint("K:", keyspace, "N:", dbname, "M:", inmemory)
  76. sdb, err = dbs.GetWithNew(dbkey, func() (*SQLDB, error) {
  77. return openSQLDB(keyspace, dbname, inmemory, false)
  78. })
  79. if err != nil {
  80. return
  81. }
  82. if sdb == nil {
  83. err = merrs.NewError("NewSQLDB " + dbkey + " error")
  84. return
  85. }
  86. return sdb, nil
  87. }
  88. var DBPath = "/opt/matrix/var/cgimport/"
  89. func openSQLDB(keyspace string, dbname string, inmemory bool, autorebuilding bool) (sqldb *SQLDB, err error) {
  90. var status Status = Uninited
  91. dbsourcename := "file:"
  92. if inmemory {
  93. dbsourcename += dbname + ":memory:?mode=memory&cache=shared"
  94. } else {
  95. dbfname := filepath.Join(DBPath, keyspace, dbname)
  96. e := os.MkdirAll(filepath.Dir(dbfname), os.ModePerm)
  97. if e != nil {
  98. err = merrs.NewError(e)
  99. return nil, err
  100. }
  101. fi, e := os.Stat(dbfname)
  102. if e != nil && !os.IsNotExist(e) {
  103. err = merrs.NewError(e)
  104. return nil, err
  105. }
  106. if fi != nil {
  107. if autorebuilding {
  108. e := os.Remove(dbfname)
  109. if e != nil {
  110. err = merrs.NewError(e)
  111. return nil, err
  112. }
  113. status = Uninited
  114. } else {
  115. status = Inited
  116. }
  117. }
  118. dbsourcename += dbfname + "?cache=shared"
  119. }
  120. db, e := sql.Open("sqlite", dbsourcename)
  121. if e != nil {
  122. err = merrs.NewError(e)
  123. return nil, err
  124. }
  125. if !inmemory {
  126. _, e = db.Exec("PRAGMA temp_store=memory") // 0=1=file 2=memory
  127. if e != nil {
  128. err = merrs.NewError(e)
  129. return nil, err
  130. }
  131. _, e = db.Exec("PRAGMA journal_mode=memory") // DELETE TRUNCATE PERSIST MEMORY OFF
  132. if e != nil {
  133. err = merrs.NewError(e)
  134. return nil, err
  135. }
  136. _, e = db.Exec("PRAGMA synchronous=off") // 0=off 1=normal 2=full
  137. if e != nil {
  138. err = merrs.NewError(e)
  139. return nil, err
  140. }
  141. // _, e = db.Exec("PRAGMA auto_vacuum=none") // 0=none 1=full
  142. // if e != nil {
  143. // err = merrs.NewError(e)
  144. // return nil
  145. // }
  146. }
  147. // _, e = db.Exec("PRAGMA count_changes=1")
  148. // if e != nil {
  149. // err = merrs.NewError(e)
  150. // return nil
  151. // }
  152. _, e = db.Exec("PRAGMA case_sensitive_like=1")
  153. if e != nil {
  154. err = merrs.NewError(e)
  155. return nil, err
  156. }
  157. sdb := &SQLDB{
  158. db: db,
  159. keyspace: keyspace,
  160. dbname: dbname,
  161. dbsourcename: dbsourcename,
  162. bufferedCommands: []*Command{},
  163. chstatus: make(chan Status, 1),
  164. chiniting: make(chan error, 1),
  165. reorg_rc: rc.NewRoutinesController("", 1),
  166. }
  167. sdb.chiniting <- nil
  168. if status == Inited {
  169. e := sdb.loadTableInfo()
  170. if e != nil {
  171. if !merrs.UninitedError.Contains(e) {
  172. err = merrs.NewError(e)
  173. return nil, err
  174. }
  175. if !autorebuilding {
  176. sdb.db.Close()
  177. return openSQLDB(keyspace, dbname, inmemory, true)
  178. }
  179. status = Uninited
  180. }
  181. }
  182. sdb.chstatus <- status
  183. return sdb, nil
  184. }
  185. func (me *SQLDB) DBName() string {
  186. return me.dbname
  187. }
  188. func (me *SQLDB) SourceName() string {
  189. return me.dbsourcename
  190. }
  191. func (me *SQLDB) Reset() {
  192. status := <-me.chstatus
  193. if status == Initing {
  194. e := <-me.chiniting // 等待初始化完成
  195. me.chiniting <- e
  196. }
  197. me.chstatus <- Uninited
  198. }
  199. func (me *SQLDB) Close() error {
  200. status := <-me.chstatus
  201. if status == Initing {
  202. e := <-me.chiniting // 等待初始化完成
  203. me.chiniting <- e
  204. }
  205. me.chstatus <- Closed
  206. if status == Closed {
  207. return nil
  208. }
  209. return me.db.Close()
  210. }
  211. func (me *SQLDB) Status() Status {
  212. select {
  213. case status := <-me.chstatus:
  214. me.chstatus <- status
  215. return status
  216. }
  217. }
  218. func (me *SQLDB) SetMaxAttached() (err error) {
  219. // set SQLITE_MAX_ATTACHED = 125
  220. conn, e := me.db.Conn(context.Background())
  221. if e != nil {
  222. return merrs.NormalError.NewCause(e)
  223. }
  224. defer conn.Close()
  225. sqlite.Limit(conn, sqlite3.SQLITE_MAX_ATTACHED, 125)
  226. return nil
  227. }
  228. func (me *SQLDB) AttacheDB(clsdb *SQLDB, dbname string) error {
  229. dbsourcename := clsdb.SourceName()
  230. sql := "ATTACH DATABASE '" + dbsourcename + "' as " + dbname
  231. logger.Trace(sql)
  232. _, e := me.exec(sql)
  233. return e
  234. }
  235. func (me *SQLDB) DBList() (dbs map[string]string, err error) {
  236. sql := "PRAGMA database_list"
  237. rows, e := me.queryMaps(sql)
  238. if e != nil {
  239. return nil, merrs.NormalError.NewWith("", []error{e}, merrs.SSMaps{{"sql": sql}}, 1)
  240. }
  241. dbs = map[string]string{}
  242. for _, m := range rows {
  243. dbs[cast.ToString(m["name"])] = cast.ToString(m["file"])
  244. }
  245. return
  246. }
  247. func (me *SQLDB) InitTable(tabledefine *schema.TableDefine, force bool) (err error) {
  248. me.infomutex.Lock()
  249. defer me.infomutex.Unlock()
  250. status := <-me.chstatus
  251. if status == Inited && !force {
  252. // 初始化已完成,直接返回
  253. me.chstatus <- status
  254. return nil
  255. }
  256. if status == Initing {
  257. me.chstatus <- status
  258. e := <-me.chiniting // 等待初始化完成
  259. me.chiniting <- e
  260. return e
  261. }
  262. <-me.chiniting
  263. me.chstatus <- Initing
  264. // 操作计数
  265. // PushCount(time.Now(), 1, me.keyspace, "renew", tabledefine.TableName)
  266. e := me.renewTable(tabledefine)
  267. if e != nil {
  268. err = e
  269. me.chiniting <- err
  270. return
  271. }
  272. logger.Debug("Init LocalDB", me.dbsourcename)
  273. // 执行初始化期间产生的更新操作
  274. _, e = me.execBufferedCommands()
  275. if e != nil {
  276. err = merrs.NewError(e)
  277. me.chiniting <- err
  278. return
  279. }
  280. me.chiniting <- nil
  281. status = <-me.chstatus
  282. me.chstatus <- Inited
  283. // 执行新产生的更新操作
  284. _, e = me.execBufferedCommands()
  285. if e != nil {
  286. err = merrs.NewError(e)
  287. return
  288. }
  289. return nil
  290. }
  291. func (me *SQLDB) loadTableInfo() (err error) {
  292. rows, e := me.queryMaps("SELECT name FROM sqlite_master WHERE type='table'")
  293. if e != nil {
  294. return e
  295. }
  296. if len(rows) < 2 {
  297. return merrs.UninitedError.New("uninited")
  298. }
  299. // 通过 sqlite_master 和 PRAGMA table_info 获得的信息有限
  300. // 通过自定义的 schema 表,可以获得自己保存的信息
  301. rows, e = me.queryMaps("SELECT key,value FROM __table_info__")
  302. if e != nil {
  303. return e
  304. }
  305. if len(rows) < 3 {
  306. return merrs.UninitedError.New("uninited")
  307. }
  308. mrows := map[string]any{}
  309. for _, row := range rows {
  310. mrows[row["key"].(string)] = row["value"]
  311. }
  312. // tableinfo
  313. v_tableinfo := mrows["tableinfo"]
  314. if v_tableinfo == nil {
  315. return merrs.UninitedError.New("not found tableinfo")
  316. }
  317. s_tableinfo, ok := v_tableinfo.(string)
  318. if !ok {
  319. return merrs.UninitedError.New("tableinfo type inconsistent") // 类型不一致
  320. }
  321. load_tableinfo := &schema.TableInfo{}
  322. e = msgpack.Decode([]byte(s_tableinfo), load_tableinfo)
  323. if e != nil {
  324. err = merrs.NewError(e, merrs.SSMap{"tablename": load_tableinfo.TableName})
  325. return
  326. }
  327. if load_tableinfo.Version != schema.TableInfoVersion {
  328. return merrs.UninitedError.New("uninited load_tableinfo.version=" + load_tableinfo.Version)
  329. }
  330. me.tableinfo = load_tableinfo
  331. // datainfo
  332. v_datainfo := mrows["datainfo"]
  333. if v_datainfo == nil {
  334. return merrs.UninitedError.New("not found datainfo")
  335. }
  336. s_datainfo, ok := v_datainfo.(string)
  337. if !ok {
  338. return merrs.UninitedError.New("datainfo type inconsistent") // 类型不一致
  339. }
  340. me.datainfo = &DataInfo{}
  341. msgpack.Decode([]byte(s_datainfo), me.datainfo)
  342. // statinfo
  343. v_statinfo := mrows["statinfo"]
  344. if v_statinfo == nil {
  345. return merrs.UninitedError.New("not found statinfo")
  346. }
  347. s_statinfo, ok := v_statinfo.(string)
  348. if !ok {
  349. return merrs.UninitedError.New("statinfo type inconsistent") // 类型不一致
  350. }
  351. me.statinfo = &StatInfo{}
  352. msgpack.Decode([]byte(s_statinfo), me.statinfo)
  353. if me.statinfo.CreateTime.Equal(time.Time{}) {
  354. return merrs.UninitedError.New("uninited")
  355. }
  356. //
  357. insertstmt, upsertstmt, e := me.initInsertStmt(false)
  358. if e != nil {
  359. err = merrs.NewError(e, merrs.SSMap{"tablename": me.tableinfo.TableName})
  360. return
  361. }
  362. me.insertStmt = insertstmt
  363. me.upsertStmt = upsertstmt
  364. logger.Debug("Load LocalDB", me.dbsourcename)
  365. return
  366. }
  367. func (me *SQLDB) renewTable(tabledefine *schema.TableDefine) (err error) {
  368. tableinfo, createfieldtypes := tabledefine.TableInfo()
  369. if me.tableinfo != nil {
  370. // 简化结构,一个 DB 里只保留一个表或视图,可通过视图关联多个表
  371. err = me.dropTable(me.tableinfo.TableName)
  372. if err != nil {
  373. return
  374. }
  375. }
  376. e := me.newTable(tableinfo, createfieldtypes)
  377. if e != nil {
  378. err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
  379. return
  380. }
  381. me.tableinfo = tableinfo
  382. insertstmt, upsertstmt, e := me.initInsertStmt(true)
  383. if e != nil {
  384. err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
  385. return
  386. }
  387. me.insertStmt = insertstmt
  388. me.upsertStmt = upsertstmt
  389. return
  390. }
  391. func (me *SQLDB) dropTable(tablename string) error {
  392. dropSql := fmt.Sprintf(`DROP TABLE IF EXISTS %s`, tablename)
  393. // logger.Tracef("Drop table %s sql: %s", tablename, dropSql)
  394. _, e := me.exec(dropSql)
  395. if e != nil {
  396. return merrs.NewError(e, merrs.SSMap{"tablename": tablename})
  397. }
  398. return nil
  399. }
  400. func (me *SQLDB) newTable(tableinfo *schema.TableInfo,
  401. createfieldtypes []string,
  402. ) (err error) {
  403. tablename := tableinfo.TableName
  404. colums := strings.Join(createfieldtypes, `,`)
  405. if len(tableinfo.IDKeys) > 0 {
  406. colums = colums + `,` + `PRIMARY KEY("` + strings.Join(tableinfo.IDKeys, `","`) + `")`
  407. }
  408. createSql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s)`, tablename, colums)
  409. // logger.Tracef("Create table %s sql: %s", tablename, createSql)
  410. if _, e := me.exec(createSql); e != nil {
  411. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  412. return
  413. }
  414. for idxName, idxfields := range tableinfo.Indexes {
  415. var indexSql string
  416. if len(idxfields) > 1 {
  417. indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, strings.Join(idxfields, `", "`))
  418. } else {
  419. indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, idxfields[0])
  420. }
  421. // logger.Tracef("Create table %s index sql: %s", tablename, indexSql)
  422. if _, e := me.exec(indexSql); e != nil {
  423. err = merrs.NewError(fmt.Sprintf("sqlite create table index error"), e)
  424. return
  425. }
  426. }
  427. if _, e := me.exec(`CREATE TABLE IF NOT EXISTS __table_info__ (key TEXT PRIMARY KEY, value TEXT)`); e != nil {
  428. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  429. return
  430. }
  431. if _, e := me.exec(`DELETE FROM __table_info__`); e != nil {
  432. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  433. return
  434. }
  435. tableinfobs, e := msgpack.Encode(tableinfo)
  436. if e != nil {
  437. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  438. return
  439. }
  440. if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "tableinfo", string(tableinfobs)); e != nil {
  441. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  442. return
  443. }
  444. me.datainfo = &DataInfo{}
  445. datainfobs, e := msgpack.Encode(me.datainfo)
  446. if e != nil {
  447. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  448. return
  449. }
  450. if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "datainfo", string(datainfobs)); e != nil {
  451. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  452. return
  453. }
  454. me.statinfo = &StatInfo{
  455. CreateTime: time.Now(),
  456. }
  457. statinfobs, e := msgpack.Encode(me.statinfo)
  458. if e != nil {
  459. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  460. return
  461. }
  462. if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "statinfo", string(statinfobs)); e != nil {
  463. err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
  464. return
  465. }
  466. return nil
  467. }
  468. func (me *SQLDB) initInsertStmt(initvaluetypes bool) (insertstmt, upsertstmt *Stmt, err error) {
  469. qmarks := []string{}
  470. for range me.tableinfo.FieldNames {
  471. qmarks = append(qmarks, "?")
  472. }
  473. upsertSql := fmt.Sprintf(`BEGIN TRANSACTION;
  474. DELETE FROM "`+me.tableinfo.TableName+`" WHERE "%s"=?;
  475. INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s);
  476. COMMIT;`,
  477. strings.Join(me.tableinfo.IDKeys, `"=? and "`), strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
  478. upsertstmt, err = me.prepare(upsertSql)
  479. if err != nil {
  480. return
  481. }
  482. insertSql := fmt.Sprintf(`INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s)`,
  483. strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
  484. insertstmt, err = me.prepare(insertSql)
  485. if err != nil {
  486. return
  487. }
  488. if initvaluetypes {
  489. if len(me.tableinfo.DefValues) != len(me.tableinfo.FieldNames) {
  490. err = merrs.NewError(fmt.Sprintf("sqlite prepare insert table %s error", me.tableinfo.TableName))
  491. return
  492. }
  493. params := []any{}
  494. for _, k := range me.tableinfo.IDKeys {
  495. for i, f := range me.tableinfo.FieldNames {
  496. if f == k {
  497. params = append(params, me.tableinfo.DefValues[i])
  498. }
  499. }
  500. }
  501. params = append(params, me.tableinfo.DefValues...)
  502. // 首次插入数据的类型决定该字段的实际存储类型
  503. _, err = insertstmt.Exec(params...)
  504. if err != nil {
  505. return
  506. }
  507. _, err = me.delete(nil)
  508. if err != nil {
  509. return
  510. }
  511. }
  512. return
  513. }
  514. func (me *SQLDB) setLastQueryTime() {
  515. me.statinfo.LastQueryTime = time.Now()
  516. me.reorg_proc.Lock()
  517. defer me.reorg_proc.Unlock()
  518. me.reorg_proc.statinfo_update = me.updateStatInfo
  519. me.reorg()
  520. }
  521. func (me *SQLDB) setLastUpdateTime() {
  522. me.statinfo.LastUpdateTime = time.Now()
  523. me.reorg_proc.Lock()
  524. defer me.reorg_proc.Unlock()
  525. me.reorg_proc.statinfo_update = me.updateStatInfo
  526. me.reorg()
  527. }
  528. func (me *SQLDB) updateStatInfo() {
  529. statinfobs, e := msgpack.Encode(me.statinfo)
  530. if e != nil {
  531. logger.Error("UpdateStatInfo Error", e)
  532. }
  533. sql := "UPDATE __table_info__ SET value=? WHERE key=?"
  534. _, e = me.exec(sql, "statinfo", string(statinfobs))
  535. if e != nil {
  536. logger.Error("UpdateStatInfo Error", e)
  537. }
  538. }
  539. func (me *SQLDB) clearExpireData() {
  540. // Format("2006-01-02 15:04:05")
  541. deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE `+me.tableinfo.LatField+`<?`, me.tableinfo.TableName)
  542. n, e := me.exec(deleteSql, time.Now().Add(-me.tableinfo.Ttl).UnixMilli())
  543. if e != nil {
  544. logger.Error("ClearExpireData Error", e)
  545. }
  546. logger.Debug("ClearExpireData", n)
  547. }
  548. func (me *SQLDB) ClearExpireData() {
  549. if me.tableinfo.Ttl > 0 {
  550. me.reorg_proc.Lock()
  551. defer me.reorg_proc.Unlock()
  552. me.reorg_proc.expire_clear = me.clearExpireData
  553. me.reorg()
  554. }
  555. }
  556. func (me *SQLDB) reorg() {
  557. me.reorg_rc.CallLast2Only(func() {
  558. me.reorg_proc.Lock()
  559. datainfo_update := me.reorg_proc.datainfo_update
  560. statinfo_update := me.reorg_proc.statinfo_update
  561. expire_clear := me.reorg_proc.expire_clear
  562. me.reorg_proc.datainfo_update = nil
  563. me.reorg_proc.statinfo_update = nil
  564. me.reorg_proc.expire_clear = nil
  565. me.reorg_proc.Unlock()
  566. if datainfo_update != nil {
  567. datainfo_update()
  568. }
  569. if statinfo_update != nil {
  570. statinfo_update()
  571. }
  572. if expire_clear != nil {
  573. expire_clear()
  574. }
  575. time.Sleep(1 * time.Second)
  576. })
  577. }
  578. func (me *SQLDB) execBufferedCommands() (count int, err error) {
  579. for {
  580. var uc *Command
  581. me.bufferedCommandsMutex.Lock()
  582. if len(me.bufferedCommands) > 0 {
  583. uc = me.bufferedCommands[0]
  584. me.bufferedCommands = me.bufferedCommands[1:]
  585. }
  586. me.bufferedCommandsMutex.Unlock()
  587. if uc == nil {
  588. return
  589. }
  590. n, e := me.execCommand(uc)
  591. if e != nil {
  592. err = e
  593. if me.Status() != Inited {
  594. // 重新初始化
  595. me.bufferedCommandsMutex.Lock()
  596. me.bufferedCommands = append(me.bufferedCommands, uc)
  597. me.bufferedCommandsMutex.Unlock()
  598. // err = nil
  599. err = merrs.NewError(merrs.SSMaps{
  600. {
  601. "me.Status()": fmt.Sprint(me.Status()),
  602. }}, e)
  603. logger.Error(err)
  604. err = nil
  605. }
  606. return
  607. }
  608. count += n
  609. }
  610. }
  611. func (me *SQLDB) pushCommand(uc *Command) (n int, err error) {
  612. select {
  613. case status := <-me.chstatus:
  614. me.chstatus <- status
  615. switch status {
  616. case Uninited, Initing:
  617. me.bufferedCommandsMutex.Lock()
  618. me.bufferedCommands = append(me.bufferedCommands, uc)
  619. me.bufferedCommandsMutex.Unlock()
  620. return 1, nil
  621. case Inited:
  622. me.bufferedCommandsMutex.Lock()
  623. me.bufferedCommands = append(me.bufferedCommands, uc)
  624. me.bufferedCommandsMutex.Unlock()
  625. n, err = me.execBufferedCommands()
  626. return
  627. case Closed:
  628. return
  629. }
  630. }
  631. return
  632. }
  633. func (me *SQLDB) execCommand(uc *Command) (n int, err error) {
  634. switch uc.CmdType {
  635. case UCTInsert:
  636. return me.insert(uc.Data, uc.OverwriteExists)
  637. case UCTUpdate:
  638. return me.update(uc.Data, uc.Conds)
  639. case UCTDelete:
  640. return me.delete(uc.Conds)
  641. }
  642. panic(merrs.NewError(fmt.Sprint("unsupport UpdateCommandType ", uc.CmdType), nil))
  643. }
  644. func (me *SQLDB) Prepare(sql string) (*Stmt, error) {
  645. return me.prepare(sql)
  646. }
  647. func (me *SQLDB) prepare(sql string) (*Stmt, error) {
  648. stmt, e := me.db.Prepare(sql)
  649. if e != nil {
  650. return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}})
  651. }
  652. return &Stmt{Stmt: stmt}, nil
  653. }
  654. func (me *SQLDB) exec(sql string, args ...any) (n int, err error) {
  655. rslt, e := me.db.Exec(sql, args...)
  656. if e != nil {
  657. return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
  658. }
  659. if rslt == nil {
  660. return 0, merrs.NewError("no result", merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
  661. }
  662. rn, e := rslt.RowsAffected()
  663. if e != nil {
  664. return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
  665. }
  666. n = int(rn)
  667. return
  668. }
  669. func (me *SQLDB) Select(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
  670. var status Status
  671. select {
  672. case status = <-me.chstatus:
  673. me.chstatus <- status
  674. if status == Closed {
  675. return nil, merrs.ClosedError.New("closed")
  676. }
  677. if status == Uninited || status == Initing {
  678. return nil, merrs.UninitedError.New("uninited")
  679. }
  680. }
  681. iter, err = me.query(ctx, sql, args...)
  682. me.setLastQueryTime()
  683. // PushCount(time.Now(), 1, me.keyspace, "select", me.tableinfo.TableName)
  684. return
  685. }
  686. func (me *SQLDB) queryMaps(sql string, args ...any) (rows []map[string]any, err error) {
  687. iter, e := me.query(nil, sql, args...)
  688. if e != nil {
  689. return nil, e
  690. }
  691. return iter.AllMaps()
  692. }
  693. func (me *SQLDB) query(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
  694. defer func() {
  695. x := recover()
  696. if x != nil {
  697. switch xv := x.(type) {
  698. case error:
  699. err = xv
  700. default:
  701. err = merrs.NewError(fmt.Sprint(x))
  702. }
  703. }
  704. }()
  705. if ctx == nil {
  706. ctx = context.Background()
  707. }
  708. rows, e := me.db.QueryContext(ctx, sql, args...)
  709. if e != nil {
  710. return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
  711. }
  712. coltypes, e := rows.ColumnTypes()
  713. if e != nil {
  714. return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
  715. }
  716. iter = &Iter{ctx: ctx, rows: rows, cols: coltypes}
  717. return
  718. }
  719. func (me *SQLDB) ids(data map[string]any) (idmap map[string]any) {
  720. idmap = map[string]any{}
  721. for _, k := range me.tableinfo.IDKeys {
  722. idmap[k] = data[k]
  723. }
  724. return
  725. }
  726. func (me *SQLDB) exists(conds map[string]any) (exists bool, err error) {
  727. values := []any{}
  728. existsSql := fmt.Sprintf(`SELECT * FROM %s`, me.tableinfo.TableName)
  729. if len(conds) > 0 {
  730. existsSql += " WHERE "
  731. keys := []string{}
  732. for k, v := range conds {
  733. keys = append(keys, k+"=?")
  734. values = append(values, v)
  735. }
  736. existsSql += strings.Join(keys, " and ")
  737. }
  738. existsSql += " LIMIT 1"
  739. logger.Tracef("Exists sql: %s, params=%v", existsSql, values)
  740. iter, e := me.Select(nil, existsSql, values...)
  741. if e != nil {
  742. return false, merrs.NewError(e)
  743. }
  744. defer iter.Close()
  745. if iter.rows.Next() {
  746. return true, nil
  747. }
  748. return false, nil
  749. }
  750. func (me *SQLDB) Insert(
  751. data map[string]any,
  752. overwriteexists bool,
  753. ) (
  754. n int,
  755. err error,
  756. ) {
  757. return me.pushCommand(&Command{CmdType: UCTInsert, Data: data, OverwriteExists: overwriteexists})
  758. }
  759. func (me *SQLDB) insert(
  760. data map[string]any,
  761. overwriteexists bool,
  762. ) (
  763. n int,
  764. err error,
  765. ) {
  766. encodedata := map[string]any{}
  767. for _, field := range me.tableinfo.FieldNames {
  768. ftype := me.tableinfo.MapNameFields[field].Type
  769. v, _ := data[field]
  770. if field == me.tableinfo.LatField {
  771. v = time.Now().UnixMilli()
  772. } else {
  773. v = SQLValueEncode(ftype, v)
  774. }
  775. encodedata[field] = v
  776. }
  777. values := []interface{}{}
  778. upsertStmt := me.insertStmt
  779. if overwriteexists {
  780. // 覆盖已有数据,需要指定主键值
  781. upsertStmt = me.upsertStmt
  782. for _, field := range me.tableinfo.IDKeys {
  783. values = append(values, encodedata[field])
  784. }
  785. }
  786. for _, field := range me.tableinfo.FieldNames {
  787. values = append(values, encodedata[field])
  788. }
  789. logger.Tracef("Insert %s data: values=%v", me.tableinfo.TableName, values)
  790. rslt, err := upsertStmt.Exec(values...)
  791. if err != nil {
  792. if strings.Index(err.Error(), "UNIQUE constraint") >= 0 {
  793. return 0, merrs.ExistError.NewError(err,
  794. merrs.SSMaps{
  795. {"TableName": me.tableinfo.TableName},
  796. {"Values": fmt.Sprint(values)},
  797. {"Status": fmt.Sprint(me.Status())},
  798. })
  799. }
  800. return 0, merrs.NewError(err,
  801. merrs.SSMaps{
  802. {"TableName": me.tableinfo.TableName},
  803. {"Values": fmt.Sprint(values)},
  804. {"Status": fmt.Sprint(me.Status())},
  805. })
  806. }
  807. if rslt == nil {
  808. return 0, merrs.NewError(fmt.Sprintf("sqlite insert table %s no result", me.tableinfo.TableName))
  809. }
  810. rn, err := rslt.RowsAffected()
  811. if err != nil {
  812. return 0, merrs.NewError(err,
  813. merrs.SSMaps{
  814. {"TableName": me.tableinfo.TableName},
  815. {"Values": fmt.Sprint(values)},
  816. {"Status": fmt.Sprint(me.Status())},
  817. })
  818. }
  819. n = int(rn)
  820. if n > 0 {
  821. me.setLastUpdateTime()
  822. }
  823. // PushCount(time.Now(), n, me.keyspace, "insert", me.tableinfo.TableName)
  824. return
  825. }
  826. func (me *SQLDB) Update(
  827. data map[string]any,
  828. conds map[string]any,
  829. ) (
  830. n int,
  831. err error,
  832. ) {
  833. return me.pushCommand(&Command{CmdType: UCTUpdate, Data: data, Conds: conds})
  834. }
  835. func (me *SQLDB) update(
  836. data map[string]any,
  837. conds map[string]any,
  838. ) (
  839. n int,
  840. err error,
  841. ) {
  842. updatefields := []string{}
  843. condfields := []string{}
  844. values := []interface{}{}
  845. for k, v := range data {
  846. updatefields = append(updatefields, k)
  847. values = append(values, v)
  848. }
  849. updatefields = append(updatefields, me.tableinfo.LatField)
  850. values = append(values, time.Now().UnixMilli())
  851. for k, v := range conds {
  852. condfields = append(condfields, k)
  853. values = append(values, v)
  854. }
  855. if len(values) == 0 {
  856. return 0, nil
  857. }
  858. updateSql := fmt.Sprintf(`UPDATE %s SET "%s"=?`, me.tableinfo.TableName, strings.Join(updatefields, `"=?,"`))
  859. if len(condfields) > 0 {
  860. updateSql += fmt.Sprintf(` WHERE "%s"=?`, strings.Join(condfields, `"=? and "`))
  861. }
  862. logger.Tracef("Update sql: %s, params=%v", updateSql, values)
  863. n, err = me.exec(updateSql, values...)
  864. if err != nil {
  865. return 0, merrs.NewError(fmt.Sprintf("sqlite update table %s error", me.tableinfo.TableName), err)
  866. }
  867. if n > 0 {
  868. me.setLastUpdateTime()
  869. }
  870. // PushCount(time.Now(), n, me.keyspace, "update", me.tableinfo.TableName)
  871. return
  872. }
  873. func (me *SQLDB) Delete(
  874. conds map[string]any,
  875. ) (
  876. n int,
  877. err error,
  878. ) {
  879. return me.pushCommand(&Command{CmdType: UCTDelete, Conds: conds})
  880. }
  881. func (me *SQLDB) delete(
  882. conds map[string]any,
  883. ) (
  884. n int,
  885. err error,
  886. ) {
  887. values := []interface{}{}
  888. deleteSql := fmt.Sprintf(`DELETE FROM %s`, me.tableinfo.TableName)
  889. if len(conds) > 0 {
  890. fields := []string{}
  891. for k, v := range conds {
  892. fields = append(fields, `"`+k+`"=?`)
  893. values = append(values, v)
  894. }
  895. deleteSql += " WHERE " + strings.Join(fields, " and ")
  896. }
  897. logger.Tracef("Delete sql: %s, params=%v", deleteSql, values)
  898. n, err = me.exec(deleteSql, values...)
  899. if err != nil {
  900. return 0, merrs.NewError(fmt.Sprintf("sqlite delete table %s error", me.tableinfo.TableName), err)
  901. }
  902. if n > 0 {
  903. me.setLastUpdateTime()
  904. }
  905. // PushCount(time.Now(), n, me.keyspace, "delete", me.tableinfo.TableName)
  906. return
  907. }