package sqlite_test import ( "context" "database/sql" "encoding/json" "fmt" "math/rand" "reflect" "runtime" "strconv" "strings" "sync" "sync/atomic" "testing" "time" "git.wecise.com/wecise/common/matrix/util" "github.com/spf13/cast" "github.com/wecisecode/util/logger" "modernc.org/sqlite" ) func TestSQLiteProfmanceMultiTableConcurrent(t *testing.T) { runtime.GOMAXPROCS(4) logger.Info("GOMAXPROCS ", runtime.GOMAXPROCS(0)) dbcount := 10 // 最大测试到 20000,仅受限于硬件性能 // 为后续组合操作,测试需要,必须 >= 3 logger.Info("Databases count ", dbcount) tablecount := 10 // 为后续组合操作,测试需要,必须 >= 3 logger.Info("Tables count ", tablecount) varfieldscount := 30 logger.Info("VarFields count ", varfieldscount+4) emptyfieldscount := 30 logger.Info("EmptyFields count ", emptyfieldscount) vardatalength := 1024 / varfieldscount logger.Info("VarFields data length ", vardatalength, ", about 1K Bytes per record") tableindexcount := 10 logger.Info("TableIndex count ", tableindexcount) datacount := 10000 logger.Info("Data count ", datacount) sampledata := []map[string]interface{}{} { // 模拟数据 st := time.Now() for i := 1; i <= datacount; i++ { m := map[string]interface{}{} m["id"] = i m["name"] = fmt.Sprint("name", i) m["value"] = 1.23456789 * float64(i) m["desc"] = fmt.Sprint("desc", i) for vi := 1; vi <= varfieldscount; vi++ { bs := make([]byte, vardatalength) for bsi := 0; bsi < len(bs); bsi++ { bs[bsi] = byte(32 + rand.Intn(95)) } m["var"+strconv.Itoa(vi)] = string(bs) } sampledata = append(sampledata, m) } logger.Info("generate maps ", datacount, " use time ", time.Since(st).String()) st = time.Now() memdataset := map[string]map[string]interface{}{} for _, d := range sampledata { id := cast.ToString(d["id"]) memdataset[id] = d } logger.Info("put id-data maps ", len(memdataset), " use time ", time.Since(st).String()) st = time.Now() ids := []string{} for id, d := range memdataset { if id == cast.ToString(d["id"]) { ids = append(ids, id) } } logger.Info("fetch maps data ids ", len(ids), " use time ", time.Since(st).String()) st = time.Now() sampledata = make([]map[string]interface{}, 0, len(ids)) for _, id := range ids { d := map[string]interface{}{} for k, v := range memdataset[id] { d[k] = v } sampledata = append(sampledata, d) } logger.Info("copy maps data ", len(sampledata), " use time ", time.Since(st).String()) } insertcount := int32(0) var dbwg sync.WaitGroup mdba, err := sql.Open("sqlite", "file::memory:?cache=shared") if err != nil { logger.Info("err", fmt.Sprint(err)) return } mdbs := []*sql.DB{} for dbi := 1; dbi <= dbcount; dbi++ { dbname := fmt.Sprint("memdb", dbi) dbsourcename := "file:" + dbname + ":memory:?mode=memory&cache=shared" // mdb, err := sql.Open("sqlite", ":memory:") mdb, err := sql.Open("sqlite", dbsourcename) if err != nil { logger.Info("err", fmt.Sprint(err)) return } // _, err = mdba.Exec(fmt.Sprint("PRAGMA threads = ", dbcount)) // 没有看出效果 // if err != nil { // logger.Info("err", fmt.Sprint(err)) // return // } mdbs = append(mdbs, mdb) conn, _ := mdba.Conn(context.Background()) sqlite.Limit(conn, 7, 125) if dbi <= 100 { // 最多只能 attach 10 _, err = mdba.Exec("attach database '" + dbsourcename + "' as " + dbname) if err != nil { logger.Info("err", fmt.Sprint(err)) return } } } st := time.Now() for dbi := 1; dbi <= dbcount; dbi++ { dbwg.Add(1) dbname := fmt.Sprint("memdb", dbi) mdb := mdbs[dbi-1] go func() { defer dbwg.Done() // var mtx sync.RWMutex var wg sync.WaitGroup st := time.Now() for ti := 1; ti <= tablecount; ti++ { wg.Add(1) go func(ti int) { defer wg.Done() tablename := fmt.Sprint("test", ti) fields := []string{"id", "name", "value", "desc"} varfields := strings.Join(fields, ",") askmarks := "?" + strings.Repeat(",?", len(fields)-1) for i := 1; i <= varfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) varfields += ",var" + strconv.Itoa(i) askmarks += ",?" } allfields := varfields for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) allfields += ",var" + strconv.Itoa(i) } func() { // mtx.Lock() // defer mtx.Unlock() st := time.Now() stmt, err := mdb.Prepare("create table if not exists " + tablename + "(" + allfields + ")") if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer stmt.Close() rslt, err := stmt.Exec() if err != nil { logger.Info("err", fmt.Sprint(err)) return } if rslt == nil { logger.Info("sqlite init table return nil") } else { id, err := rslt.LastInsertId() if err != nil { logger.Info("err", fmt.Sprint(err)) return } ra, err := rslt.RowsAffected() if err != nil { logger.Info("err", fmt.Sprint(err)) return } if id != 0 || ra != 0 { logger.Info("sqlite init table return ", id, ra) } } go logger.Info("sqlite create table", tablename, "use time ", time.Since(st).String()) }() }(ti) } wg.Wait() logger.Info("sqlite create table", tablecount, "done, use time ", time.Since(st).String()) st = time.Now() for ti := 1; ti <= tablecount; ti++ { wg.Add(1) go func(ti int) { defer wg.Done() tablename := fmt.Sprint("test", ti) fields := []string{"id", "name", "value", "desc"} varfields := strings.Join(fields, ",") askmarks := "?" + strings.Repeat(",?", len(fields)-1) for i := 1; i <= varfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) varfields += ",var" + strconv.Itoa(i) askmarks += ",?" } allfields := varfields for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) allfields += ",var" + strconv.Itoa(i) } func() { // mtx.RLock() // defer mtx.RUnlock() st := time.Now() for i, field := range fields { if i > tableindexcount { break } stmt, err := mdb.Prepare("create index " + dbname + "_" + tablename + "_" + field + " on " + tablename + " (" + field + ")") if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer stmt.Close() rslt, err := stmt.Exec() if err != nil { logger.Info("err", fmt.Sprint(err)) return } if rslt == nil { logger.Info("sqlite init table index return nil") } else { id, err := rslt.LastInsertId() if err != nil { logger.Info("err", fmt.Sprint(err)) return } ra, err := rslt.RowsAffected() if err != nil { logger.Info("err", fmt.Sprint(err)) return } if id != 0 || ra != 0 { logger.Info("sqlite init table index return ", id, ra) } } } go logger.Info("sqlite create index for table "+dbname+"."+tablename+" use time ", time.Since(st).String()) }() }(ti) } wg.Wait() logger.Info("sqlite create table index", tablecount, "done, use time ", time.Since(st).String()) }() } dbwg.Wait() st = time.Now() for dbi := 1; dbi <= dbcount; dbi++ { dbwg.Add(1) dbname := fmt.Sprint("memdb", dbi) mdb := mdbs[dbi-1] ist := time.Now() go func() { defer dbwg.Done() var wg sync.WaitGroup st = time.Now() for ti := 1; ti <= tablecount; ti++ { wg.Add(1) go func(ti int) { defer wg.Done() tablename := fmt.Sprint("test", ti) fields := []string{"id", "name", "value", "desc"} varfields := strings.Join(fields, ",") askmarks := "?" + strings.Repeat(",?", len(fields)-1) for i := 1; i <= varfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) varfields += ",var" + strconv.Itoa(i) askmarks += ",?" } allfields := varfields for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) allfields += ",var" + strconv.Itoa(i) } func() { // insert var wgi sync.WaitGroup n := 0 tx, err := mdb.Begin() if err != nil { logger.Info("err", fmt.Sprint(err)) return } // begin, err := mdb.Prepare("begin") // if err != nil { // logger.Info("err", fmt.Sprint(err)) // return // } // commit, err := mdb.Prepare("commit") // if err != nil { // logger.Info("err", fmt.Sprint(err)) // return // } stmt, err := tx.Prepare("insert into " + tablename + "(" + varfields + ") values(" + askmarks + ")") if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer stmt.Close() rq := make(chan interface{}, 100) // _, err = begin.Exec() // if err != nil { // logger.Info("err", fmt.Sprint(err)) // return // } for _, d := range sampledata { args := []interface{}{ d["id"], d["name"], d["value"], d["desc"], fmt.Sprint(dbname, ".", tablename, ".", d["id"]), } for vi := 2; vi <= varfieldscount; vi++ { args = append(args, d["var"+strconv.Itoa(vi)]) } rq <- 1 wgi.Add(1) go func() { defer wgi.Done() defer func() { <-rq }() // mtx.RLock() // defer mtx.RUnlock() _, err = stmt.Exec(args...) if err != nil { logger.Info("err", fmt.Sprint(err)) } // logger.Info("rslt", fmt.Sprint(rslt.LastInsertId()), fmt.Sprint(rslt.RowsAffected())) n++ x := atomic.AddInt32(&insertcount, 1) if x%10000 == 0 || n%10000 == 0 { logger.Info("sqlite insert total ", x, " ", dbname+"."+tablename, n, " use time ", time.Since(ist).String()) } }() } wgi.Wait() err = tx.Commit() // _, err = begin.Exec() if err != nil { logger.Info("err", fmt.Sprint(err)) return } var m runtime.MemStats runtime.ReadMemStats(&m) var meminuse = m.HeapInuse + m.MCacheInuse + m.MSpanInuse + m.StackInuse logger.Info("sqlite insert ", insertcount, dbname+"."+tablename, len(sampledata), " use time ", time.Since(ist).String(), " memory in use ", util.BytesSize(meminuse)) }() }(ti) } wg.Wait() }() } dbwg.Wait() st = time.Now() for dbi := 1; dbi <= dbcount; dbi++ { dbwg.Add(1) dbname := fmt.Sprint("memdb", dbi) mdb := mdbs[dbi-1] go func() { defer dbwg.Done() var wg sync.WaitGroup st = time.Now() for ti := 1; ti <= tablecount; ti++ { wg.Add(1) go func(ti int) { defer wg.Done() tablename := fmt.Sprint("test", ti) fields := []string{"id", "name", "value", "desc"} varfields := strings.Join(fields, ",") askmarks := "?" + strings.Repeat(",?", len(fields)-1) for i := 1; i <= varfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) varfields += ",var" + strconv.Itoa(i) askmarks += ",?" } allfields := varfields for i := varfieldscount + 1; i <= varfieldscount+emptyfieldscount; i++ { fields = append(fields, "var"+strconv.Itoa(i)) allfields += ",var" + strconv.Itoa(i) } func() { // select // mtx.RLock() // defer mtx.RUnlock() st := time.Now() stmt, err := mdb.Prepare("select " + varfields + " from " + tablename) if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer stmt.Close() rows, err := stmt.Query() if err != nil { logger.Info("err", fmt.Sprint(err)) return } logger.Info("sqlite select query all", " use time ", time.Since(st).String()) defer func() { _ = rows.Close() }() cols, err := rows.Columns() if err != nil { logger.Info("err", fmt.Sprint(err)) return } colTypes, err := rows.ColumnTypes() if err != nil { logger.Info("err", fmt.Sprint(err)) return } types := make([]reflect.Type, len(colTypes)) for i, tp := range colTypes { st := tp.ScanType() if st == nil { continue } types[i] = st } data := []map[string]interface{}{} for rows.Next() { values := make([]interface{}, len(cols)) for i := range values { if types[i] == nil { values[i] = &[]byte{} } else { values[i] = reflect.New(types[i]).Interface() } } if err = rows.Scan(values...); err != nil { logger.Info("err", fmt.Sprint(err)) return } m := make(map[string]interface{}) for i, v := range values { if v == nil { //values[i] = nil m[colTypes[i].Name()] = nil } else { n, ok := colTypes[i].Length() m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface()) } } data = append(data, m) // if len(data)%10000 == 0 { // logger.Info("sqlite fetch data ", len(data), " use time ", time.Since(st).String()) // } } logger.Info("sqlite fetch data ", dbname+"."+tablename, len(data), " use time ", time.Since(st).String()) }() }(ti) } wg.Wait() }() } dbwg.Wait() logger.Info("sqlite insert and fetch data total ", insertcount, " use time ", time.Since(st).String()) stmt, err := mdba.Prepare("select a.id, a.var1 as avar1, b.var1 as bvar1, c.var1 as cvar1 from memdb1.test1 as a, memdb2.test3 as b , memdb3.test2 as c where a.id=b.id and c.id=a.id") if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer stmt.Close() rows, err := stmt.Query() if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer func() { _ = rows.Close() }() cols, err := rows.Columns() if err != nil { logger.Info("err", fmt.Sprint(err)) return } colTypes, err := rows.ColumnTypes() if err != nil { logger.Info("err", fmt.Sprint(err)) return } types := make([]reflect.Type, len(colTypes)) for i, tp := range colTypes { st := tp.ScanType() if st == nil { continue } types[i] = st } data := []map[string]interface{}{} for rows.Next() { values := make([]interface{}, len(cols)) for i := range values { if types[i] == nil { values[i] = &[]byte{} } else { values[i] = reflect.New(types[i]).Interface() } } if err = rows.Scan(values...); err != nil { logger.Info("err", fmt.Sprint(err)) return } m := make(map[string]interface{}) for i, v := range values { if v == nil { m[colTypes[i].Name()] = nil } else { n, ok := colTypes[i].Length() m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface()) } } data = append(data, m) if len(data)%1000 == 0 { logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String()) bs, _ := json.MarshalIndent(m, "", " ") logger.Info("eg:", string(bs)) } } logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String()) { _, err := mdbs[0].Exec("insert into test1 (id, var1) values (-1, '--0--1--2--')") if err != nil { logger.Info("err", fmt.Sprint(err)) return } _, err = mdbs[1].Exec("insert into test3 (id, var1) values (-1, '--1--3--2--')") if err != nil { logger.Info("err", fmt.Sprint(err)) return } _, err = mdbs[2].Exec("insert into test2 (id, var1) values (-1, '--2--2--2--')") if err != nil { logger.Info("err", fmt.Sprint(err)) return } stmt, err := mdba.Prepare("select a.id, a.var1 as avar1, b.var1 as bvar1, c.var1 as cvar1 from memdb1.test1 as a, memdb2.test3 as b , memdb3.test2 as c where a.id=b.id and c.id=a.id and b.id<0") if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer stmt.Close() rows, err := stmt.Query() if err != nil { logger.Info("err", fmt.Sprint(err)) return } defer func() { _ = rows.Close() }() cols, err := rows.Columns() if err != nil { logger.Info("err", fmt.Sprint(err)) return } colTypes, err := rows.ColumnTypes() if err != nil { logger.Info("err", fmt.Sprint(err)) return } types := make([]reflect.Type, len(colTypes)) for i, tp := range colTypes { st := tp.ScanType() if st == nil { continue } types[i] = st } data = []map[string]interface{}{} for rows.Next() { values := make([]interface{}, len(cols)) for i := range values { if types[i] == nil { values[i] = &[]byte{} } else { values[i] = reflect.New(types[i]).Interface() } } if err = rows.Scan(values...); err != nil { logger.Info("err", fmt.Sprint(err)) return } m := make(map[string]interface{}) for i, v := range values { if v == nil { m[colTypes[i].Name()] = nil } else { n, ok := colTypes[i].Length() m[colTypes[i].Name()] = fmt.Sprint(types[i], "[", n, "]", ok, ":", reflect.ValueOf(v).Elem().Interface()) } } data = append(data, m) bs, _ := json.MarshalIndent(m, "", " ") logger.Info("eg:", string(bs)) } logger.Info("sqlite fetch data from mixdb ", len(data), " use time ", time.Since(st).String()) } }