sqlite_prof_test.go 17 KB


  1. package sqlite_test
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "math/rand"
  8. "reflect"
  9. "runtime"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "testing"
  15. "time"
  16. "git.wecise.com/wecise/common/matrix/util"
  17. "github.com/spf13/cast"
  18. "github.com/wecisecode/util/logger"
  19. "modernc.org/sqlite"
  20. )
  21. func TestSQLiteProfmanceMultiTableConcurrent(t *testing.T) {
  22. runtime.GOMAXPROCS(4)
  23. logger.Info("GOMAXPROCS ", runtime.GOMAXPROCS(0))
  24. dbcount := 10 // 最大测试到 20000,仅受限于硬件性能 // 为后续组合操作,测试需要,必须 >= 3
  25. logger.Info("Databases count ", dbcount)
  26. tablecount := 10 // 为后续组合操作,测试需要,必须 >= 3
  27. logger.Info("Tables count ", tablecount)
  28. varfieldscount := 30
  29. logger.Info("VarFields count ", varfieldscount+4)
  30. emptyfieldscount := 30
  31. logger.Info("EmptyFields count ", emptyfieldscount)
  32. vardatalength := 1024 / varfieldscount
  33. logger.Info("VarFields data length ", vardatalength, ", about 1K Bytes per record")
  34. tableindexcount := 10
  35. logger.Info("TableIndex count ", tableindexcount)
  36. datacount := 10000
  37. logger.Info("Data count ", datacount)
  38. sampledata := []map[string]interface{}{}
  39. { // 模拟数据
  40. st := time.Now()
  41. for i := 1; i <= datacount; i++ {
  42. m := map[string]interface{}{}
  43. m["id"] = i
  44. m["name"] = fmt.Sprint("name", i)
  45. m["value"] = 1.23456789 * float64(i)
  46. m["desc"] = fmt.Sprint("desc", i)
  47. for vi := 1; vi <= varfieldscount; vi++ {
  48. bs := make([]byte, vardatalength)
  49. for bsi := 0; bsi < len(bs); bsi++ {
  50. bs[bsi] = byte(32 + rand.Intn(95))
  51. }
  52. m["var"+strconv.Itoa(vi)] = string(bs)
  53. }
  54. sampledata = append(sampledata, m)
  55. }
  56. logger.Info("generate maps ", datacount, " use time ", time.Since(st).String())
  57. st = time.Now()
  58. memdataset := map[string]map[string]interface{}{}
  59. for _, d := range sampledata {
  60. id := cast.ToString(d["id"])
  61. memdataset[id] = d
  62. }
  63. logger.Info("put id-data maps ", len(memdataset), " use time ", time.Since(st).String())
  64. st = time.Now()
  65. ids := []string{}
  66. for id, d := range memdataset {
  67. if id == cast.ToString(d["id"]) {
  68. ids = append(ids, id)
  69. }
  70. }
  71. logger.Info("fetch maps data ids ", len(ids), " use time ", time.Since(st).String())
  72. st = time.Now()
  73. sampledata = make([]map[string]interface{}, 0, len(ids))
  74. for _, id := range ids {
  75. d := map[string]interface{}{}
  76. for k, v := range memdataset[id] {
  77. d[k] = v
  78. }
  79. sampledata = append(sampledata, d)
  80. }
  81. logger.Info("copy maps data ", len(sampledata), " use time ", time.Since(st).String())
  82. }
  83. insertcount := int32(0)
  84. var dbwg sync.WaitGroup
  85. mdba, err := sql.Open("sqlite", "file::memory:?cache=shared")
  86. if err != nil {
  87. logger.Info("err", fmt.Sprint(err))
  88. return
  89. }
  90. mdbs := []*sql.DB{}
  91. for dbi := 1; dbi <= dbcount; dbi++ {
  92. dbname := fmt.Sprint("memdb", dbi)
  93. dbsourcename := "file:" + dbname + ":memory:?mode=memory&cache=shared"
  94. // mdb, err := sql.Open("sqlite", ":memory:")
  95. mdb, err := sql.Open("sqlite", dbsourcename)
  96. if err != nil {
  97. logger.Info("err", fmt.Sprint(err))
  98. return
  99. }
  100. // _, err = mdba.Exec(fmt.Sprint("PRAGMA threads = ", dbcount)) // 没有看出效果
  101. // if err != nil {
  102. // logger.Info("err", fmt.Sprint(err))
  103. // return
  104. // }
  105. mdbs = append(mdbs, mdb)
  106. conn, _ := mdba.Conn(context.Background())
  107. sqlite.Limit(conn, 7, 125)
  108. if dbi <= 100 {
  109. // 最多只能 attach 10
  110. _, err = mdba.Exec("attach database '" + dbsourcename + "' as " + dbname)
  111. if err != nil {
  112. logger.Info("err", fmt.Sprint(err))
  113. return
  114. }
  115. }
  116. }
  117. st := time.Now()
  118. for dbi := 1; dbi <= dbcount; dbi++ {
  119. dbwg.Add(1)
  120. dbname := fmt.Sprint("memdb", dbi)
  121. mdb := mdbs[dbi-1]
  122. go func() {
  123. defer dbwg.Done()
  124. // var mtx sync.RWMutex
  125. var wg sync.WaitGroup
  126. st := time.Now()
  127. for ti := 1; ti <= tablecount; ti++ {
  128. wg.Add(1)
  129. go func(ti int) {
  130. defer wg.Done()
  131. tablename := fmt.Sprint("test", ti)
  132. fields := []string{"id", "name", "value", "desc"}
  133. varfields := strings.Join(fields, ",")
  134. askmarks := "?" + strings.Repeat(",?", len(fields)-1)
  135. for i := 1; i <= varfieldscount; i++ {
  136. fields = append(fields, "var"+strconv.Itoa(i))
  137. varfields += ",var" + strconv.Itoa(i)
  138. askmarks += ",?"
  139. }
  140. allfields := varfields
  141. for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
  142. fields = append(fields, "var"+strconv.Itoa(i))
  143. allfields += ",var" + strconv.Itoa(i)
  144. }
  145. func() {
  146. // mtx.Lock()
  147. // defer mtx.Unlock()
  148. st := time.Now()
  149. stmt, err := mdb.Prepare("create table if not exists " + tablename + "(" + allfields + ")")
  150. if err != nil {
  151. logger.Info("err", fmt.Sprint(err))
  152. return
  153. }
  154. defer stmt.Close()
  155. rslt, err := stmt.Exec()
  156. if err != nil {
  157. logger.Info("err", fmt.Sprint(err))
  158. return
  159. }
  160. if rslt == nil {
  161. logger.Info("sqlite init table return nil")
  162. } else {
  163. id, err := rslt.LastInsertId()
  164. if err != nil {
  165. logger.Info("err", fmt.Sprint(err))
  166. return
  167. }
  168. ra, err := rslt.RowsAffected()
  169. if err != nil {
  170. logger.Info("err", fmt.Sprint(err))
  171. return
  172. }
  173. if id != 0 || ra != 0 {
  174. logger.Info("sqlite init table return ", id, ra)
  175. }
  176. }
  177. go logger.Info("sqlite create table", tablename, "use time ", time.Since(st).String())
  178. }()
  179. }(ti)
  180. }
  181. wg.Wait()
  182. logger.Info("sqlite create table", tablecount, "done, use time ", time.Since(st).String())
  183. st = time.Now()
  184. for ti := 1; ti <= tablecount; ti++ {
  185. wg.Add(1)
  186. go func(ti int) {
  187. defer wg.Done()
  188. tablename := fmt.Sprint("test", ti)
  189. fields := []string{"id", "name", "value", "desc"}
  190. varfields := strings.Join(fields, ",")
  191. askmarks := "?" + strings.Repeat(",?", len(fields)-1)
  192. for i := 1; i <= varfieldscount; i++ {
  193. fields = append(fields, "var"+strconv.Itoa(i))
  194. varfields += ",var" + strconv.Itoa(i)
  195. askmarks += ",?"
  196. }
  197. allfields := varfields
  198. for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
  199. fields = append(fields, "var"+strconv.Itoa(i))
  200. allfields += ",var" + strconv.Itoa(i)
  201. }
  202. func() {
  203. // mtx.RLock()
  204. // defer mtx.RUnlock()
  205. st := time.Now()
  206. for i, field := range fields {
  207. if i > tableindexcount {
  208. break
  209. }
  210. stmt, err := mdb.Prepare("create index " + dbname + "_" + tablename + "_" + field + " on " + tablename + " (" + field + ")")
  211. if err != nil {
  212. logger.Info("err", fmt.Sprint(err))
  213. return
  214. }
  215. defer stmt.Close()
  216. rslt, err := stmt.Exec()
  217. if err != nil {
  218. logger.Info("err", fmt.Sprint(err))
  219. return
  220. }
  221. if rslt == nil {
  222. logger.Info("sqlite init table index return nil")
  223. } else {
  224. id, err := rslt.LastInsertId()
  225. if err != nil {
  226. logger.Info("err", fmt.Sprint(err))
  227. return
  228. }
  229. ra, err := rslt.RowsAffected()
  230. if err != nil {
  231. logger.Info("err", fmt.Sprint(err))
  232. return
  233. }
  234. if id != 0 || ra != 0 {
  235. logger.Info("sqlite init table index return ", id, ra)
  236. }
  237. }
  238. }
  239. go logger.Info("sqlite create index for table "+dbname+"."+tablename+" use time ", time.Since(st).String())
  240. }()
  241. }(ti)
  242. }
  243. wg.Wait()
  244. logger.Info("sqlite create table index", tablecount, "done, use time ", time.Since(st).String())
  245. }()
  246. }
  247. dbwg.Wait()
  248. st = time.Now()
  249. for dbi := 1; dbi <= dbcount; dbi++ {
  250. dbwg.Add(1)
  251. dbname := fmt.Sprint("memdb", dbi)
  252. mdb := mdbs[dbi-1]
  253. ist := time.Now()
  254. go func() {
  255. defer dbwg.Done()
  256. var wg sync.WaitGroup
  257. st = time.Now()
  258. for ti := 1; ti <= tablecount; ti++ {
  259. wg.Add(1)
  260. go func(ti int) {
  261. defer wg.Done()
  262. tablename := fmt.Sprint("test", ti)
  263. fields := []string{"id", "name", "value", "desc"}
  264. varfields := strings.Join(fields, ",")
  265. askmarks := "?" + strings.Repeat(",?", len(fields)-1)
  266. for i := 1; i <= varfieldscount; i++ {
  267. fields = append(fields, "var"+strconv.Itoa(i))
  268. varfields += ",var" + strconv.Itoa(i)
  269. askmarks += ",?"
  270. }
  271. allfields := varfields
  272. for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
  273. fields = append(fields, "var"+strconv.Itoa(i))
  274. allfields += ",var" + strconv.Itoa(i)
  275. }
  276. func() { // insert
  277. var wgi sync.WaitGroup
  278. n := 0
  279. tx, err := mdb.Begin()
  280. if err != nil {
  281. logger.Info("err", fmt.Sprint(err))
  282. return
  283. }
  284. // begin, err := mdb.Prepare("begin")
  285. // if err != nil {
  286. // logger.Info("err", fmt.Sprint(err))
  287. // return
  288. // }
  289. // commit, err := mdb.Prepare("commit")
  290. // if err != nil {
  291. // logger.Info("err", fmt.Sprint(err))
  292. // return
  293. // }
  294. stmt, err := tx.Prepare("insert into " + tablename + "(" + varfields + ") values(" + askmarks + ")")
  295. if err != nil {
  296. logger.Info("err", fmt.Sprint(err))
  297. return
  298. }
  299. defer stmt.Close()
  300. rq := make(chan interface{}, 100)
  301. // _, err = begin.Exec()
  302. // if err != nil {
  303. // logger.Info("err", fmt.Sprint(err))
  304. // return
  305. // }
  306. for _, d := range sampledata {
  307. args := []interface{}{
  308. d["id"],
  309. d["name"],
  310. d["value"],
  311. d["desc"],
  312. fmt.Sprint(dbname, ".", tablename, ".", d["id"]),
  313. }
  314. for vi := 2; vi <= varfieldscount; vi++ {
  315. args = append(args, d["var"+strconv.Itoa(vi)])
  316. }
  317. rq <- 1
  318. wgi.Add(1)
  319. go func() {
  320. defer wgi.Done()
  321. defer func() { <-rq }()
  322. // mtx.RLock()
  323. // defer mtx.RUnlock()
  324. _, err = stmt.Exec(args...)
  325. if err != nil {
  326. logger.Info("err", fmt.Sprint(err))
  327. }
  328. // logger.Info("rslt", fmt.Sprint(rslt.LastInsertId()), fmt.Sprint(rslt.RowsAffected()))
  329. n++
  330. x := atomic.AddInt32(&insertcount, 1)
  331. if x%10000 == 0 || n%10000 == 0 {
  332. logger.Info("sqlite insert total ", x, " ", dbname+"."+tablename, n, " use time ", time.Since(ist).String())
  333. }
  334. }()
  335. }
  336. wgi.Wait()
  337. err = tx.Commit()
  338. // _, err = begin.Exec()
  339. if err != nil {
  340. logger.Info("err", fmt.Sprint(err))
  341. return
  342. }
  343. var m runtime.MemStats
  344. runtime.ReadMemStats(&m)
  345. var meminuse = m.HeapInuse + m.MCacheInuse + m.MSpanInuse + m.StackInuse
  346. logger.Info("sqlite insert ", insertcount, dbname+"."+tablename, len(sampledata), " use time ", time.Since(ist).String(), " memory in use ", util.BytesSize(meminuse))
  347. }()
  348. }(ti)
  349. }
  350. wg.Wait()
  351. }()
  352. }
  353. dbwg.Wait()
  354. st = time.Now()
  355. for dbi := 1; dbi <= dbcount; dbi++ {
  356. dbwg.Add(1)
  357. dbname := fmt.Sprint("memdb", dbi)
  358. mdb := mdbs[dbi-1]
  359. go func() {
  360. defer dbwg.Done()
  361. var wg sync.WaitGroup
  362. st = time.Now()
  363. for ti := 1; ti <= tablecount; ti++ {
  364. wg.Add(1)
  365. go func(ti int) {
  366. defer wg.Done()
  367. tablename := fmt.Sprint("test", ti)
  368. fields := []string{"id", "name", "value", "desc"}
  369. varfields := strings.Join(fields, ",")
  370. askmarks := "?" + strings.Repeat(",?", len(fields)-1)
  371. for i := 1; i <= varfieldscount; i++ {
  372. fields = append(fields, "var"+strconv.Itoa(i))
  373. varfields += ",var" + strconv.Itoa(i)
  374. askmarks += ",?"
  375. }
  376. allfields := varfields
  377. for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
  378. fields = append(fields, "var"+strconv.Itoa(i))
  379. allfields += ",var" + strconv.Itoa(i)
  380. }
  381. func() { // select
  382. // mtx.RLock()
  383. // defer mtx.RUnlock()
  384. st := time.Now()
  385. stmt, err := mdb.Prepare("select " + varfields + " from " + tablename)
  386. if err != nil {
  387. logger.Info("err", fmt.Sprint(err))
  388. return
  389. }
  390. defer stmt.Close()
  391. rows, err := stmt.Query()
  392. if err != nil {
  393. logger.Info("err", fmt.Sprint(err))
  394. return
  395. }
  396. logger.Info("sqlite select query all", " use time ", time.Since(st).String())
  397. defer func() { _ = rows.Close() }()
  398. cols, err := rows.Columns()
  399. if err != nil {
  400. logger.Info("err", fmt.Sprint(err))
  401. return
  402. }
  403. colTypes, err := rows.ColumnTypes()
  404. if err != nil {
  405. logger.Info("err", fmt.Sprint(err))
  406. return
  407. }
  408. types := make([]reflect.Type, len(colTypes))
  409. for i, tp := range colTypes {
  410. st := tp.ScanType()
  411. if st == nil {
  412. continue
  413. }
  414. types[i] = st
  415. }
  416. data := []map[string]interface{}{}
  417. for rows.Next() {
  418. values := make([]interface{}, len(cols))
  419. for i := range values {
  420. if types[i] == nil {
  421. values[i] = &[]byte{}
  422. } else {
  423. values[i] = reflect.New(types[i]).Interface()
  424. }
  425. }
  426. if err = rows.Scan(values...); err != nil {
  427. logger.Info("err", fmt.Sprint(err))
  428. return
  429. }
  430. m := make(map[string]interface{})
  431. for i, v := range values {
  432. if v == nil {
  433. //values[i] = nil
  434. m[colTypes[i].Name()] = nil
  435. } else {
  436. n, ok := colTypes[i].Length()
  437. m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface())
  438. }
  439. }
  440. data = append(data, m)
  441. // if len(data)%10000 == 0 {
  442. // logger.Info("sqlite fetch data ", len(data), " use time ", time.Since(st).String())
  443. // }
  444. }
  445. logger.Info("sqlite fetch data ", dbname+"."+tablename, len(data), " use time ", time.Since(st).String())
  446. }()
  447. }(ti)
  448. }
  449. wg.Wait()
  450. }()
  451. }
  452. dbwg.Wait()
  453. logger.Info("sqlite insert and fetch data total ", insertcount, " use time ", time.Since(st).String())
  454. stmt, err := mdba.Prepare("select a.id, a.var1 as avar1, b.var1 as bvar1, c.var1 as cvar1 from memdb1.test1 as a, memdb2.test3 as b , memdb3.test2 as c where a.id=b.id and c.id=a.id")
  455. if err != nil {
  456. logger.Info("err", fmt.Sprint(err))
  457. return
  458. }
  459. defer stmt.Close()
  460. rows, err := stmt.Query()
  461. if err != nil {
  462. logger.Info("err", fmt.Sprint(err))
  463. return
  464. }
  465. defer func() { _ = rows.Close() }()
  466. cols, err := rows.Columns()
  467. if err != nil {
  468. logger.Info("err", fmt.Sprint(err))
  469. return
  470. }
  471. colTypes, err := rows.ColumnTypes()
  472. if err != nil {
  473. logger.Info("err", fmt.Sprint(err))
  474. return
  475. }
  476. types := make([]reflect.Type, len(colTypes))
  477. for i, tp := range colTypes {
  478. st := tp.ScanType()
  479. if st == nil {
  480. continue
  481. }
  482. types[i] = st
  483. }
  484. data := []map[string]interface{}{}
  485. for rows.Next() {
  486. values := make([]interface{}, len(cols))
  487. for i := range values {
  488. if types[i] == nil {
  489. values[i] = &[]byte{}
  490. } else {
  491. values[i] = reflect.New(types[i]).Interface()
  492. }
  493. }
  494. if err = rows.Scan(values...); err != nil {
  495. logger.Info("err", fmt.Sprint(err))
  496. return
  497. }
  498. m := make(map[string]interface{})
  499. for i, v := range values {
  500. if v == nil {
  501. m[colTypes[i].Name()] = nil
  502. } else {
  503. n, ok := colTypes[i].Length()
  504. m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface())
  505. }
  506. }
  507. data = append(data, m)
  508. if len(data)%1000 == 0 {
  509. logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String())
  510. bs, _ := json.MarshalIndent(m, "", " ")
  511. logger.Info("eg:", string(bs))
  512. }
  513. }
  514. logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String())
  515. {
  516. _, err := mdbs[0].Exec("insert into test1 (id, var1) values (-1, '--0--1--2--')")
  517. if err != nil {
  518. logger.Info("err", fmt.Sprint(err))
  519. return
  520. }
  521. _, err = mdbs[1].Exec("insert into test3 (id, var1) values (-1, '--1--3--2--')")
  522. if err != nil {
  523. logger.Info("err", fmt.Sprint(err))
  524. return
  525. }
  526. _, err = mdbs[2].Exec("insert into test2 (id, var1) values (-1, '--2--2--2--')")
  527. if err != nil {
  528. logger.Info("err", fmt.Sprint(err))
  529. return
  530. }
  531. stmt, err := mdba.Prepare("select a.id, a.var1 as avar1, b.var1 as bvar1, c.var1 as cvar1 from memdb1.test1 as a, memdb2.test3 as b , memdb3.test2 as c where a.id=b.id and c.id=a.id and b.id<0")
  532. if err != nil {
  533. logger.Info("err", fmt.Sprint(err))
  534. return
  535. }
  536. defer stmt.Close()
  537. rows, err := stmt.Query()
  538. if err != nil {
  539. logger.Info("err", fmt.Sprint(err))
  540. return
  541. }
  542. defer func() { _ = rows.Close() }()
  543. cols, err := rows.Columns()
  544. if err != nil {
  545. logger.Info("err", fmt.Sprint(err))
  546. return
  547. }
  548. colTypes, err := rows.ColumnTypes()
  549. if err != nil {
  550. logger.Info("err", fmt.Sprint(err))
  551. return
  552. }
  553. types := make([]reflect.Type, len(colTypes))
  554. for i, tp := range colTypes {
  555. st := tp.ScanType()
  556. if st == nil {
  557. continue
  558. }
  559. types[i] = st
  560. }
  561. data = []map[string]interface{}{}
  562. for rows.Next() {
  563. values := make([]interface{}, len(cols))
  564. for i := range values {
  565. if types[i] == nil {
  566. values[i] = &[]byte{}
  567. } else {
  568. values[i] = reflect.New(types[i]).Interface()
  569. }
  570. }
  571. if err = rows.Scan(values...); err != nil {
  572. logger.Info("err", fmt.Sprint(err))
  573. return
  574. }
  575. m := make(map[string]interface{})
  576. for i, v := range values {
  577. if v == nil {
  578. m[colTypes[i].Name()] = nil
  579. } else {
  580. n, ok := colTypes[i].Length()
  581. m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface())
  582. }
  583. }
  584. data = append(data, m)
  585. bs, _ := json.MarshalIndent(m, "", " ")
  586. logger.Info("eg:", string(bs))
  587. }
  588. logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String())
  589. }
  590. }