datasync.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. package datasync
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "regexp"
  7. "runtime"
  8. "strings"
  9. "sync"
  10. "time"
  11. "git.wecise.com/wecise/odb-go/dbo"
  12. "git.wecise.com/wecise/odb-go/odb"
  13. "git.wecise.com/wecise/odb-go/odbc"
  14. "gitee.com/wecisecode/util/merrs"
  15. "gitee.com/wecisecode/util/mfmt"
  16. "gitee.com/wecisecode/util/rc"
  17. "github.com/scylladb/go-set/strset"
  18. "github.com/spf13/cast"
  19. )
  20. var mcfg = odbc.Config
  21. var logger = odbc.Logger
  22. type DataSync struct {
  23. odbcFrom odb.Client
  24. odbcTo odb.Client
  25. schemaFrom *dbo.Schema
  26. schemaTo *dbo.Schema
  27. fromodbserver string
  28. fromkeyspace string
  29. fromdc string
  30. toodbserver string
  31. tokeyspace string
  32. todc string
  33. fromdata []string
  34. classmapping map[string][]string
  35. datatimesince time.Duration
  36. buckettimesince time.Duration
  37. ctx context.Context
  38. cancel context.CancelFunc
  39. wg *sync.WaitGroup
  40. mutex sync.Mutex
  41. errs []error
  42. ctrlrc *rc.RoutinesController
  43. objectrc *rc.RoutinesController
  44. bucketrc *rc.RoutinesController
  45. syncstatus *SyncStatus
  46. }
  47. func NewDataSync() *DataSync {
  48. return &DataSync{}
  49. }
  50. func (ds *DataSync) Init() (err error) {
  51. ds.fromodbserver = mcfg.GetString("datasync.from.odbserver")
  52. ds.fromkeyspace = mcfg.GetString("datasync.from.keyspace")
  53. ds.toodbserver = mcfg.GetString("datasync.to.odbserver")
  54. ds.tokeyspace = mcfg.GetString("datasync.to.keyspace")
  55. if ds.fromodbserver == "" ||
  56. ds.fromkeyspace == "" ||
  57. ds.toodbserver == "" ||
  58. ds.tokeyspace == "" {
  59. return odbc.NoConfError.New("need configure settings: datasync.from.odbserver, datasync.from.keyspace, datasync.to.odbserver, datasync.to.keyspace")
  60. }
  61. ds.fromdc = mcfg.GetString("datasync.from.dc")
  62. ds.todc = mcfg.GetString("datasync.to.dc")
  63. ds.fromdata = mcfg.GetStrings("datasync.from.data")
  64. ds.classmapping = mcfg.GetMapping("datasync.mapping.class")
  65. ds.odbcFrom, err = odb.NewClient(&odb.Config{
  66. Keyspace: ds.fromkeyspace,
  67. Hosts: strings.Split(ds.fromodbserver, ","),
  68. })
  69. if err != nil {
  70. if strings.Contains(err.Error(), "error: EOF") {
  71. println("\n!!!should add your ip to odbserver(" + ds.fromodbserver + ") whitelist!!!\n")
  72. os.Exit(1)
  73. }
  74. return merrs.New(err)
  75. }
  76. ds.odbcTo, err = odb.NewClient(&odb.Config{
  77. Keyspace: ds.tokeyspace,
  78. Hosts: strings.Split(ds.toodbserver, ","),
  79. })
  80. if err != nil {
  81. if strings.Contains(err.Error(), "error: EOF") {
  82. println("\n!!!should add your ip to odbserver(" + ds.toodbserver + ") whitelist!!!\n")
  83. os.Exit(1)
  84. }
  85. return merrs.New(err)
  86. }
  87. ds.schemaFrom = dbo.NewSchema(ds.odbcFrom)
  88. ds.schemaTo = dbo.NewSchema(ds.odbcTo)
  89. ctrlthreads := mcfg.GetInt("datasync.ctrl.threads", runtime.GOMAXPROCS(0))
  90. ds.ctrlrc = rc.NewRoutinesControllerLimit("", ctrlthreads, ctrlthreads*2)
  91. concurthreads := mcfg.GetInt("datasync.concur.threads", runtime.GOMAXPROCS(0))
  92. ds.objectrc = rc.NewRoutinesControllerLimit("", concurthreads, concurthreads*2)
  93. bucketthreads := mcfg.GetInt("datasync.bucket.threads", runtime.GOMAXPROCS(0))
  94. ds.bucketrc = rc.NewRoutinesControllerLimit("", bucketthreads, bucketthreads*2)
  95. ds.datatimesince = mcfg.GetDuration("datasync.data.time.since", "365d")
  96. ds.buckettimesince = mcfg.GetDuration("datasync.bucket.time.since", "30d")
  97. return nil
  98. }
  99. func (ds *DataSync) Run() (done <-chan error) {
  100. logger.Info(mcfg.Info())
  101. ret := make(chan error, 1)
  102. key := regexp.MustCompile(`\W`).ReplaceAllString(strings.Split(ds.fromodbserver, ",")[0]+"_"+ds.fromkeyspace+"_"+strings.Split(ds.toodbserver, ",")[0]+"_"+ds.tokeyspace, "_")
  103. ds.syncstatus = NewSyncStatus(key)
  104. if !mcfg.GetBool("reload") {
  105. e := ds.syncstatus.Load()
  106. if e != nil {
  107. ret <- e
  108. return ret
  109. }
  110. }
  111. go ds.run(ret)
  112. return ret
  113. }
  114. func (ds *DataSync) run(ret chan error) {
  115. fromdatas := []string{}
  116. for _, fromdata := range ds.fromdata {
  117. fromdata = strings.TrimSpace(fromdata)
  118. if len(fromdata) > 0 {
  119. fromdatas = append(fromdatas, fromdata)
  120. }
  121. }
  122. if len(fromdatas) == 0 {
  123. cis, e := ds.odbcFrom.ClassInfo("/", true)
  124. if e != nil {
  125. ret <- e
  126. return
  127. }
  128. for _, ci := range cis {
  129. fromdatas = append(fromdatas, ci.Fullname)
  130. }
  131. }
  132. ctx, cancel := context.WithCancel(context.Background())
  133. defer cancel()
  134. ds.ctx = ctx
  135. ds.cancel = cancel
  136. logger.Info("resume sync data, from", len(fromdatas), "configure")
  137. for {
  138. ds.wg = &sync.WaitGroup{}
  139. // logger.Trace(1)
  140. ds.syncstatus.Resume()
  141. // logger.Trace(2)
  142. for _, fromdata := range fromdatas {
  143. mqlfrom := fromdata
  144. // logger.Trace(3)
  145. ds.startsyncproc(ds.wg, ds.ctrlrc, func() error {
  146. logger.Info("sync data, from", mqlfrom)
  147. return ds.syncdata(mqlfrom)
  148. })
  149. }
  150. ds.wg.Wait()
  151. ds.syncstatus.WaitSaveDone()
  152. if len(ds.errs) > 0 {
  153. e := merrs.New(ds.errs)
  154. logger.Error(e)
  155. ret <- e
  156. return
  157. }
  158. logger.Info("total sync data", ds.syncstatus.TotalChunks(), "chunks", ds.syncstatus.TotalRecords(), "records,", "use time:", mfmt.FormatDuration(ds.syncstatus.TotalUseTime()))
  159. interval := mcfg.GetDuration("datasync.run.interval", 0)
  160. if interval > 0 {
  161. time.Sleep(interval)
  162. } else {
  163. break
  164. }
  165. }
  166. ret <- nil
  167. }
  168. func (ds *DataSync) startsyncproc(wg *sync.WaitGroup, rc *rc.RoutinesController, proc func() error) {
  169. wg.Add(1)
  170. // logger.Trace(4)
  171. e := rc.ConcurCall(1, func() {
  172. // logger.Trace(5)
  173. defer wg.Done()
  174. e := ds.ctx.Err()
  175. if e != nil {
  176. logger.Error(e)
  177. return
  178. }
  179. e = proc()
  180. if e != nil {
  181. logger.Error(e)
  182. if !merrs.ContextError.Contains(e) {
  183. ds.mutex.Lock()
  184. ds.errs = append(ds.errs, e)
  185. ds.mutex.Unlock()
  186. }
  187. ds.cancel()
  188. }
  189. })
  190. if e != nil {
  191. // logger.Trace(6)
  192. ds.mutex.Lock()
  193. ds.errs = append(ds.errs, merrs.New(e))
  194. ds.mutex.Unlock()
  195. wg.Done()
  196. }
  197. // logger.Trace(7)
  198. }
  199. // 同步一块数据
  200. // mqlfrom 可以是类名 或 查询语句
  201. func (ds *DataSync) syncdata(mqlfrom string) error {
  202. // 统一格式化为查询语句
  203. logger.Debug("syncdata", mqlfrom)
  204. mqlfrom = FormatMQL(mqlfrom)
  205. // 已完成同步进度
  206. fromclass, fields, condition, e := ds.parseMQL(mqlfrom)
  207. if e != nil {
  208. return merrs.New(e)
  209. }
  210. cifroms, e := ds.schemaFrom.LoadClassinfos(fromclass)
  211. if e != nil {
  212. return merrs.New(e)
  213. }
  214. logger.Debug("loadClassinfos", fromclass, len(cifroms))
  215. for _, v := range cifroms {
  216. cifrom := v
  217. ds.startsyncproc(ds.wg, ds.objectrc, func() error {
  218. return ds.syncclassdata(cifrom, fields, condition)
  219. })
  220. }
  221. return nil
  222. }
  223. var reselect = regexp.MustCompile(`(?is)select\s.*`)
  224. var reselectfromclass = regexp.MustCompile(`(?is)select\s+(.*)\s+from\s+(\S+)(?:\s+(.*)\s*)?`)
  225. var commentexprs = regexp.MustCompile(`(?s)\/\*(?:[^\*]|\*+[^\*\/])*\*+\/`)
  226. var commentexprs_2 = regexp.MustCompile(`(?ms)(?:^|\n)\-\-[^\n]*(?:\n|$)`)
  227. var commentexprs_3 = regexp.MustCompile(`(?ms)(?:^|\n)//[^\n]*(?:\n|$)`)
  228. func FormatMQL(mql string) string {
  229. mql = commentexprs.ReplaceAllString(mql, "")
  230. mql = commentexprs_2.ReplaceAllString(mql, "")
  231. mql = commentexprs_3.ReplaceAllString(mql, "")
  232. mql = strings.TrimSpace(mql)
  233. if !reselect.MatchString(mql) {
  234. mql = "select * from " + mql
  235. }
  236. return mql
  237. }
  238. func (ds *DataSync) parseMQL(mql string) (class string, fields, condition string, err error) {
  239. logger.Debug("parseMQL", mql)
  240. defer func() {
  241. logger.Debug("parseMQL return", mql, class, fields, condition)
  242. }()
  243. selstmt := reselectfromclass.FindStringSubmatch(mql)
  244. if len(selstmt) < 3 {
  245. return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
  246. }
  247. fields = selstmt[1]
  248. class = selstmt[2]
  249. condition = selstmt[3]
  250. if class == "" {
  251. return "", "", "", merrs.New("from.data select statement error", []string{"mql", mql})
  252. }
  253. return class, fields, condition, nil
  254. }
  255. var rewhere = regexp.MustCompile(`(?is)\swhere\s(.*)`)
  256. func mqlAddVtimeRange(mql, sbeginvtime, sendvtime string) string {
  257. mqlseg := mql
  258. segwhere := fmt.Sprint(" where vtime>='", sbeginvtime, "' and vtime<'", sendvtime, "'")
  259. if rewhere.MatchString(mql) {
  260. mqlseg = rewhere.ReplaceAllString(mqlseg, segwhere+" and ($1)")
  261. } else {
  262. mqlseg = mqlseg + segwhere
  263. }
  264. return mqlseg
  265. }
  266. func (ds *DataSync) syncclassdata(cifrom *dbo.ClassInfoHelper, fields, condition string) error {
  267. logger.Debug("syncclassdata", cifrom.Fullname, fields, condition)
  268. denyclass := strset.New(mcfg.GetStrings("datasync.deny.class")...)
  269. logger.Debug("denyclass", denyclass)
  270. if denyclass.Has(cifrom.Fullname) || denyclass.Has(cifrom.Fullname+"/") {
  271. logger.Debug(cifrom.Fullname, "in denyclass")
  272. return nil
  273. }
  274. checkdenyclass := cifrom
  275. for checkdenyclass != nil {
  276. if denyclass.Has(checkdenyclass.BaseClassFullname() + "/") {
  277. logger.Debug(checkdenyclass.BaseClassFullname(), "in denyclass")
  278. return nil
  279. }
  280. checkdenyclass = ds.schemaFrom.GetClassInfo(checkdenyclass.BaseClassFullname())
  281. }
  282. // 确定目标类已创建
  283. toclass := ds.classmapping[cifrom.Fullname]
  284. if len(toclass) > 1 {
  285. return merrs.New("datasync.mapping.class config error, should be map to one class only", []string{"fromclass", cifrom.Fullname, "toclass", fmt.Sprint(toclass)})
  286. }
  287. if len(toclass) == 0 || toclass[0] == "" {
  288. toclass = []string{cifrom.Fullname}
  289. }
  290. cito, e := ds.assureToClass(toclass[0], cifrom)
  291. if e != nil {
  292. return merrs.New(e)
  293. }
  294. //
  295. mqlfrom := "select " + fields + " from " + cifrom.Fullname
  296. if condition != "" {
  297. mqlfrom += " " + condition
  298. }
  299. dc := ds.syncstatus.DoneCount(mqlfrom)
  300. isrunning := <-dc.isrunning
  301. if isrunning {
  302. dc.isrunning <- isrunning
  303. return nil
  304. }
  305. dc.isrunning <- true
  306. defer func() {
  307. <-dc.isrunning
  308. dc.isrunning <- false
  309. }()
  310. recordscount := dc.RecordsCount
  311. sfromvtime := dc.FromVtime
  312. slastdatavtime := dc.LastDataVtime
  313. slastsyncvtime := dc.LastSyncVtime
  314. // 分段获取数据
  315. fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
  316. lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
  317. lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
  318. sincevtime := time.Now().Add(-ds.datatimesince)
  319. if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
  320. ssincevtime := sincevtime.Format("2006-01-02 00:00:00")
  321. firstdatavtime := time.Now()
  322. sfirstdatavtime := firstdatavtime.Format("2006-01-02 15:04:05.000000")
  323. for i := 0; ; i++ {
  324. mqlseg := mqlAddVtimeRange(mqlfrom, ssincevtime, sfirstdatavtime)
  325. mqlchunk := mqlseg + " order by vtime limit 1"
  326. logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, mqlchunk)
  327. // 读取源数据
  328. r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
  329. if e != nil {
  330. return merrs.New(e)
  331. }
  332. if len(r.Data) == 0 {
  333. // 没有更多数据
  334. if i == 0 {
  335. logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, "no data")
  336. ds.syncstatus.RemoveDoneCount(mqlfrom)
  337. return nil
  338. }
  339. logger.Info("check first data vtime:", ds.odbcFrom.Config().Keyspace, cifrom.Fullname, sfirstdatavtime, mqlfrom)
  340. break
  341. }
  342. firstdata := r.Data[0]
  343. firstdatavtime = firstdata["vtime"].(time.Time)
  344. sfirstdatavtime = firstdatavtime.Format("2006-01-02 15:04:05.000000")
  345. logger.Debug("check first data vtime:", ds.odbcFrom.Config().Keyspace, firstdata["class"], firstdata["id"], sfirstdatavtime)
  346. }
  347. fromvtime = firstdatavtime
  348. lastdatavtime = firstdatavtime
  349. lastsyncvtime = firstdatavtime
  350. sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
  351. slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
  352. slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
  353. recordscount = 0
  354. // 初始化DataCount进度信息
  355. dc.RecordsCount = recordscount
  356. dc.FromVtime = sfromvtime
  357. dc.LastDataVtime = slastdatavtime
  358. dc.LastSyncVtime = slastsyncvtime
  359. }
  360. // 继续执行相关bucket数据同步
  361. e = ds.syncbucketdatacontinue(cifrom, cito, mqlfrom)
  362. if e != nil {
  363. return merrs.New(e)
  364. }
  365. // 继续执行对象数据同步
  366. interval := mcfg.GetDuration("datasync.run.interval", "1m")
  367. lastsyncvtime = lastsyncvtime.Add(-interval)
  368. if lastdatavtime.After(lastsyncvtime) {
  369. lastsyncvtime = lastdatavtime
  370. }
  371. lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
  372. nextvtime := lastsyncvtime
  373. run := true
  374. for run {
  375. nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.pagetime", "1d"))
  376. if time.Now().Before(nextvtime) {
  377. nextvtime = time.Now()
  378. run = false
  379. }
  380. snextvtime := nextvtime.Format("2006-01-02 15:04:05")
  381. mqlseg := mqlAddVtimeRange(mqlfrom, slastsyncvtime, snextvtime)
  382. offset := 0
  383. for {
  384. mqlchunk := mqlseg + fmt.Sprint(" limit ", offset, ",", mcfg.GetInt("datasync.pagesize", 50))
  385. logger.Debug(mqlchunk)
  386. // 读取源数据
  387. r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
  388. if e != nil {
  389. return merrs.New(e)
  390. }
  391. if len(r.Data) == 0 {
  392. // 没有更多数据
  393. break
  394. }
  395. // 写入目标数据
  396. for _, data := range r.Data {
  397. e = ds.insertData(mqlfrom, cifrom, cito, data)
  398. if e != nil {
  399. return e
  400. }
  401. datavtime := data["vtime"].(time.Time)
  402. if lastdatavtime.Before(datavtime) {
  403. lastdatavtime = datavtime
  404. }
  405. }
  406. offset += len(r.Data)
  407. }
  408. lastsyncvtime = nextvtime
  409. slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
  410. recordscount += int64(offset)
  411. //
  412. dc.RecordsCount = recordscount
  413. dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
  414. dc.LastSyncVtime = slastsyncvtime
  415. ds.syncstatus.Save(mqlfrom, dc)
  416. }
  417. logger.Info("total sync data:", mqlfrom, "vtime:", dc.FromVtime, "~", dc.LastSyncVtime, "records:", dc.RecordsCount, "lastvtime:", dc.LastDataVtime)
  418. return nil
  419. }
  420. func (ds *DataSync) insertData(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, data map[string]any) error {
  421. if data["class"] != cito.Fullname {
  422. data["class"] = cito.Fullname
  423. }
  424. logger.Debug("insertData", data["class"], data["id"], data["vtime"])
  425. vals := []any{}
  426. for _, fn := range cito.Fieldslist {
  427. if cito.Fieldinfos[fn].Fieldtype == "bucket" {
  428. vals = append(vals, nil)
  429. continue
  430. }
  431. v := data[fn]
  432. if v == nil {
  433. i := strings.Index(fn, ":")
  434. if i >= 0 {
  435. fn = fn[i+1:]
  436. }
  437. v = data[fn]
  438. }
  439. vals = append(vals, v)
  440. }
  441. _, e := ds.odbcTo.Query(cito.Insertmql, vals...).WithContext(ds.ctx).Do()
  442. if e != nil {
  443. return merrs.New(e, []string{"mql", cito.Insertmql, "vals", fmt.Sprint(data)})
  444. }
  445. e = ds.syncbucketdatanew(cifrom, cito, mqlfrom, data)
  446. if e != nil {
  447. return e
  448. }
  449. return nil
  450. }
  451. func (ds *DataSync) syncbucketdatanew(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string, data map[string]any) error {
  452. if ds.buckettimesince == 0 {
  453. return nil
  454. }
  455. for _, bf := range cifrom.BucketFields {
  456. if _, has := data[bf]; has {
  457. ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
  458. oid := data["id"].(string)
  459. buckettype := cast.ToString(cifrom.Fieldinfos[bf].Fieldoption["type"])
  460. key := buckettype + ":" + cifrom.Fullname + ":" + bf + "[" + oid + "]"
  461. dc := ds.syncstatus.DoneCount(key)
  462. dc.BucketClass = cifrom.Fullname
  463. dc.BucketField = bf
  464. dc.BucketObjID = oid
  465. return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
  466. })
  467. }
  468. }
  469. return nil
  470. }
  471. func (ds *DataSync) syncbucketdatacontinue(cifrom, cito *dbo.ClassInfoHelper, mqlfrom string) error {
  472. if ds.buckettimesince == 0 {
  473. return nil
  474. }
  475. mqlchunk := mqlfrom + " limit 1"
  476. logger.Debug("check data fields:", ds.odbcFrom.Config().Keyspace, mqlchunk)
  477. // 读取源数据
  478. r, e := ds.odbcFrom.Query(mqlchunk).WithContext(ds.ctx).Do()
  479. if e != nil {
  480. return e
  481. }
  482. if len(r.Data) == 0 {
  483. return nil
  484. }
  485. data := r.Data[0]
  486. for _, bf := range cifrom.BucketFields {
  487. if _, has := data[bf]; has {
  488. e := func() error {
  489. bucketdonecount := ds.syncstatus.DoneCountCopy(func(k string, v *DoneCount) bool {
  490. return v.BucketClass == cifrom.Fullname && v.BucketField == bf
  491. })
  492. for k, v := range bucketdonecount {
  493. key := k
  494. dc := v
  495. ds.startsyncproc(ds.wg, ds.bucketrc, func() error {
  496. return ds.syncbucketdata(mqlfrom, cifrom, cito, key, dc)
  497. })
  498. }
  499. return nil
  500. }()
  501. if e != nil {
  502. return e
  503. }
  504. }
  505. }
  506. return nil
  507. }
  508. func (ds *DataSync) syncbucketdata(mqlfrom string, cifrom, cito *dbo.ClassInfoHelper, dckey string, dc *DoneCount) error {
  509. isrunning := <-dc.isrunning
  510. if isrunning {
  511. dc.isrunning <- isrunning
  512. return nil
  513. }
  514. dc.isrunning <- true
  515. defer func() {
  516. <-dc.isrunning
  517. dc.isrunning <- false
  518. }()
  519. bucketType := cast.ToString(cifrom.Fieldinfos[dc.BucketField].Fieldoption["type"])
  520. logger.Debug("to sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, mqlfrom)
  521. recordscount := dc.RecordsCount
  522. sfromvtime := dc.FromVtime
  523. slastdatavtime := dc.LastDataVtime
  524. slastsyncvtime := dc.LastSyncVtime
  525. // 分段获取数据
  526. fromvtime, _ := time.Parse("2006-01-02 15:04:05", sfromvtime)
  527. lastdatavtime, _ := time.Parse("2006-01-02 15:04:05.000000", slastdatavtime)
  528. lastsyncvtime, _ := time.Parse("2006-01-02 15:04:05", slastsyncvtime)
  529. sincevtime := time.Now().Add(-ds.buckettimesince)
  530. if fromvtime.Before(sincevtime) || lastsyncvtime.Before(fromvtime) || lastdatavtime.Before(fromvtime) {
  531. fromvtime = sincevtime
  532. lastdatavtime = sincevtime
  533. lastsyncvtime = sincevtime
  534. sfromvtime = fromvtime.Format("2006-01-02 15:04:05")
  535. slastdatavtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
  536. slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
  537. recordscount = 0
  538. // 初始化DataCount进度信息
  539. dc.RecordsCount = recordscount
  540. dc.FromVtime = sfromvtime
  541. dc.LastDataVtime = slastdatavtime
  542. dc.LastSyncVtime = slastsyncvtime
  543. }
  544. // 继续执行数据同步
  545. interval := mcfg.GetDuration("datasync.run.interval", "1m")
  546. lastsyncvtime = lastsyncvtime.Add(-interval)
  547. if lastdatavtime.After(lastsyncvtime) {
  548. lastsyncvtime = lastdatavtime
  549. }
  550. lastsyncvtime = lastsyncvtime.Add(-mcfg.GetDuration("datasync.run.overtime", "30s"))
  551. nextvtime := lastsyncvtime
  552. run := true
  553. for run {
  554. nextvtime = lastsyncvtime.Add(mcfg.GetDuration("datasync.bucket.pagetime", "1h"))
  555. if time.Now().Before(nextvtime) {
  556. nextvtime = time.Now()
  557. run = false
  558. }
  559. snextvtime := nextvtime.Format("2006-01-02 15:04:05")
  560. offset := 0
  561. {
  562. mqlchunk := "select " + dc.BucketField + ".time('" + slastsyncvtime + "','" + snextvtime + "')" + " from " + dc.BucketClass + " where id=?"
  563. logger.Debug(mqlchunk, dc.BucketObjID)
  564. // 读取源数据
  565. r, e := ds.odbcFrom.Query(mqlchunk, dc.BucketObjID).WithContext(ds.ctx).Do()
  566. if e != nil {
  567. return merrs.New(e)
  568. }
  569. if len(r.Data) == 0 {
  570. return merrs.New("bucket host data not found id="+dc.BucketObjID, merrs.Map{"mql": mqlchunk})
  571. }
  572. idata := r.Data[0][dc.BucketField]
  573. if idata != nil {
  574. data := cast.ToSlice(idata)
  575. ms := []map[string]any{}
  576. for i := 0; i < len(data); i++ {
  577. dat := cast.ToSlice(data[i])
  578. if len(dat) >= 3 {
  579. m := map[string]any{}
  580. m["__timestamp__"] = dat[0]
  581. m["__name__"] = dat[1]
  582. m["__value__"] = dat[2]
  583. if len(dat) >= 4 {
  584. tm := cast.ToStringMap(dat[3])
  585. for k, v := range tm {
  586. m[k] = v
  587. }
  588. }
  589. ms = append(ms, m)
  590. }
  591. }
  592. // 写入目标数据
  593. mqlinsert := "insert into " + dc.BucketClass + "(id," + dc.BucketField + ") values(?,?)"
  594. _, e = ds.odbcTo.Query(mqlinsert, dc.BucketObjID, ms).WithContext(ds.ctx).Do()
  595. if e != nil {
  596. return merrs.New(e)
  597. }
  598. lastdatavtime = nextvtime
  599. offset += len(data)
  600. }
  601. }
  602. lastsyncvtime = nextvtime
  603. slastsyncvtime = lastsyncvtime.Format("2006-01-02 15:04:05")
  604. recordscount += int64(offset)
  605. //
  606. dc.RecordsCount = recordscount
  607. dc.LastDataVtime = lastdatavtime.Format("2006-01-02 15:04:05.000000")
  608. dc.LastSyncVtime = slastsyncvtime
  609. ds.syncstatus.Save(dckey, dc)
  610. }
  611. logger.Info("total sync", bucketType, "data", dc.BucketClass, dc.BucketField, "id:", dc.BucketObjID, recordscount, "records", "to time", dc.LastSyncVtime)
  612. return nil
  613. }
  614. func (ds *DataSync) assureToClass(toclass string, cifrom *dbo.ClassInfoHelper) (cito *dbo.ClassInfoHelper, err error) {
  615. ddl := cifrom.DDL
  616. logger.Info("assureToClass", cifrom.Classfullname, toclass)
  617. if toclass != cifrom.Classfullname {
  618. re := regexp.MustCompile(`(?i)(create\s+class(?:\s+if\s+not\s+exists)*\s+)([^\()]+)(\s*\(.*)`)
  619. ddl = re.ReplaceAllString(ddl, "$1"+toclass+"$3")
  620. }
  621. cis, e := ds.odbcTo.ClassInfo(toclass, false)
  622. if e != nil && !merrs.NotExistError.Contains(e) && !strings.Contains(e.Error(), "not exists") {
  623. return nil, merrs.New(e)
  624. }
  625. if len(cis) == 0 {
  626. logger.Debug("auto create class", ddl)
  627. _, e = ds.odbcTo.Query(ddl).WithContext(ds.ctx).Do()
  628. if e != nil {
  629. return nil, merrs.New(e)
  630. }
  631. cis, e = ds.odbcTo.ClassInfo(toclass, false)
  632. if e != nil {
  633. return nil, merrs.New(e)
  634. }
  635. if len(cis) == 0 {
  636. return nil, merrs.New("len(cis) == 0")
  637. }
  638. }
  639. cito, e = ds.schemaTo.NewClassinfo(cis[0])
  640. if e != nil {
  641. return nil, merrs.New(e)
  642. }
  643. logger.Debug("sync to class", cito.Classfullname)
  644. return cito, nil
  645. }