123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- package sqlite_test
- import (
- "context"
- "database/sql"
- "encoding/json"
- "fmt"
- "math/rand"
- "reflect"
- "runtime"
- "strconv"
- "strings"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "git.wecise.com/wecise/common/matrix/util"
- "github.com/spf13/cast"
- "github.com/wecisecode/util/logger"
- "modernc.org/sqlite"
- )
- func TestSQLiteProfmanceMultiTableConcurrent(t *testing.T) {
- runtime.GOMAXPROCS(4)
- logger.Info("GOMAXPROCS ", runtime.GOMAXPROCS(0))
- dbcount := 10 // 最大测试到 20000,仅受限于硬件性能 // 为后续组合操作,测试需要,必须 >= 3
- logger.Info("Databases count ", dbcount)
- tablecount := 10 // 为后续组合操作,测试需要,必须 >= 3
- logger.Info("Tables count ", tablecount)
- varfieldscount := 30
- logger.Info("VarFields count ", varfieldscount+4)
- emptyfieldscount := 30
- logger.Info("EmptyFields count ", emptyfieldscount)
- vardatalength := 1024 / varfieldscount
- logger.Info("VarFields data length ", vardatalength, ", about 1K Bytes per record")
- tableindexcount := 10
- logger.Info("TableIndex count ", tableindexcount)
- datacount := 10000
- logger.Info("Data count ", datacount)
- sampledata := []map[string]interface{}{}
- { // 模拟数据
- st := time.Now()
- for i := 1; i <= datacount; i++ {
- m := map[string]interface{}{}
- m["id"] = i
- m["name"] = fmt.Sprint("name", i)
- m["value"] = 1.23456789 * float64(i)
- m["desc"] = fmt.Sprint("desc", i)
- for vi := 1; vi <= varfieldscount; vi++ {
- bs := make([]byte, vardatalength)
- for bsi := 0; bsi < len(bs); bsi++ {
- bs[bsi] = byte(32 + rand.Intn(95))
- }
- m["var"+strconv.Itoa(vi)] = string(bs)
- }
- sampledata = append(sampledata, m)
- }
- logger.Info("generate maps ", datacount, " use time ", time.Since(st).String())
- st = time.Now()
- memdataset := map[string]map[string]interface{}{}
- for _, d := range sampledata {
- id := cast.ToString(d["id"])
- memdataset[id] = d
- }
- logger.Info("put id-data maps ", len(memdataset), " use time ", time.Since(st).String())
- st = time.Now()
- ids := []string{}
- for id, d := range memdataset {
- if id == cast.ToString(d["id"]) {
- ids = append(ids, id)
- }
- }
- logger.Info("fetch maps data ids ", len(ids), " use time ", time.Since(st).String())
- st = time.Now()
- sampledata = make([]map[string]interface{}, 0, len(ids))
- for _, id := range ids {
- d := map[string]interface{}{}
- for k, v := range memdataset[id] {
- d[k] = v
- }
- sampledata = append(sampledata, d)
- }
- logger.Info("copy maps data ", len(sampledata), " use time ", time.Since(st).String())
- }
- insertcount := int32(0)
- var dbwg sync.WaitGroup
- mdba, err := sql.Open("sqlite", "file::memory:?cache=shared")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- mdbs := []*sql.DB{}
- for dbi := 1; dbi <= dbcount; dbi++ {
- dbname := fmt.Sprint("memdb", dbi)
- dbsourcename := "file:" + dbname + ":memory:?mode=memory&cache=shared"
- // mdb, err := sql.Open("sqlite", ":memory:")
- mdb, err := sql.Open("sqlite", dbsourcename)
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- // _, err = mdba.Exec(fmt.Sprint("PRAGMA threads = ", dbcount)) // 没有看出效果
- // if err != nil {
- // logger.Info("err", fmt.Sprint(err))
- // return
- // }
- mdbs = append(mdbs, mdb)
- conn, _ := mdba.Conn(context.Background())
- sqlite.Limit(conn, 7, 125)
- if dbi <= 100 {
- // 最多只能 attach 10
- _, err = mdba.Exec("attach database '" + dbsourcename + "' as " + dbname)
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- }
- }
- st := time.Now()
- for dbi := 1; dbi <= dbcount; dbi++ {
- dbwg.Add(1)
- dbname := fmt.Sprint("memdb", dbi)
- mdb := mdbs[dbi-1]
- go func() {
- defer dbwg.Done()
- // var mtx sync.RWMutex
- var wg sync.WaitGroup
- st := time.Now()
- for ti := 1; ti <= tablecount; ti++ {
- wg.Add(1)
- go func(ti int) {
- defer wg.Done()
- tablename := fmt.Sprint("test", ti)
- fields := []string{"id", "name", "value", "desc"}
- varfields := strings.Join(fields, ",")
- askmarks := "?" + strings.Repeat(",?", len(fields)-1)
- for i := 1; i <= varfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- varfields += ",var" + strconv.Itoa(i)
- askmarks += ",?"
- }
- allfields := varfields
- for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- allfields += ",var" + strconv.Itoa(i)
- }
- func() {
- // mtx.Lock()
- // defer mtx.Unlock()
- st := time.Now()
- stmt, err := mdb.Prepare("create table if not exists " + tablename + "(" + allfields + ")")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer stmt.Close()
- rslt, err := stmt.Exec()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- if rslt == nil {
- logger.Info("sqlite init table return nil")
- } else {
- id, err := rslt.LastInsertId()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- ra, err := rslt.RowsAffected()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- if id != 0 || ra != 0 {
- logger.Info("sqlite init table return ", id, ra)
- }
- }
- go logger.Info("sqlite create table", tablename, "use time ", time.Since(st).String())
- }()
- }(ti)
- }
- wg.Wait()
- logger.Info("sqlite create table", tablecount, "done, use time ", time.Since(st).String())
- st = time.Now()
- for ti := 1; ti <= tablecount; ti++ {
- wg.Add(1)
- go func(ti int) {
- defer wg.Done()
- tablename := fmt.Sprint("test", ti)
- fields := []string{"id", "name", "value", "desc"}
- varfields := strings.Join(fields, ",")
- askmarks := "?" + strings.Repeat(",?", len(fields)-1)
- for i := 1; i <= varfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- varfields += ",var" + strconv.Itoa(i)
- askmarks += ",?"
- }
- allfields := varfields
- for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- allfields += ",var" + strconv.Itoa(i)
- }
- func() {
- // mtx.RLock()
- // defer mtx.RUnlock()
- st := time.Now()
- for i, field := range fields {
- if i > tableindexcount {
- break
- }
- stmt, err := mdb.Prepare("create index " + dbname + "_" + tablename + "_" + field + " on " + tablename + " (" + field + ")")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer stmt.Close()
- rslt, err := stmt.Exec()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- if rslt == nil {
- logger.Info("sqlite init table index return nil")
- } else {
- id, err := rslt.LastInsertId()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- ra, err := rslt.RowsAffected()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- if id != 0 || ra != 0 {
- logger.Info("sqlite init table index return ", id, ra)
- }
- }
- }
- go logger.Info("sqlite create index for table "+dbname+"."+tablename+" use time ", time.Since(st).String())
- }()
- }(ti)
- }
- wg.Wait()
- logger.Info("sqlite create table index", tablecount, "done, use time ", time.Since(st).String())
- }()
- }
- dbwg.Wait()
- st = time.Now()
- for dbi := 1; dbi <= dbcount; dbi++ {
- dbwg.Add(1)
- dbname := fmt.Sprint("memdb", dbi)
- mdb := mdbs[dbi-1]
- ist := time.Now()
- go func() {
- defer dbwg.Done()
- var wg sync.WaitGroup
- st = time.Now()
- for ti := 1; ti <= tablecount; ti++ {
- wg.Add(1)
- go func(ti int) {
- defer wg.Done()
- tablename := fmt.Sprint("test", ti)
- fields := []string{"id", "name", "value", "desc"}
- varfields := strings.Join(fields, ",")
- askmarks := "?" + strings.Repeat(",?", len(fields)-1)
- for i := 1; i <= varfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- varfields += ",var" + strconv.Itoa(i)
- askmarks += ",?"
- }
- allfields := varfields
- for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- allfields += ",var" + strconv.Itoa(i)
- }
- func() { // insert
- var wgi sync.WaitGroup
- n := 0
- tx, err := mdb.Begin()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- // begin, err := mdb.Prepare("begin")
- // if err != nil {
- // logger.Info("err", fmt.Sprint(err))
- // return
- // }
- // commit, err := mdb.Prepare("commit")
- // if err != nil {
- // logger.Info("err", fmt.Sprint(err))
- // return
- // }
- stmt, err := tx.Prepare("insert into " + tablename + "(" + varfields + ") values(" + askmarks + ")")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer stmt.Close()
- rq := make(chan interface{}, 100)
- // _, err = begin.Exec()
- // if err != nil {
- // logger.Info("err", fmt.Sprint(err))
- // return
- // }
- for _, d := range sampledata {
- args := []interface{}{
- d["id"],
- d["name"],
- d["value"],
- d["desc"],
- fmt.Sprint(dbname, ".", tablename, ".", d["id"]),
- }
- for vi := 2; vi <= varfieldscount; vi++ {
- args = append(args, d["var"+strconv.Itoa(vi)])
- }
- rq <- 1
- wgi.Add(1)
- go func() {
- defer wgi.Done()
- defer func() { <-rq }()
- // mtx.RLock()
- // defer mtx.RUnlock()
- _, err = stmt.Exec(args...)
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- }
- // logger.Info("rslt", fmt.Sprint(rslt.LastInsertId()), fmt.Sprint(rslt.RowsAffected()))
- n++
- x := atomic.AddInt32(&insertcount, 1)
- if x%10000 == 0 || n%10000 == 0 {
- logger.Info("sqlite insert total ", x, " ", dbname+"."+tablename, n, " use time ", time.Since(ist).String())
- }
- }()
- }
- wgi.Wait()
- err = tx.Commit()
- // _, err = begin.Exec()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- var m runtime.MemStats
- runtime.ReadMemStats(&m)
- var meminuse = m.HeapInuse + m.MCacheInuse + m.MSpanInuse + m.StackInuse
- logger.Info("sqlite insert ", insertcount, dbname+"."+tablename, len(sampledata), " use time ", time.Since(ist).String(), " memory in use ", util.BytesSize(meminuse))
- }()
- }(ti)
- }
- wg.Wait()
- }()
- }
- dbwg.Wait()
- st = time.Now()
- for dbi := 1; dbi <= dbcount; dbi++ {
- dbwg.Add(1)
- dbname := fmt.Sprint("memdb", dbi)
- mdb := mdbs[dbi-1]
- go func() {
- defer dbwg.Done()
- var wg sync.WaitGroup
- st = time.Now()
- for ti := 1; ti <= tablecount; ti++ {
- wg.Add(1)
- go func(ti int) {
- defer wg.Done()
- tablename := fmt.Sprint("test", ti)
- fields := []string{"id", "name", "value", "desc"}
- varfields := strings.Join(fields, ",")
- askmarks := "?" + strings.Repeat(",?", len(fields)-1)
- for i := 1; i <= varfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- varfields += ",var" + strconv.Itoa(i)
- askmarks += ",?"
- }
- allfields := varfields
- for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ {
- fields = append(fields, "var"+strconv.Itoa(i))
- allfields += ",var" + strconv.Itoa(i)
- }
- func() { // select
- // mtx.RLock()
- // defer mtx.RUnlock()
- st := time.Now()
- stmt, err := mdb.Prepare("select " + varfields + " from " + tablename)
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer stmt.Close()
- rows, err := stmt.Query()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- logger.Info("sqlite select query all", " use time ", time.Since(st).String())
- defer func() { _ = rows.Close() }()
- cols, err := rows.Columns()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- colTypes, err := rows.ColumnTypes()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- types := make([]reflect.Type, len(colTypes))
- for i, tp := range colTypes {
- st := tp.ScanType()
- if st == nil {
- continue
- }
- types[i] = st
- }
- data := []map[string]interface{}{}
- for rows.Next() {
- values := make([]interface{}, len(cols))
- for i := range values {
- if types[i] == nil {
- values[i] = &[]byte{}
- } else {
- values[i] = reflect.New(types[i]).Interface()
- }
- }
- if err = rows.Scan(values...); err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- m := make(map[string]interface{})
- for i, v := range values {
- if v == nil {
- //values[i] = nil
- m[colTypes[i].Name()] = nil
- } else {
- n, ok := colTypes[i].Length()
- m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface())
- }
- }
- data = append(data, m)
- // if len(data)%10000 == 0 {
- // logger.Info("sqlite fetch data ", len(data), " use time ", time.Since(st).String())
- // }
- }
- logger.Info("sqlite fetch data ", dbname+"."+tablename, len(data), " use time ", time.Since(st).String())
- }()
- }(ti)
- }
- wg.Wait()
- }()
- }
- dbwg.Wait()
- logger.Info("sqlite insert and fetch data total ", insertcount, " use time ", time.Since(st).String())
- stmt, err := mdba.Prepare("select a.id, a.var1 as avar1, b.var1 as bvar1, c.var1 as cvar1 from memdb1.test1 as a, memdb2.test3 as b , memdb3.test2 as c where a.id=b.id and c.id=a.id")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer stmt.Close()
- rows, err := stmt.Query()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer func() { _ = rows.Close() }()
- cols, err := rows.Columns()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- colTypes, err := rows.ColumnTypes()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- types := make([]reflect.Type, len(colTypes))
- for i, tp := range colTypes {
- st := tp.ScanType()
- if st == nil {
- continue
- }
- types[i] = st
- }
- data := []map[string]interface{}{}
- for rows.Next() {
- values := make([]interface{}, len(cols))
- for i := range values {
- if types[i] == nil {
- values[i] = &[]byte{}
- } else {
- values[i] = reflect.New(types[i]).Interface()
- }
- }
- if err = rows.Scan(values...); err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- m := make(map[string]interface{})
- for i, v := range values {
- if v == nil {
- m[colTypes[i].Name()] = nil
- } else {
- n, ok := colTypes[i].Length()
- m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface())
- }
- }
- data = append(data, m)
- if len(data)%1000 == 0 {
- logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String())
- bs, _ := json.MarshalIndent(m, "", " ")
- logger.Info("eg:", string(bs))
- }
- }
- logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String())
- {
- _, err := mdbs[0].Exec("insert into test1 (id, var1) values (-1, '--0--1--2--')")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- _, err = mdbs[1].Exec("insert into test3 (id, var1) values (-1, '--1--3--2--')")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- _, err = mdbs[2].Exec("insert into test2 (id, var1) values (-1, '--2--2--2--')")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- stmt, err := mdba.Prepare("select a.id, a.var1 as avar1, b.var1 as bvar1, c.var1 as cvar1 from memdb1.test1 as a, memdb2.test3 as b , memdb3.test2 as c where a.id=b.id and c.id=a.id and b.id<0")
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer stmt.Close()
- rows, err := stmt.Query()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- defer func() { _ = rows.Close() }()
- cols, err := rows.Columns()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- colTypes, err := rows.ColumnTypes()
- if err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- types := make([]reflect.Type, len(colTypes))
- for i, tp := range colTypes {
- st := tp.ScanType()
- if st == nil {
- continue
- }
- types[i] = st
- }
- data = []map[string]interface{}{}
- for rows.Next() {
- values := make([]interface{}, len(cols))
- for i := range values {
- if types[i] == nil {
- values[i] = &[]byte{}
- } else {
- values[i] = reflect.New(types[i]).Interface()
- }
- }
- if err = rows.Scan(values...); err != nil {
- logger.Info("err", fmt.Sprint(err))
- return
- }
- m := make(map[string]interface{})
- for i, v := range values {
- if v == nil {
- m[colTypes[i].Name()] = nil
- } else {
- n, ok := colTypes[i].Length()
- m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface())
- }
- }
- data = append(data, m)
- bs, _ := json.MarshalIndent(m, "", " ")
- logger.Info("eg:", string(bs))
- }
- logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String())
- }
- }
|