// psync.go
  1. package main
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "os"
  8. "strings"
  9. "time"
  10. //"reflect"
  11. "git.wecise.com/wecise/common/cassandra"
  12. "git.wecise.com/wecise/odbserver/odb/cass/gocql"
  13. "git.wecise.com/wecise/odbserver/odb/promdb/pdstru"
  14. "git.wecise.com/wecise/odbserver/odb/util"
  15. "gitee.com/wecisecode/util/logger"
  16. "gitee.com/wecisecode/util/set/iset"
  17. "gitee.com/wecisecode/util/set/strset"
  18. "gitee.com/wecisecode/util/set/u64set"
  19. "github.com/dgryski/go-farm"
  20. /*. "git.wecise.com/wecise/odbserver/odb"
  21. "git.wecise.com/wecise/odbserver/odb/gerrs"
  22. */)
  23. const (
  24. PROMDB_SINGLE = 0x0001
  25. PROMDB_SHARE = 0x0002
  26. PROMDB_SYNCDC = 0x0004
  27. PROMDB_NOCACHE = 0x0008
  28. // feature
  29. PROMDB_EIGEN = 0x0100
  30. //PROMDB_PARTITION = 0x0200
  31. )
  32. type PromdbInfo struct {
  33. Cluster []string `json:"cluster"`
  34. Keyspace string `json:"keyspace"`
  35. DC string `json:"dc"`
  36. Field string `json:"field"`
  37. }
  38. type SyncConfig struct {
  39. From *PromdbInfo `json:"from"`
  40. To *PromdbInfo `json:"to"`
  41. Class []string `json:"class"`
  42. Date []string `json:"date"`
  43. SyncDict bool `json:"syncdict,omitempty"`
  44. }
  45. func PromdbKeyspaceWithOption(option int, keyspace, dc string) string {
  46. if option&PROMDB_SYNCDC > 0 {
  47. return keyspace
  48. } else {
  49. if dc == "dc1" {
  50. return keyspace + "_tsdb"
  51. } else {
  52. return keyspace + "_" + dc + "_tsdb"
  53. }
  54. }
  55. }
  56. func DateToPartition(dateString string) (int, error) {
  57. if t, err := time.Parse("2006-01-02", dateString); err != nil {
  58. return 0, err
  59. } else {
  60. return MSToPartition(t.UnixNano() / 1000000), nil
  61. }
  62. }
  63. func MSToPartition(msseconds int64) int {
  64. //----------------------
  65. // align with local date
  66. //----------------------
  67. return int((msseconds + util.G_TIME_ZONE*1000) / (86400000))
  68. }
  69. func iddate(from_info *Info, from_promdb_session *gocql.Session, ids []string, dates []string) ([]uint64, []int, error) {
  70. var rows []map[string]interface{}
  71. var err error
  72. if from_info.Option&PROMDB_SHARE > 0 {
  73. var cql string
  74. if from_info.Slot == 0 {
  75. cql = fmt.Sprintf(`select distinct(id, partition, tid) as i_p from %s.promdb_%d`, from_info.Keyspace, uint64(from_info.Tid))
  76. if rows, err = from_promdb_session.SliceMap(cql); err != nil {
  77. return nil, nil, err
  78. }
  79. } else {
  80. for i := 0; i < from_info.Slot; i++ {
  81. if i == 0 {
  82. cql = fmt.Sprintf(`select distinct(id, partition, tid) as i_p from %s.promdb_%d`, from_info.Keyspace, uint64(from_info.Tid))
  83. } else {
  84. cql = fmt.Sprintf(`select distinct(id, partition, tid) as i_p from %s.promdb_%d_%d`, from_info.Keyspace, uint64(from_info.Tid), i)
  85. }
  86. if one, err := from_promdb_session.SliceMap(cql); err != nil {
  87. return nil, nil, err
  88. } else {
  89. rows = append(rows, one...)
  90. }
  91. }
  92. }
  93. } else {
  94. var cql string
  95. if from_info.Slot == 0 {
  96. cql = fmt.Sprintf(`select distinct(id, partition) as i_p from %s.promdb_%d`, from_info.Keyspace, uint64(from_info.Tid))
  97. logger.Warn(cql)
  98. if rows, err = from_promdb_session.SliceMap(cql); err != nil {
  99. return nil, nil, err
  100. }
  101. } else {
  102. for i := 0; i < from_info.Slot; i++ {
  103. if i == 0 {
  104. cql = fmt.Sprintf(`select distinct(id, partition) as i_p from %s.promdb_%d`, from_info.Keyspace, uint64(from_info.Tid))
  105. } else {
  106. cql = fmt.Sprintf(`select distinct(id, partition) as i_p from %s.promdb_%d_%d`, from_info.Keyspace, uint64(from_info.Tid), i)
  107. }
  108. //logger.Warn(cql)
  109. if one, err := from_promdb_session.SliceMap(cql); err != nil {
  110. return nil, nil, err
  111. } else {
  112. rows = append(rows, one...)
  113. }
  114. }
  115. }
  116. }
  117. var forid, fordate bool
  118. nids := u64set.New()
  119. if len(ids) == 0 {
  120. forid = true
  121. } else {
  122. for _, pid := range ids {
  123. id := farm.Hash64([]byte(strings.TrimSpace(pid)))
  124. nids.Add(id)
  125. }
  126. }
  127. partitions := iset.New()
  128. if len(dates) == 0 {
  129. fordate = true
  130. } else {
  131. for _, sdate := range dates {
  132. sdate = strings.TrimSpace(sdate)
  133. var partition int
  134. if partition, err = DateToPartition(sdate); err != nil {
  135. return nil, nil, errors.New(fmt.Sprintf("date format error :%v .", err))
  136. }
  137. partitions.Add(partition)
  138. }
  139. }
  140. //logger.Warn(rows)
  141. for _, row := range rows {
  142. if forid {
  143. if v, ok := row["i_p[0]"]; ok {
  144. nids.Add(uint64(v.(int64)))
  145. }
  146. }
  147. if fordate {
  148. if v, ok := row["i_p[1]"]; ok {
  149. partitions.Add(v.(int))
  150. }
  151. }
  152. }
  153. return nids.List(), partitions.List(), nil
  154. }
  155. func copyIDPartition(from_info, to_info *Info, from_promdb_session, to_promdb_session *gocql.Session, id uint64, partition int) error {
  156. var rows []map[string]interface{}
  157. var err error
  158. var tmod int
  159. if from_info.Option&PROMDB_SHARE > 0 {
  160. if from_info.Slot > 1 {
  161. tmod = int(id % uint64(from_info.Slot))
  162. }
  163. var cql string
  164. if tmod > 0 {
  165. cql = fmt.Sprintf(`select * from %s.promdbs_%d where tid=? and id=? and partition=?`, from_info.Keyspace, tmod)
  166. } else {
  167. cql = fmt.Sprintf(`select * from %s.promdbs where tid=? and id=? and partition=?`, from_info.Keyspace)
  168. }
  169. rows, err = from_promdb_session.SliceMap(cql, from_info.Tid, id, partition)
  170. } else {
  171. if from_info.Slot > 1 {
  172. tmod = int(id % uint64(from_info.Slot))
  173. }
  174. var cql string
  175. if tmod > 0 {
  176. cql = fmt.Sprintf(`select * from %s.promdb_%d_%d where id=? and partition=?`, from_info.Keyspace, uint64(from_info.Tid), tmod)
  177. } else {
  178. cql = fmt.Sprintf(`select * from %s.promdb_%d where id=? and partition=?`, from_info.Keyspace, uint64(from_info.Tid))
  179. }
  180. //logger.Error(cql, id, partition)
  181. rows, err = from_promdb_session.SliceMap(cql, id, partition)
  182. //logger.Error(rows)
  183. }
  184. if err != nil {
  185. return err
  186. }
  187. if to_info.Slot > 1 {
  188. tmod = int(id % uint64(to_info.Slot))
  189. } else {
  190. tmod = 0
  191. }
  192. var insert_cql string
  193. if tmod > 0 {
  194. if to_info.Option&PROMDB_EIGEN > 0 {
  195. if to_info.Option&PROMDB_SHARE > 0 {
  196. insert_cql = fmt.Sprintf(`insert into %s.promdbs_%d (tid, id, partition, data, sdata, endtime, time, hour, num, eigen) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace, tmod)
  197. } else {
  198. insert_cql = fmt.Sprintf(`insert into %s.promdb_%d_%d (id, partition, data, sdata, endtime, time, hour, num, eigen) values (?, ?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace, uint64(to_info.Tid), tmod)
  199. }
  200. } else {
  201. if to_info.Option&PROMDB_SHARE > 0 {
  202. insert_cql = fmt.Sprintf(`insert into %s.promdbs_%d (tid, id, partition, data, sdata, endtime, time, hour, num) values (?, ?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace, tmod)
  203. } else {
  204. insert_cql = fmt.Sprintf(`insert into %s.promdb_%d_%d (id, partition, data, sdata, endtime, time, hour, num) values (?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace, uint64(to_info.Tid), tmod)
  205. }
  206. }
  207. } else {
  208. if to_info.Option&PROMDB_EIGEN > 0 {
  209. if to_info.Option&PROMDB_SHARE > 0 {
  210. insert_cql = fmt.Sprintf(`insert into %s.promdbs (tid, id, partition, data, sdata, endtime, time, hour, num, eigen) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace)
  211. } else {
  212. insert_cql = fmt.Sprintf(`insert into %s.promdb_%d (id, partition, data, sdata, endtime, time, hour, num, eigen) values (?, ?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace, uint64(to_info.Tid))
  213. }
  214. } else {
  215. if to_info.Option&PROMDB_SHARE > 0 {
  216. insert_cql = fmt.Sprintf(`insert into %s.promdbs (tid, id, partition, data, sdata, endtime, time, hour, num) values (?, ?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace)
  217. } else {
  218. insert_cql = fmt.Sprintf(`insert into %s.promdb_%d (id, partition, data, sdata, endtime, time, hour, num) values (?, ?, ?, ?, ?, ?, ?, ?)`, to_info.Keyspace, uint64(to_info.Tid))
  219. }
  220. }
  221. }
  222. for _, row := range rows {
  223. var data []byte
  224. if row["data"] != nil {
  225. data = row["data"].([]byte)
  226. }
  227. var sdata map[int64]string
  228. if row["sdata"] != nil {
  229. sdata = row["sdata"].(map[int64]string)
  230. }
  231. endtime := row["endtime"].(float64)
  232. time := row["time"].(float64)
  233. var hour int
  234. if row["hour"] != nil {
  235. hour = row["hour"].(int)
  236. }
  237. var num int
  238. if row["num"] != nil {
  239. num = row["num"].(int)
  240. }
  241. var eigen int64
  242. if from_info.Option&PROMDB_EIGEN > 0 && row["eigen"] != nil {
  243. eigen = row["eigen"].(int64)
  244. }
  245. if to_info.Option&PROMDB_EIGEN > 0 {
  246. if eigen == 0 {
  247. pdistru := &pdstru.PromdbInsertStru{Sdata: sdata, Data: data}
  248. eigen = pdistru.Eigen()
  249. }
  250. if to_info.Option&PROMDB_SHARE > 0 {
  251. if err = to_promdb_session.Exec(insert_cql, from_info.Tid, id, partition, data, sdata, endtime, time, hour, num, eigen); err != nil {
  252. return err
  253. }
  254. } else {
  255. //logger.Warn(insert_cql, id, partition, data, sdata, endtime, time, hour, num, eigen)
  256. if err = to_promdb_session.Exec(insert_cql, id, partition, data, sdata, endtime, time, hour, num, eigen); err != nil {
  257. logger.Warn(err)
  258. return err
  259. }
  260. }
  261. } else {
  262. if to_info.Option&PROMDB_SHARE > 0 {
  263. if err = to_promdb_session.Exec(insert_cql, from_info.Tid, id, partition, data, sdata, endtime, time, hour, num); err != nil {
  264. return err
  265. }
  266. } else {
  267. //logger.Warn(id, partition, data, sdata, endtime, time, hour, num, eigen)
  268. if err = to_promdb_session.Exec(insert_cql, id, partition, data, sdata, endtime, time, hour, num); err != nil {
  269. return err
  270. }
  271. }
  272. }
  273. }
  274. return nil
  275. }
  276. func copyPromdb(from_info, to_info *Info, from_promdb_session, to_promdb_session *gocql.Session, ids, dates []string) (err error) {
  277. logger.Infof("sync from(tid): %v, to(tid): %v, id: %v, date: %v .", from_info.Tid, to_info.Tid, ids, dates)
  278. logger.Infof("from info: %v , to_info: %v", from_info, to_info)
  279. if len(ids) == 0 || len(dates) == 0 {
  280. ids, partitions, err := iddate(from_info, from_promdb_session, ids, dates)
  281. //logger.Warn(ids, partitions)
  282. if err != nil {
  283. return err
  284. }
  285. for _, id := range ids {
  286. for _, partition := range partitions {
  287. logger.Infof("sync id:%d , partition:%d .", id, partition)
  288. if err := copyIDPartition(from_info, to_info, from_promdb_session, to_promdb_session, id, partition); err != nil {
  289. return err
  290. }
  291. }
  292. }
  293. return nil
  294. }
  295. for _, pid := range ids {
  296. id := farm.Hash64([]byte(strings.TrimSpace(pid)))
  297. for _, sdate := range dates {
  298. logger.Infof("sync id:%s , date:%s .", pid, sdate)
  299. sdate = strings.TrimSpace(sdate)
  300. var partition int
  301. if partition, err = DateToPartition(sdate); err != nil {
  302. return errors.New(fmt.Sprintf("date format error :%v .", err))
  303. }
  304. if err := copyIDPartition(from_info, to_info, from_promdb_session, to_promdb_session, id, partition); err != nil {
  305. return err
  306. }
  307. }
  308. }
  309. return nil
  310. }
  311. type Option struct {
  312. Keyspace string
  313. Addrs []string
  314. User string
  315. Pass string
  316. DisableInitialHostLookup bool
  317. }
  318. type Client struct {
  319. Session *gocql.Session
  320. }
  321. type Info struct {
  322. Keyspace string
  323. Option int
  324. Tid int64
  325. Slot int
  326. Dict bool
  327. }
  328. func GetEnv() (addrs []string, user string, pass string, err error) {
  329. return cassandra.GetEnv()
  330. }
  331. func NewCassClient(opt *Option) (*Client, error) {
  332. c, e := cassandra.NewClientByOption(cassandra.Option{
  333. Keyspace: opt.Keyspace,
  334. Addrs: opt.Addrs, // 默认通过 GetEnv() 获取
  335. User: opt.User, // 默认通过 GetEnv() 获取
  336. Pass: opt.Pass, // 默认通过 GetEnv() 获取
  337. DCPolicy: "", // 默认从ETCD获取
  338. Consistency: "", // 默认从ETCD获取
  339. DisableInitialHostLookup: opt.DisableInitialHostLookup})
  340. if e != nil {
  341. return nil, e
  342. }
  343. return &Client{
  344. Session: &gocql.Session{
  345. Session: c.Session,
  346. ClusterConfig: c.Config,
  347. Keyspace: opt.Keyspace,
  348. }}, nil
  349. }
  350. func NewCassSession(cassOption *Option) (*gocql.Session, error) {
  351. client, err := NewCassClient(cassOption)
  352. if err != nil {
  353. return nil, err
  354. }
  355. return client.Session, nil
  356. }
  357. func classInfo(from_session *gocql.Session, class string) ([]string, string, error) {
  358. cql := `select subclass, namespace from class where name= ?`
  359. if rows, err := from_session.SliceMap(cql, class); err != nil {
  360. return nil, "", err
  361. } else if len(rows) == 0 {
  362. return nil, "", errors.New(fmt.Sprintf("class %s not exists .", class))
  363. } else {
  364. subclass := []string{}
  365. namespace := ""
  366. if sub, ok := rows[0]["subclass"]; ok {
  367. subclass = sub.([]string)
  368. }
  369. if ns, ok := rows[0]["namespace"]; ok {
  370. namespace = ns.(string)
  371. }
  372. return subclass, namespace, nil
  373. }
  374. }
  375. func findSubClass(from_session *gocql.Session, class string) ([]string, error) {
  376. cql := `select name from class`
  377. if rows, err := from_session.SliceMap(cql); err != nil {
  378. return nil, err
  379. } else {
  380. clazz := []string{class[:len(class)-1]}
  381. for _, row := range rows {
  382. cls := row["name"].(string)
  383. if strings.HasPrefix(cls, class) {
  384. clazz = append(clazz, cls)
  385. }
  386. }
  387. return clazz, nil
  388. }
  389. }
  390. func main() {
  391. /*config := `
  392. {
  393. "From" : {
  394. "cluster" : ["47.92.151.165"],
  395. "keyspace": "matrix",
  396. "DC" : "dc1",
  397. "field" : "prom"
  398. },
  399. "To" : {
  400. "cluster" : ["47.92.151.165"],
  401. "keyspace": "matrix",
  402. "DC" : "dc1",
  403. "field" : "prom2"
  404. },
  405. "start": "2024-09-14 00:00:00",
  406. "end" : "2024-09-15 00:00:00" }
  407. `*/
  408. config_file := "config.json"
  409. if len(os.Args) >= 2 {
  410. config_file = os.Args[1]
  411. }
  412. var from_session, to_session, from_promdb_session, to_promdb_session *gocql.Session
  413. var err error
  414. conf := &SyncConfig{}
  415. if content, err := ioutil.ReadFile(config_file); err != nil {
  416. logger.Errorf("read config file : %s, error: %v", config_file, err)
  417. } else {
  418. if err := json.Unmarshal([]byte(content), conf); err != nil {
  419. logger.Errorf("config error: %v", err)
  420. } else {
  421. logger.Infof("From: %v", conf.From)
  422. logger.Infof("To: %v", conf.To)
  423. logger.Infof("Class: %v", conf.Class)
  424. logger.Infof("Date: %v", conf.Date)
  425. }
  426. }
  427. logger.SetConsole(true)
  428. from_session, err = NewCassSession(&Option{Addrs: conf.From.Cluster, Keyspace: conf.From.Keyspace, User: "cassandra", Pass: "9CxUjyNZ1QrztwigBeFjxA==", DisableInitialHostLookup: true})
  429. if err != nil {
  430. logger.Error(err.Error())
  431. return
  432. } else {
  433. defer from_session.Close()
  434. }
  435. to_session, err = NewCassSession(&Option{Addrs: conf.To.Cluster, Keyspace: conf.To.Keyspace, User: "cassandra", Pass: "9CxUjyNZ1QrztwigBeFjxA==", DisableInitialHostLookup: true})
  436. if err != nil {
  437. logger.Error(err.Error())
  438. return
  439. } else {
  440. defer to_session.Close()
  441. }
  442. cql := `select tid, option, slot, dict from promdb where field= ? limit 1 ALLOW FILTERING`
  443. var from_info, to_info *Info
  444. if srows, err := from_session.SliceMap(cql, conf.From.Field); err != nil {
  445. logger.Error(err.Error())
  446. return
  447. } else if trows, err := to_session.SliceMap(cql, conf.To.Field); err != nil {
  448. logger.Error(err.Error())
  449. return
  450. } else {
  451. if len(srows) == 0 {
  452. logger.Error(fmt.Sprintf("field %s not exist .", conf.From.Field))
  453. return
  454. }
  455. if len(trows) == 0 {
  456. logger.Error(fmt.Sprintf("field %s not exist .", conf.To.Field))
  457. return
  458. }
  459. from_option := srows[0]["option"].(int)
  460. from_keyspace := PromdbKeyspaceWithOption(from_option, conf.From.Keyspace, conf.From.DC)
  461. var self_dict bool
  462. if dict, ok := srows[0]["dict"]; ok {
  463. self_dict = dict.(bool)
  464. }
  465. from_info = &Info{Keyspace: from_keyspace, Option: from_option, Tid: srows[0]["tid"].(int64), Slot: srows[0]["slot"].(int), Dict: self_dict}
  466. to_option := trows[0]["option"].(int)
  467. to_keyspace := PromdbKeyspaceWithOption(to_option, conf.To.Keyspace, conf.To.DC)
  468. if dict, ok := trows[0]["dict"]; ok {
  469. self_dict = dict.(bool)
  470. }
  471. to_info = &Info{Keyspace: to_keyspace, Option: to_option, Tid: trows[0]["tid"].(int64), Slot: trows[0]["slot"].(int), Dict: self_dict}
  472. }
  473. from_promdb_session, err = NewCassSession(&Option{Addrs: conf.From.Cluster, Keyspace: from_info.Keyspace, User: "cassandra", Pass: "9CxUjyNZ1QrztwigBeFjxA==", DisableInitialHostLookup: true})
  474. if err != nil {
  475. logger.Error(err.Error())
  476. return
  477. } else {
  478. defer from_promdb_session.Close()
  479. }
  480. to_promdb_session, err = NewCassSession(&Option{Addrs: conf.To.Cluster, Keyspace: to_info.Keyspace, User: "cassandra", Pass: "9CxUjyNZ1QrztwigBeFjxA==", DisableInitialHostLookup: true})
  481. if err != nil {
  482. logger.Error(err.Error())
  483. return
  484. } else {
  485. defer to_promdb_session.Close()
  486. }
  487. clazz := strset.New()
  488. if len(conf.Class) > 0 {
  489. for _, class := range conf.Class {
  490. normal_class := true
  491. if strings.HasSuffix(class, "/") {
  492. normal_class = false
  493. if clss, err := findSubClass(from_session, class); err != nil {
  494. logger.Warnf(err.Error())
  495. continue
  496. } else {
  497. for _, c := range clss {
  498. clazz.Add(c)
  499. }
  500. }
  501. class = class[:len(class)-1]
  502. } else {
  503. clazz.Add(class)
  504. }
  505. // class exists ?
  506. subclass, namespace, errr := classInfo(from_session, class)
  507. if errr != nil {
  508. logger.Warnf(errr.Error())
  509. continue
  510. } else if len(subclass) > 0 {
  511. normal_class = false
  512. }
  513. table := "object"
  514. if namespace != "" {
  515. table = "object_" + namespace
  516. }
  517. var cql, param string
  518. if normal_class {
  519. cql = fmt.Sprintf("SELECT id FROM %s WHERE class = ?", table)
  520. param = class
  521. } else {
  522. idx := table + "_lucene"
  523. cql = fmt.Sprintf("SELECT id FROM %s WHERE expr(%s, ?)", table, idx)
  524. param = fmt.Sprintf(`{
  525. filter: {
  526. type: "boolean",
  527. should: [
  528. {type: "match", field: "class", value: "%s"},
  529. {type: "prefix", field: "class", value: "%s/"}
  530. ]
  531. }
  532. }`, class, class)
  533. }
  534. iter := from_session.Query(cql, param).PageSize(1000).Iter()
  535. defer iter.Close()
  536. var id string
  537. ids := []string{}
  538. for iter.Scan(&id) {
  539. ids = append(ids, id)
  540. if len(ids) >= 100 {
  541. if err := copyPromdb(from_info, to_info, from_promdb_session, to_promdb_session, ids, conf.Date); err != nil {
  542. logger.Error(err.Error())
  543. return
  544. }
  545. ids = []string{}
  546. }
  547. }
  548. if len(ids) > 0 {
  549. if err := copyPromdb(from_info, to_info, from_promdb_session, to_promdb_session, ids, conf.Date); err != nil {
  550. logger.Error(err.Error())
  551. return
  552. }
  553. }
  554. }
  555. } else {
  556. if err := copyPromdb(from_info, to_info, from_promdb_session, to_promdb_session, []string{}, conf.Date); err != nil {
  557. logger.Error(err.Error())
  558. return
  559. }
  560. }
  561. if conf.SyncDict {
  562. logger.Info("sync promdb dict .")
  563. var from_dict_cql string
  564. if from_info.Dict {
  565. from_dict_cql = fmt.Sprintf(`select name, class, ikey, labels from promdb_dict_%s where field='%s' allow filtering`, conf.From.Field, conf.From.Field)
  566. } else {
  567. from_dict_cql = fmt.Sprintf(`select name, class, ikey, labels from promdb_dict where field='%s' allow filtering`, conf.From.Field)
  568. }
  569. //logger.Error(from_dict_cql)
  570. iter2 := from_session.Query(from_dict_cql).PageSize(1000).Iter()
  571. defer iter2.Close()
  572. var name, class string
  573. var ikey int64
  574. var labels map[string]string
  575. var to_dict_cql string
  576. if from_info.Dict {
  577. to_dict_cql = fmt.Sprintf(`insert into promdb_dict_%s (field, name, class, ikey, labels) values (?, ?, ?, ?, ?)`, conf.To.Field)
  578. } else {
  579. to_dict_cql = `insert into promdb_dict (field, name, class, ikey, labels) values (?, ?, ?, ?, ?)`
  580. }
  581. count := 0
  582. for iter2.Scan(&name, &class, &ikey, &labels) {
  583. //logger.Error(to_dict_cql, conf.To.Field, name, class, ikey, labels)
  584. if clazz.Size() == 0 || clazz.Has(class) {
  585. count++
  586. if count%1000 == 0 {
  587. logger.Infof("%d", count)
  588. }
  589. //logger.Error(to_dict_cql, conf.To.Field, name, class, ikey, labels)
  590. if err = to_session.Exec(to_dict_cql, conf.To.Field, name, class, ikey, labels); err != nil {
  591. logger.Warn(err)
  592. }
  593. }
  594. }
  595. logger.Infof("total : %d", count)
  596. }
  597. }