Quellcode durchsuchen

fixbug: rc queue limit count

libf vor 4 Monaten
Ursprung
Commit
d28b682c1d
17 geänderte Dateien mit 2041 neuen und 124 gelöschten Zeilen
  1. 15 5
      go.mod
  2. 41 10
      go.sum
  3. 131 0
      importer/datainfo.go
  4. 5 1
      importer/importer.go
  5. 127 108
      importer/odbcimporter.go
  6. 10 0
      sqlite/0.cfglog.go
  7. 16 0
      sqlite/command.go
  8. 120 0
      sqlite/iter.go
  9. 13 0
      sqlite/schema/field.go
  10. 97 0
      sqlite/schema/tableinfo.go
  11. 45 0
      sqlite/schema/value.go
  12. 958 0
      sqlite/sqldb.go
  13. 57 0
      sqlite/sqltype.go
  14. 306 0
      sqlite/sqlvalue.go
  15. 7 0
      sqlite/stmt.go
  16. 14 0
      sqlite/tableinfo.go
  17. 79 0
      sqlite/timestamp.go

+ 15 - 5
go.mod

@@ -3,11 +3,12 @@ module git.wecise.com/wecise/cgimport
 go 1.20
 
 require (
-	git.wecise.com/wecise/odb-go v0.0.0-20250206115649-e441a00d8837
-	git.wecise.com/wecise/util v0.0.0-20250207094530-58e32065187c
+	git.wecise.com/wecise/odb-go v0.0.0-20250208123107-b502a8626316
+	git.wecise.com/wecise/util v0.0.0-20250211084319-911232a9300a
 	github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13
 	github.com/scylladb/go-set v1.0.3-0.20200225121959-cc7b2070d91e
 	github.com/spf13/cast v1.7.0
+	modernc.org/sqlite v1.30.1
 )
 
 require (
@@ -16,15 +17,18 @@ require (
 	github.com/coreos/go-semver v0.3.0 // indirect
 	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+	github.com/dustin/go-humanize v1.0.1 // indirect
 	github.com/fatih/color v1.18.0 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/gomodule/redigo v1.8.5 // indirect
-	github.com/google/uuid v1.3.1 // indirect
+	github.com/google/uuid v1.6.0 // indirect
+	github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
 	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
-	github.com/pkg/errors v0.9.1 // indirect
+	github.com/ncruces/go-strftime v0.1.9 // indirect
 	github.com/redis/go-redis/v9 v9.0.5 // indirect
+	github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
 	github.com/spacemonkeygo/errors v0.0.0-20201030155909-2f5f890dbc62 // indirect
 	github.com/tidwall/uhatools v0.4.1 // indirect
 	github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
@@ -35,7 +39,7 @@ require (
 	go.uber.org/atomic v1.7.0 // indirect
 	go.uber.org/multierr v1.6.0 // indirect
 	go.uber.org/zap v1.17.0 // indirect
-	golang.org/x/net v0.23.0 // indirect
+	golang.org/x/net v0.25.0 // indirect
 	golang.org/x/sys v0.28.0 // indirect
 	golang.org/x/text v0.21.0 // indirect
 	google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
@@ -45,4 +49,10 @@ require (
 	google.golang.org/protobuf v1.33.0 // indirect
 	gopkg.in/ini.v1 v1.67.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
+	modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect
+	modernc.org/libc v1.55.3 // indirect
+	modernc.org/mathutil v1.6.0 // indirect
+	modernc.org/memory v1.8.0 // indirect
+	modernc.org/strutil v1.2.0 // indirect
+	modernc.org/token v1.1.0 // indirect
 )

+ 41 - 10
go.sum

@@ -1,7 +1,7 @@
-git.wecise.com/wecise/odb-go v0.0.0-20250206115649-e441a00d8837 h1:drVImCwSwQkFL6fUIu30VzosQIFn4S0oMnDL0KRNi90=
-git.wecise.com/wecise/odb-go v0.0.0-20250206115649-e441a00d8837/go.mod h1:V8ZATNv1dapTbBKakWi2uhGTMvtShpIK1V8ks3mQGS0=
-git.wecise.com/wecise/util v0.0.0-20250207094530-58e32065187c h1:/vu0uQEnqKITft0ULqqgmZCRN42maUuzn3Y2mUH72hk=
-git.wecise.com/wecise/util v0.0.0-20250207094530-58e32065187c/go.mod h1:2YXWE9m5mNgAu40zpYrL3woGz6S8CoHAW/CJeWXaIko=
+git.wecise.com/wecise/odb-go v0.0.0-20250208123107-b502a8626316 h1:4ljPCr2MUA9w3HGRiS4dwYuqKJAbuLBoaho70kO8D8I=
+git.wecise.com/wecise/odb-go v0.0.0-20250208123107-b502a8626316/go.mod h1:0/+7FWRDsMND6k6fgW836IXTYEd1vzOdBuIDEA2FnX8=
+git.wecise.com/wecise/util v0.0.0-20250211084319-911232a9300a h1:411X8V9jkrpo8Klj7gS/LuauhSv0UxiNSs1vt94WvSQ=
+git.wecise.com/wecise/util v0.0.0-20250211084319-911232a9300a/go.mod h1:2YXWE9m5mNgAu40zpYrL3woGz6S8CoHAW/CJeWXaIko=
 github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
 github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
 github.com/bsm/ginkgo/v2 v2.7.0 h1:ItPMPH90RbmZJt5GtkcNvIRuGEdwlBItdNVoyzaNQao=
@@ -19,6 +19,8 @@ github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WA
 github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
+github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
+github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
 github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM=
 github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU=
 github.com/fatih/set v0.2.1 h1:nn2CaJyknWE/6txyUDGwysr3G5QC6xWB/PtVjPBbeaA=
@@ -33,8 +35,11 @@ github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUz
 github.com/gomodule/redigo v1.8.5 h1:nRAxCa+SVsyjSBrtZmG/cqb6VbTmuRzpg/PoTFlpumc=
 github.com/gomodule/redigo v1.8.5/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
-github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
-github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -44,13 +49,16 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
 github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
+github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4=
+github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
+github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
-github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/redis/go-redis/v9 v9.0.5 h1:CuQcn5HIEeK7BgElubPP8CGtE0KakrnbBSTLjathl5o=
 github.com/redis/go-redis/v9 v9.0.5/go.mod h1:WqMKv5vnQbRuZstUwxQI195wHy+t4PuXDOjzMvcuQHk=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
+github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
 github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
 github.com/scylladb/go-set v1.0.3-0.20200225121959-cc7b2070d91e h1:7q6NSFZDeGfvvtIRwBrU/aegEYJYmvev0cHAwo17zZQ=
 github.com/scylladb/go-set v1.0.3-0.20200225121959-cc7b2070d91e/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
@@ -88,15 +96,17 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
-golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
-golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
+golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
+golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -112,6 +122,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
+golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -137,3 +148,23 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C
 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ=
+modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y=
+modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE=
+modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw=
+modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI=
+modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4=
+modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U=
+modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w=
+modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4=
+modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo=
+modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E=
+modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU=
+modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
+modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
+modernc.org/sqlite v1.30.1 h1:YFhPVfu2iIgUf9kuA1CR7iiHdcEEsI2i+yjRYHscyxk=
+modernc.org/sqlite v1.30.1/go.mod h1:DUmsiWQDaAvU4abhc/N+djlom/L2o8f7gZ95RCvyoLU=
+modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
+modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
+modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
+modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=

+ 131 - 0
importer/datainfo.go

@@ -0,0 +1,131 @@
+package importer
+
+import (
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"git.wecise.com/wecise/cgimport/schema"
+	"git.wecise.com/wecise/util/cmap"
+	"git.wecise.com/wecise/util/merrs"
+)
+
+type classdatainfo struct {
+	*schema.ClassInfo
+	insertcount   int64
+	lastlogtime   time.Time
+	lastlogicount int64
+	mutex         sync.Mutex
+}
+
+var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
+
+// 根据数据修正类定义
+func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
+	for _, classname := range schema.ClassNames {
+		ci := schema.ClassInfos.GetIFPresent(classname)
+		if ci == nil {
+			return merrs.NewError("classinfo not found " + classname)
+		}
+		cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
+			if odbci.client != nil {
+				_, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
+				if e != nil {
+					if !strings.Contains(e.Error(), "not find") {
+						return nil, e
+					}
+					logger.Info("create class " + ci.Classfullname)
+					_, e = odbci.client.Query(ci.Createmql).Do()
+					if e != nil {
+						return nil, e
+					}
+					// add graph tags
+					_, e = odbci.client.Query(ci.Addtagmql, ci.Classaliasname, ci.Classaliasname, []string{ci.Classaliasname}).Do()
+					if e != nil {
+						return nil, e
+					}
+				}
+			}
+			cdi = &classdatainfo{ClassInfo: ci}
+			return
+		})
+		if e != nil {
+			return e
+		}
+		classdatainfos.Set(ci.Classfullname, cdi)
+	}
+	if odbci.client != nil {
+		for _, createedgemql := range schema.CreateEdgeMqls {
+			_, e := odbci.client.Query(createedgemql).Do()
+			if e != nil && !strings.Contains(e.Error(), "already exist") {
+				return e
+			}
+		}
+	}
+	return
+}
+
+func (odbci *ODBCImporter) reload() error {
+	if odbci.client != nil {
+		for i := len(schema.ClassNames) - 1; i >= 0; i-- {
+			classname := schema.ClassNames[i]
+			ci := schema.ClassInfos.GetIFPresent(classname)
+			if ci == nil {
+				continue
+			}
+			e := odbci.dropclass(ci.Classfullname)
+			if e != nil {
+				return e
+			}
+		}
+		e := odbci.InitLocalDB(true)
+		if e != nil {
+			return e
+		}
+	}
+	return nil
+}
+
+func (odbci *ODBCImporter) init() error {
+	e := odbci.InitLocalDB(false)
+	if e != nil {
+		return e
+	}
+	return nil
+}
+
+func (odbci *ODBCImporter) dropclass(classnames ...string) error {
+	for _, classname := range classnames {
+		for retry := 2; retry >= 0; retry-- {
+			_, e := odbci.client.Query(`delete from /matrix/tagdir where tags='` + classname + `'`).Do()
+			_ = e
+			_, e = odbci.client.Query(`delete from "` + classname + `" with version`).Do()
+			_ = e
+			_, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
+			if e != nil {
+				matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
+				if len(matchstr) >= 2 {
+					e = odbci.dropclass(matchstr[1])
+					if e != nil {
+						return e
+					}
+				} else {
+					matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
+					if len(matchstr) >= 2 {
+						e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
+						if e != nil {
+							return e
+						}
+					}
+				}
+				if retry > 0 {
+					continue
+				}
+				return e
+			}
+		}
+		logger.Info("drop class " + classname)
+	}
+	return nil
+}

+ 5 - 1
importer/importer.go

@@ -43,7 +43,7 @@ func ImportDir(datapath string, parallel int, reload bool) (totalfilescount, tot
 		reload:       reload,
 		importstatus: NewCGIStatus(),
 		fileimportrc: rc.NewRoutinesController("", parallel),
-		odbcqueryrc:  rc.NewRoutinesController("", parallel*10),
+		odbcqueryrc:  rc.NewRoutinesController("", mcfg.GetInt("odbc.concurrent.limit", parallel*5)),
 		odbcimporter: NewODBCImporter(),
 	}
 	return importer.Import()
@@ -55,6 +55,10 @@ func (importer *Importer) Import() (totalfilescount, totalrecordscount int64, to
 		if err != nil {
 			return
 		}
+		err = importer.odbcimporter.init()
+		if err != nil {
+			return
+		}
 	} else {
 		// reload
 		// 清除已有类

+ 127 - 108
importer/odbcimporter.go

@@ -3,15 +3,12 @@ package importer
 import (
 	"encoding/json"
 	"fmt"
-	"regexp"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"git.wecise.com/wecise/cgimport/graph"
 	"git.wecise.com/wecise/cgimport/odbc"
-	"git.wecise.com/wecise/cgimport/schema"
 	"git.wecise.com/wecise/odb-go/odb"
 	"git.wecise.com/wecise/util/cast"
 	"git.wecise.com/wecise/util/cmap"
@@ -20,71 +17,37 @@ import (
 	"github.com/scylladb/go-set/strset"
 )
 
-type classdatainfo struct {
-	*schema.ClassInfo
-	insertcount   int64
-	lastlogtime   time.Time
-	lastlogicount int64
-	mutex         sync.Mutex
-}
-
-var classdatainfos = cmap.NewSingle[string, *classdatainfo]()
-
 type ODBCImporter struct {
 	client odb.Client
+	// localdb *sqlite.SQLDB
 }
 
 func NewODBCImporter() *ODBCImporter {
 	odbci := &ODBCImporter{}
 	if odbc.DevPhase&(odbc.DP_CREATECLASS|odbc.DP_INSERTDATA) != 0 {
 		odbci.client = odbc.ODBC()
+		// var e error
+		// odbci.localdb, e = sqlite.NewSQLDB(odbc.Keyspace, "localdb", false)
+		// if e != nil {
+		// 	panic(e)
+		// }
 	}
 	return odbci
 }
 
-// 根据数据修正类定义
-func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
-	for _, classname := range schema.ClassNames {
-		ci := schema.ClassInfos.GetIFPresent(classname)
-		if ci == nil {
-			return merrs.NewError("classinfo not found " + classname)
-		}
-		cdi, e := classdatainfos.GetWithNew(ci.Classaliasname, func() (cdi *classdatainfo, err error) {
-			if odbci.client != nil {
-				_, e := odbci.client.Query("select class,id from " + ci.Classfullname + " limit 1").Do()
-				if e != nil {
-					if !strings.Contains(e.Error(), "not find") {
-						return nil, e
-					}
-					logger.Info("create class " + ci.Classfullname)
-					_, e = odbci.client.Query(ci.Createmql).Do()
-					if e != nil {
-						return nil, e
-					}
-					// add graph tags
-					_, e = odbci.client.Query(ci.Addtagmql, ci.Classaliasname, ci.Classaliasname, []string{ci.Classaliasname}).Do()
-					if e != nil {
-						return nil, e
-					}
-				}
-			}
-			cdi = &classdatainfo{ClassInfo: ci}
-			return
-		})
-		if e != nil {
-			return e
-		}
-		classdatainfos.Set(ci.Classfullname, cdi)
-	}
-	if odbci.client != nil {
-		for _, createedgemql := range schema.CreateEdgeMqls {
-			_, e := odbci.client.Query(createedgemql).Do()
-			if e != nil && !strings.Contains(e.Error(), "already exist") {
-				return e
-			}
-		}
-	}
-	return
+func (odbci *ODBCImporter) InitLocalDB(force bool) error {
+	// return odbci.localdb.InitTable(&schema.TableDefine{
+	// 	TableName: "localcache",
+	// 	Fields: schema.Fields{
+	// 		{Name: "key", Type: "TEXT"},
+	// 		{Name: "value", Type: "TEXT"},
+	// 	},
+	// 	Indexes: map[string][]string{
+	// 		"key": {"key"},
+	// 	},
+	// 	Ttl: 0,
+	// }, force)
+	return nil
 }
 
 // func (odbci *ODBCImporter) InsertEdge(data map[string]any) (err error) {
@@ -146,6 +109,7 @@ func (odbci *ODBCImporter) ReviseClassStruct() (err error) {
 // }
 
 var masterlevel1mutex = make([]sync.Mutex, 256)
+
 var masterdatas = cmap.New[string, map[string]any]()
 var level1datas = cmap.New[string, map[string]any]()
 
@@ -210,6 +174,113 @@ func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string,
 	return nil
 }
 
+// func (odbci *ODBCImporter) masterlevel1data(classaliasname string, suid string, data map[string]any) error {
+// 	key := strings.ReplaceAll("suid", "'", "''")
+// 	hidx := int(farm.Hash32([]byte(suid)) % uint32(len(masterlevel1mutex)))
+// 	masterlevel1mutex[hidx].Lock()
+// 	defer masterlevel1mutex[hidx].Unlock()
+// 	switch classaliasname {
+// 	case "master":
+// 		iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'")
+// 		if e != nil {
+// 			return e
+// 		}
+// 		maps, e := iter.AllMaps()
+// 		if e != nil {
+// 			return e
+// 		}
+// 		var level1data = map[string]any{}
+// 		if len(maps) == 0 {
+// 			bs_data, e := msgpack.Encode(data)
+// 			if e != nil {
+// 				return e
+// 			}
+// 			_, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false)
+// 			if e != nil {
+// 				return e
+// 			}
+// 			level1data = map[string]any{}
+// 			for k, v := range data {
+// 				if k == "id" {
+// 					oid, _, e := graph.GetNodeId("level1", data)
+// 					if e != nil {
+// 						return e
+// 					}
+// 					v = oid
+// 				}
+// 				level1data[k] = v
+// 			}
+// 		} else {
+// 			// 后插入 master
+// 			s_level1data := maps[0]["value"].(string)
+// 			e = msgpack.Decode([]byte(s_level1data), &level1data)
+// 			if e != nil {
+// 				return e
+// 			}
+// 			_, e = odbci.localdb.Delete(map[string]any{"key": key})
+// 			if e != nil {
+// 				return e
+// 			}
+// 			// 用 master 补齐 level1 数据
+// 			// data 数据不能变,需要后续插入 master
+// 			for k, v := range data {
+// 				if _, has := level1data[k]; !has {
+// 					level1data[k] = v
+// 				}
+// 			}
+// 		}
+// 		// 重新插入完整的 level1
+// 		e = odbci.insertData("level1", level1data)
+// 		if e != nil {
+// 			return e
+// 		}
+// 	case "level1":
+// 		iter, e := odbci.localdb.Select(nil, "select * from localcache where key='"+strings.ReplaceAll("suid", "'", "''")+"'")
+// 		if e != nil {
+// 			return e
+// 		}
+// 		maps, e := iter.AllMaps()
+// 		if e != nil {
+// 			return e
+// 		}
+// 		var masterdata = map[string]any{}
+// 		if len(maps) == 0 {
+// 			// 先插入 level 1
+// 			bs_data, e := msgpack.Encode(data)
+// 			if e != nil {
+// 				return e
+// 			}
+// 			_, e = odbci.localdb.Insert(map[string]any{"key": key, "value": string(bs_data)}, false)
+// 			if e != nil {
+// 				return e
+// 			}
+// 		} else {
+// 			// 后插入 level1
+// 			s_masterdata := maps[0]["value"].(string)
+// 			e = msgpack.Decode([]byte(s_masterdata), &masterdata)
+// 			if e != nil {
+// 				return e
+// 			}
+// 			_, e = odbci.localdb.Delete(map[string]any{"key": key})
+// 			if e != nil {
+// 				return e
+// 			}
+// 			// 用 level1 补齐 master 数据
+// 			for k, v := range data {
+// 				masterdata[k] = v
+// 			}
+// 			// 完整 level1 数据
+// 			data = masterdata
+// 		}
+// 		// 插入 level1 数据
+// 		e = odbci.insertData("level1", data)
+// 		if e != nil {
+// 			return e
+// 		}
+// 	}
+// 	return nil
+// }
+
 // 插入数据
 func (odbci *ODBCImporter) InsertData(classaliasname string, data map[string]any) (err error) {
 	oid, suid, e := graph.GetNodeId(classaliasname, data)
@@ -393,55 +464,3 @@ func (odbci *ODBCImporter) alldone() {
 		return true
 	})
 }
-
-func (odbci *ODBCImporter) reload() error {
-	if odbci.client != nil {
-		for i := len(schema.ClassNames) - 1; i >= 0; i-- {
-			classname := schema.ClassNames[i]
-			ci := schema.ClassInfos.GetIFPresent(classname)
-			if ci == nil {
-				continue
-			}
-			e := odbci.dropclass(ci.Classfullname)
-			if e != nil {
-				return e
-			}
-		}
-	}
-	return nil
-}
-
-func (odbci *ODBCImporter) dropclass(classnames ...string) error {
-	for _, classname := range classnames {
-		for retry := 2; retry >= 0; retry-- {
-			_, e := odbci.client.Query(`delete from /matrix/tagdir where tags='` + classname + `'`).Do()
-			_ = e
-			_, e = odbci.client.Query(`delete from "` + classname + `" with version`).Do()
-			_ = e
-			_, e = odbci.client.Query(`drop class if exists "` + classname + `"`).Do()
-			if e != nil {
-				matchstr := regexp.MustCompile(`refer by ([^,]+)`).FindStringSubmatch(e.Error())
-				if len(matchstr) >= 2 {
-					e = odbci.dropclass(matchstr[1])
-					if e != nil {
-						return e
-					}
-				} else {
-					matchstr := regexp.MustCompile(`has children \[([^\]]+)\]`).FindStringSubmatch(e.Error())
-					if len(matchstr) >= 2 {
-						e = odbci.dropclass(strings.Split(matchstr[1], ",")...)
-						if e != nil {
-							return e
-						}
-					}
-				}
-				if retry > 0 {
-					continue
-				}
-				return e
-			}
-		}
-		logger.Info("drop class " + classname)
-	}
-	return nil
-}

+ 10 - 0
sqlite/0.cfglog.go

@@ -0,0 +1,10 @@
+package sqlite
+
+import (
+	"git.wecise.com/wecise/cgimport/odbc"
+)
+
+var mcfg = odbc.Config
+
+// 与当前配置信息 cfg 匹配的日志输出
+var logger = odbc.Logger

+ 16 - 0
sqlite/command.go

@@ -0,0 +1,16 @@
+package sqlite
+
+type CommandType int
+
+const (
+	UCTInsert CommandType = iota + 1
+	UCTUpdate
+	UCTDelete
+)
+
+type Command struct {
+	CmdType         CommandType
+	Data            map[string]any
+	Conds           map[string]any
+	OverwriteExists bool
+}

+ 120 - 0
sqlite/iter.go

@@ -0,0 +1,120 @@
+package sqlite
+
+import (
+	"context"
+	"database/sql"
+	"reflect"
+
+	"git.wecise.com/wecise/util/merrs"
+)
+
+type Row struct {
+	data map[string]interface{}
+}
+
+func (rr *Row) Data() map[string]interface{} {
+	return rr.data
+}
+
+// 迭代器
+type Iter struct {
+	ctx  context.Context
+	rows *sql.Rows
+	cols []*sql.ColumnType
+}
+
+// 获取下一数据
+func (iter *Iter) NextRow() (row *Row, err error) {
+	if iter.ctx != nil {
+		err = iter.ctx.Err()
+		if err != nil {
+			return
+		}
+	}
+	if !iter.rows.Next() {
+		iter.Close()
+		return
+	}
+	ftypes := make([]string, len(iter.cols))
+	values := make([]interface{}, len(iter.cols))
+	for i := range values {
+		coltype := iter.cols[i]
+		scantype := coltype.ScanType()
+		ftypes[i] = coltype.DatabaseTypeName()
+		if scantype == nil {
+			bs := []byte{}
+			values[i] = &bs
+		} else {
+			v := reflect.New(scantype).Interface()
+			values[i] = &v
+		}
+	}
+	if err = iter.rows.Scan(values...); err != nil {
+		err = merrs.NormalError.NewCause(err)
+		iter.Close()
+		return
+	}
+	m := make(map[string]interface{})
+	for i, v := range values {
+		fieldname := iter.cols[i].Name()
+		ftype := ftypes[i]
+		if iter.cols[i].ScanType() == nil {
+			v = nil
+		} else {
+			v = reflect.ValueOf(v).Elem().Interface()
+			v = SQLValueDecode(ftype, v)
+		}
+		m[fieldname] = v
+	}
+	row = &Row{m}
+	return
+}
+
+// 关闭迭代器
+func (iter *Iter) Close() (err error) {
+	return iter.rows.Close()
+}
+
+// 一次获取多行数据
+func (iter *Iter) NextRows(n int) (rows []*Row, err error) {
+	rows = []*Row{}
+	for {
+		rd, e := iter.NextRow()
+		if e != nil {
+			err = merrs.NewError(e)
+			return
+		}
+		if rd == nil || n > 0 && len(rows) >= n {
+			return
+		}
+		rows = append(rows, rd)
+	}
+}
+
+// 一次获取多行map形式的数据
+func (iter *Iter) NextMaps(n int) (rds []map[string]interface{}, err error) {
+	rds = []map[string]interface{}{}
+	for {
+		rd, e := iter.NextRow()
+		if e != nil {
+			err = merrs.NormalError.NewCause(e)
+			return
+		}
+		if rd == nil || n > 0 && len(rds) >= n {
+			return
+		}
+		rds = append(rds, rd.Data())
+	}
+}
+
+// 一次获取所有数据
+func (iter *Iter) AllRows() (rows []*Row, err error) {
+	defer iter.Close()
+	return iter.NextRows(-1)
+}
+
+// 一次获取所有map形式的数据
+func (iter *Iter) AllMaps() (rds []map[string]interface{}, err error) {
+	defer iter.Close()
+	return iter.NextMaps(-1)
+}

+ 13 - 0
sqlite/schema/field.go

@@ -0,0 +1,13 @@
+package schema
+
+type Field struct {
+	Name     string            //字段名
+	Type     string            //字段类型
+	Defval   any               // 默认值
+	Partkeyi int               // 0 普通字段,n 在分区键中的位置
+	Idkeyi   int               // 0 普通字段,n 在主键中的位置
+	Extra    map[string]string // 附加信息,包括显示名,别名等
+	Tags     []string          // 标记信息
+}
+
+type Fields []*Field

+ 97 - 0
sqlite/schema/tableinfo.go

@@ -0,0 +1,97 @@
+package schema
+
+import (
+	"fmt"
+	"time"
+)
+
+// 对应 sqlite 中存储的 tableinfo 内部结构
+const TableInfoVersion = "1.0"
+
+// 表定义信息
+type TableDefine struct {
+	TableName string
+	Fields    Fields
+	Indexes   map[string][]string
+	Ttl       time.Duration
+}
+
+// 表结构信息,对表定义信息的扩展
+type TableInfo struct {
+	*TableDefine
+	MapNameFields map[string]*Field
+	FieldNames    []string
+	FieldTypes    []string
+	DefValues     []any
+	PartKeys      []string
+	IDKeys        []string
+	LatField      string
+	TableIndexes  map[string][]string
+	Version       string
+}
+
+func (me *TableDefine) TableInfo() (tableinfo *TableInfo, createfieldtypes []string) {
+	fields := me.Fields
+	tableinfo = &TableInfo{
+		TableDefine:   me,
+		MapNameFields: map[string]*Field{},
+		FieldNames:    []string{},
+		FieldTypes:    []string{},
+		DefValues:     []any{},
+		PartKeys:      []string{},
+		IDKeys:        []string{},
+		TableIndexes:  map[string][]string{},
+		LatField:      "",
+		Version:       TableInfoVersion,
+	}
+	for k, v := range me.Indexes {
+		// 用户定义索引,i_ 开头
+		tableinfo.TableIndexes["i_"+k] = v
+	}
+	for _, field := range fields {
+		tableinfo.MapNameFields[field.Name] = field
+		if me.Indexes == nil {
+			// 未指定索引,自动生成全字段索引,i_ 开头
+			// 指定空map,不建索引
+			tableinfo.TableIndexes["i_"+me.TableName+"_"+field.Name] = []string{field.Name}
+		}
+	}
+	for i := 0; ; i++ {
+		tableinfo.LatField = fmt.Sprint("_last_access_time_", i)
+		if tableinfo.MapNameFields[tableinfo.LatField] == nil {
+			last_access_time_field := &Field{
+				Name:   tableinfo.LatField,
+				Type:   "int",
+				Defval: time.Now().UnixNano(),
+			}
+			tableinfo.MapNameFields[tableinfo.LatField] = last_access_time_field
+			fields = append(fields, last_access_time_field)
+			// 内置索引,ii_ 开头,区别于用户定义索引
+			tableinfo.TableIndexes["ii_"+me.TableName+"_"+tableinfo.LatField] = []string{tableinfo.LatField}
+			break
+		}
+	}
+	for _, field := range fields {
+		tableinfo.FieldNames = append(tableinfo.FieldNames, field.Name)
+		tableinfo.FieldTypes = append(tableinfo.FieldTypes, field.Type)
+		createfieldtypes = append(createfieldtypes, `"`+field.Name+`" `+field.Type)
+		if field.Defval == nil {
+			tableinfo.DefValues = append(tableinfo.DefValues, DefaultValue(field.Type))
+		} else {
+			tableinfo.DefValues = append(tableinfo.DefValues, field.Defval)
+		}
+		if field.Partkeyi != 0 {
+			for field.Partkeyi > len(tableinfo.PartKeys) {
+				tableinfo.PartKeys = append(tableinfo.PartKeys, "")
+			}
+			tableinfo.PartKeys[field.Partkeyi-1] = field.Name
+		}
+		if field.Idkeyi != 0 {
+			for field.Idkeyi > len(tableinfo.IDKeys) {
+				tableinfo.IDKeys = append(tableinfo.IDKeys, "")
+			}
+			tableinfo.IDKeys[field.Idkeyi-1] = field.Name
+		}
+	}
+	return
+}

+ 45 - 0
sqlite/schema/value.go

@@ -0,0 +1,45 @@
+package schema
+
+import "time"
+
+func DefaultValue(ftype string) any {
+	switch ftype {
+	case "text":
+		return ""
+	case "varchar":
+		return ""
+	case "string":
+		return ""
+	case "bool":
+		return false
+	case "boolean":
+		return false
+	case "int":
+		return 0
+	case "smallint":
+		return int16(0)
+	case "enum":
+		return int16(0)
+	case "bigint":
+		return int64(0)
+	case "float":
+		return float32(0)
+	case "double":
+		return float64(0)
+	case "date":
+		return "19700101"
+	case "timestamp", "milliseconds":
+		return time.Time{}.UnixNano() / 1e6
+	case "map":
+		return []byte{}
+	case "set":
+		return []byte{}
+	case "list":
+		return []byte{}
+	case "bucket":
+		return []byte{}
+	case "relation":
+		return []byte{}
+	}
+	return nil
+}

+ 958 - 0
sqlite/sqldb.go

@@ -0,0 +1,958 @@
+package sqlite
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"os"
+	"path/filepath"
+	"regexp"
+	"strings"
+	"sync"
+	"time"
+
+	"git.wecise.com/wecise/cgimport/sqlite/schema"
+	"git.wecise.com/wecise/util/cmap"
+	"git.wecise.com/wecise/util/merrs"
+	"git.wecise.com/wecise/util/msgpack"
+	"git.wecise.com/wecise/util/rc"
+	"github.com/spf13/cast"
+	"modernc.org/sqlite"
+	sqlite3 "modernc.org/sqlite/lib"
+)
+
+type Status int
+
+const (
+	Uninited Status = iota // 未初始化,不可读,不可写
+	Initing                // 正在初始化,不可读,不可写
+	Inited                 // 初始化完成,可读,可写
+	Closed                 // 已关闭,不可读,不可写
+)
+
+func (status Status) String() string {
+	switch status {
+	case Uninited:
+		return "Uninited"
+	case Initing:
+		return "Initing"
+	case Inited:
+		return "Inited"
+	case Closed:
+		return "Closed"
+	}
+	return "unkown"
+}
+
+var ReNonWordChars = regexp.MustCompile(`\W`)
+
+// 一个 SQLDB
+// 对应一个读写表
+// 或
+// 对应一个视图,关联多个只读表
+type SQLDB struct {
+	db           *sql.DB
+	keyspace     string
+	dbname       string
+	dbsourcename string
+	//
+	bufferedCommandsMutex sync.Mutex
+	bufferedCommands      []*Command
+	chstatus              chan Status
+	chiniting             chan error
+	// table info
+	infomutex  sync.RWMutex
+	tableinfo  *schema.TableInfo
+	insertStmt *Stmt
+	upsertStmt *Stmt
+	datainfo   *DataInfo
+	statinfo   *StatInfo
+	//
+	reorg_proc ReorgProc
+	reorg_rc   *rc.RoutinesController
+}
+
+type ReorgProc struct {
+	sync.Mutex
+	statinfo_update func()
+	datainfo_update func()
+	expire_clear    func()
+}
+
+var dbs = cmap.New[string, *SQLDB]()
+
+func NewSQLDB(keyspace string, dbname string, inmemory bool) (sdb *SQLDB, err error) {
+	dbkey := fmt.Sprint("K:", keyspace, "N:", dbname, "M:", inmemory)
+	sdb, err = dbs.GetWithNew(dbkey, func() (*SQLDB, error) {
+		return openSQLDB(keyspace, dbname, inmemory, false)
+	})
+	if err != nil {
+		return
+	}
+	if sdb == nil {
+		err = merrs.NewError("NewSQLDB " + dbkey + " error")
+		return
+	}
+	return sdb, nil
+}
+
+var DBPath = "/opt/matrix/var/cgimport/"
+
+func openSQLDB(keyspace string, dbname string, inmemory bool, autorebuilding bool) (sqldb *SQLDB, err error) {
+	var status Status = Uninited
+	dbsourcename := "file:"
+	if inmemory {
+		dbsourcename += dbname + ":memory:?mode=memory&cache=shared"
+	} else {
+		dbfname := filepath.Join(DBPath, keyspace, dbname)
+		e := os.MkdirAll(filepath.Dir(dbfname), os.ModePerm)
+		if e != nil {
+			err = merrs.NewError(e)
+			return nil, err
+		}
+		fi, e := os.Stat(dbfname)
+		if e != nil && !os.IsNotExist(e) {
+			err = merrs.NewError(e)
+			return nil, err
+		}
+		if fi != nil {
+			if autorebuilding {
+				e := os.Remove(dbfname)
+				if e != nil {
+					err = merrs.NewError(e)
+					return nil, err
+				}
+				status = Uninited
+			} else {
+				status = Inited
+			}
+		}
+		dbsourcename += dbfname + "?cache=shared"
+	}
+	db, e := sql.Open("sqlite", dbsourcename)
+	if e != nil {
+		err = merrs.NewError(e)
+		return nil, err
+	}
+	if !inmemory {
+		_, e = db.Exec("PRAGMA temp_store=memory") // 0=1=file 2=memory
+		if e != nil {
+			err = merrs.NewError(e)
+			return nil, err
+		}
+		_, e = db.Exec("PRAGMA journal_mode=memory") // DELETE TRUNCATE PERSIST MEMORY OFF
+		if e != nil {
+			err = merrs.NewError(e)
+			return nil, err
+		}
+		_, e = db.Exec("PRAGMA synchronous=off") // 0=off 1=normal 2=full
+		if e != nil {
+			err = merrs.NewError(e)
+			return nil, err
+		}
+		// _, e = db.Exec("PRAGMA auto_vacuum=none") // 0=none 1=full
+		// if e != nil {
+		// 	err = merrs.NewError(e)
+		// 	return nil
+		// }
+	}
+	// _, e = db.Exec("PRAGMA count_changes=1")
+	// if e != nil {
+	// 	err = merrs.NewError(e)
+	// 	return nil
+	// }
+	_, e = db.Exec("PRAGMA case_sensitive_like=1")
+	if e != nil {
+		err = merrs.NewError(e)
+		return nil, err
+	}
+	sdb := &SQLDB{
+		db:               db,
+		keyspace:         keyspace,
+		dbname:           dbname,
+		dbsourcename:     dbsourcename,
+		bufferedCommands: []*Command{},
+		chstatus:         make(chan Status, 1),
+		chiniting:        make(chan error, 1),
+		reorg_rc:         rc.NewRoutinesController("", 1),
+	}
+	sdb.chiniting <- nil
+	if status == Inited {
+		e := sdb.loadTableInfo()
+		if e != nil {
+			if !merrs.UninitedError.Contains(e) {
+				err = merrs.NewError(e)
+				return nil, err
+			}
+			if !autorebuilding {
+				sdb.db.Close()
+				return openSQLDB(keyspace, dbname, inmemory, true)
+			}
+			status = Uninited
+		}
+	}
+	sdb.chstatus <- status
+	return sdb, nil
+}
+
+func (me *SQLDB) DBName() string {
+	return me.dbname
+}
+
+func (me *SQLDB) SourceName() string {
+	return me.dbsourcename
+}
+
+func (me *SQLDB) Reset() {
+	status := <-me.chstatus
+	if status == Initing {
+		e := <-me.chiniting // 等待初始化完成
+		me.chiniting <- e
+	}
+	me.chstatus <- Uninited
+}
+
+func (me *SQLDB) Close() error {
+	status := <-me.chstatus
+	if status == Initing {
+		e := <-me.chiniting // 等待初始化完成
+		me.chiniting <- e
+	}
+	me.chstatus <- Closed
+	if status == Closed {
+		return nil
+	}
+	return me.db.Close()
+}
+
+func (me *SQLDB) Status() Status {
+	select {
+	case status := <-me.chstatus:
+		me.chstatus <- status
+		return status
+	}
+}
+
+func (me *SQLDB) SetMaxAttached() (err error) {
+	// set SQLITE_MAX_ATTACHED = 125
+	conn, e := me.db.Conn(context.Background())
+	if e != nil {
+		return merrs.NormalError.NewCause(e)
+	}
+	defer conn.Close()
+	sqlite.Limit(conn, sqlite3.SQLITE_MAX_ATTACHED, 125)
+	return nil
+}
+
+func (me *SQLDB) AttacheDB(clsdb *SQLDB, dbname string) error {
+	dbsourcename := clsdb.SourceName()
+	sql := "ATTACH DATABASE '" + dbsourcename + "' as " + dbname
+	logger.Trace(sql)
+	_, e := me.exec(sql)
+	return e
+}
+
+func (me *SQLDB) DBList() (dbs map[string]string, err error) {
+	sql := "PRAGMA database_list"
+	rows, e := me.queryMaps(sql)
+	if e != nil {
+		return nil, merrs.NormalError.NewWith("", []error{e}, merrs.SSMaps{{"sql": sql}}, 1)
+	}
+	dbs = map[string]string{}
+	for _, m := range rows {
+		dbs[cast.ToString(m["name"])] = cast.ToString(m["file"])
+	}
+	return
+}
+
+func (me *SQLDB) InitTable(tabledefine *schema.TableDefine, force bool) (err error) {
+	me.infomutex.Lock()
+	defer me.infomutex.Unlock()
+	status := <-me.chstatus
+	if status == Inited && !force {
+		// 初始化已完成,直接返回
+		me.chstatus <- status
+		return nil
+	}
+	if status == Initing {
+		me.chstatus <- status
+		e := <-me.chiniting // 等待初始化完成
+		me.chiniting <- e
+		return e
+	}
+	<-me.chiniting
+	me.chstatus <- Initing
+	// 操作计数
+	// PushCount(time.Now(), 1, me.keyspace, "renew", tabledefine.TableName)
+	e := me.renewTable(tabledefine)
+	if e != nil {
+		err = e
+		me.chiniting <- err
+		return
+	}
+	logger.Debug("Init LocalDB", me.dbsourcename)
+	// 执行初始化期间产生的更新操作
+	_, e = me.execBufferedCommands()
+	if e != nil {
+		err = merrs.NewError(e)
+		me.chiniting <- err
+		return
+	}
+	me.chiniting <- nil
+	status = <-me.chstatus
+	me.chstatus <- Inited
+	// 执行新产生的更新操作
+	_, e = me.execBufferedCommands()
+	if e != nil {
+		err = merrs.NewError(e)
+		return
+	}
+	return nil
+}
+
+func (me *SQLDB) loadTableInfo() (err error) {
+	rows, e := me.queryMaps("SELECT name FROM sqlite_master WHERE type='table'")
+	if e != nil {
+		return e
+	}
+	if len(rows) < 2 {
+		return merrs.UninitedError.New("uninited")
+	}
+
+	// 通过 sqlite_master 和 PRAGMA table_info 获得的信息有限
+	// 通过自定义的 schema 表,可以获得自己保存的信息
+	rows, e = me.queryMaps("SELECT key,value FROM __table_info__")
+	if e != nil {
+		return e
+	}
+	if len(rows) < 3 {
+		return merrs.UninitedError.New("uninited")
+	}
+	mrows := map[string]any{}
+	for _, row := range rows {
+		mrows[row["key"].(string)] = row["value"]
+	}
+	// tableinfo
+	v_tableinfo := mrows["tableinfo"]
+	if v_tableinfo == nil {
+		return merrs.UninitedError.New("not found tableinfo")
+	}
+	s_tableinfo, ok := v_tableinfo.(string)
+	if !ok {
+		return merrs.UninitedError.New("tableinfo type inconsistent") // 类型不一致
+	}
+	load_tableinfo := &schema.TableInfo{}
+	e = msgpack.Decode([]byte(s_tableinfo), load_tableinfo)
+	if e != nil {
+		err = merrs.NewError(e, merrs.SSMap{"tablename": load_tableinfo.TableName})
+		return
+	}
+	if load_tableinfo.Version != schema.TableInfoVersion {
+		return merrs.UninitedError.New("uninited load_tableinfo.version=" + load_tableinfo.Version)
+	}
+	me.tableinfo = load_tableinfo
+	// datainfo
+	v_datainfo := mrows["datainfo"]
+	if v_datainfo == nil {
+		return merrs.UninitedError.New("not found datainfo")
+	}
+	s_datainfo, ok := v_datainfo.(string)
+	if !ok {
+		return merrs.UninitedError.New("datainfo type inconsistent") // 类型不一致
+	}
+	me.datainfo = &DataInfo{}
+	msgpack.Decode([]byte(s_datainfo), me.datainfo)
+	// statinfo
+	v_statinfo := mrows["statinfo"]
+	if v_statinfo == nil {
+		return merrs.UninitedError.New("not found statinfo")
+	}
+	s_statinfo, ok := v_statinfo.(string)
+	if !ok {
+		return merrs.UninitedError.New("statinfo type inconsistent") // 类型不一致
+	}
+	me.statinfo = &StatInfo{}
+	msgpack.Decode([]byte(s_statinfo), me.statinfo)
+	if me.statinfo.CreateTime.Equal(time.Time{}) {
+		return merrs.UninitedError.New("uninited")
+	}
+	//
+	insertstmt, upsertstmt, e := me.initInsertStmt(false)
+	if e != nil {
+		err = merrs.NewError(e, merrs.SSMap{"tablename": me.tableinfo.TableName})
+		return
+	}
+	me.insertStmt = insertstmt
+	me.upsertStmt = upsertstmt
+	logger.Debug("Load LocalDB", me.dbsourcename)
+	return
+}
+
+func (me *SQLDB) renewTable(tabledefine *schema.TableDefine) (err error) {
+	tableinfo, createfieldtypes := tabledefine.TableInfo()
+	if me.tableinfo != nil {
+		// 简化结构,一个 DB 里只保留一个表或视图,可通过视图关联多个表
+		err = me.dropTable(me.tableinfo.TableName)
+		if err != nil {
+			return
+		}
+	}
+	e := me.newTable(tableinfo, createfieldtypes)
+	if e != nil {
+		err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
+		return
+	}
+	me.tableinfo = tableinfo
+	insertstmt, upsertstmt, e := me.initInsertStmt(true)
+	if e != nil {
+		err = merrs.NewError(e, merrs.SSMap{"tablename": tableinfo.TableName})
+		return
+	}
+	me.insertStmt = insertstmt
+	me.upsertStmt = upsertstmt
+	return
+}
+
+func (me *SQLDB) dropTable(tablename string) error {
+	dropSql := fmt.Sprintf(`DROP TABLE IF EXISTS %s`, tablename)
+	// logger.Tracef("Drop table %s sql: %s", tablename, dropSql)
+	_, e := me.exec(dropSql)
+	if e != nil {
+		return merrs.NewError(e, merrs.SSMap{"tablename": tablename})
+	}
+	return nil
+}
+
+func (me *SQLDB) newTable(tableinfo *schema.TableInfo,
+	createfieldtypes []string,
+) (err error) {
+	tablename := tableinfo.TableName
+	colums := strings.Join(createfieldtypes, `,`)
+	if len(tableinfo.IDKeys) > 0 {
+		colums = colums + `,` + `PRIMARY KEY("` + strings.Join(tableinfo.IDKeys, `","`) + `")`
+	}
+	createSql := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (%s)`, tablename, colums)
+	// logger.Tracef("Create table %s sql: %s", tablename, createSql)
+	if _, e := me.exec(createSql); e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	for idxName, idxfields := range tableinfo.Indexes {
+		var indexSql string
+		if len(idxfields) > 1 {
+			indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, strings.Join(idxfields, `", "`))
+		} else {
+			indexSql = fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s on %s ("%s")`, idxName, tablename, idxfields[0])
+		}
+		// logger.Tracef("Create table %s index sql: %s", tablename, indexSql)
+		if _, e := me.exec(indexSql); e != nil {
+			err = merrs.NewError(fmt.Sprintf("sqlite create table index error"), e)
+			return
+		}
+	}
+	if _, e := me.exec(`CREATE TABLE IF NOT EXISTS __table_info__ (key TEXT PRIMARY KEY, value TEXT)`); e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	if _, e := me.exec(`DELETE FROM __table_info__`); e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	tableinfobs, e := msgpack.Encode(tableinfo)
+	if e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "tableinfo", string(tableinfobs)); e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	me.datainfo = &DataInfo{}
+	datainfobs, e := msgpack.Encode(me.datainfo)
+	if e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "datainfo", string(datainfobs)); e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	me.statinfo = &StatInfo{
+		CreateTime: time.Now(),
+	}
+	statinfobs, e := msgpack.Encode(me.statinfo)
+	if e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	if _, e := me.exec(`INSERT INTO __table_info__ (key, value) values (?,?)`, "statinfo", string(statinfobs)); e != nil {
+		err = merrs.NewError(fmt.Sprintf("sqlite create table %s error", tablename), e)
+		return
+	}
+	return nil
+}
+
+func (me *SQLDB) initInsertStmt(initvaluetypes bool) (insertstmt, upsertstmt *Stmt, err error) {
+	qmarks := []string{}
+	for range me.tableinfo.FieldNames {
+		qmarks = append(qmarks, "?")
+	}
+	upsertSql := fmt.Sprintf(`BEGIN TRANSACTION;
+DELETE FROM "`+me.tableinfo.TableName+`" WHERE "%s"=?;
+INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s);
+COMMIT;`,
+		strings.Join(me.tableinfo.IDKeys, `"=? and "`), strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
+	upsertstmt, err = me.prepare(upsertSql)
+	if err != nil {
+		return
+	}
+	insertSql := fmt.Sprintf(`INSERT INTO "`+me.tableinfo.TableName+`" ("%s") VALUES (%s)`,
+		strings.Join(me.tableinfo.FieldNames, `","`), strings.Join(qmarks, ","))
+	insertstmt, err = me.prepare(insertSql)
+	if err != nil {
+		return
+	}
+	if initvaluetypes {
+		if len(me.tableinfo.DefValues) != len(me.tableinfo.FieldNames) {
+			err = merrs.NewError(fmt.Sprintf("sqlite prepare insert table %s error", me.tableinfo.TableName))
+			return
+		}
+		params := []any{}
+		for _, k := range me.tableinfo.IDKeys {
+			for i, f := range me.tableinfo.FieldNames {
+				if f == k {
+					params = append(params, me.tableinfo.DefValues[i])
+				}
+			}
+		}
+		params = append(params, me.tableinfo.DefValues...)
+		// 首次插入数据的类型决定该字段的实际存储类型
+		_, err = insertstmt.Exec(params...)
+		if err != nil {
+			return
+		}
+		_, err = me.delete(nil)
+		if err != nil {
+			return
+		}
+	}
+	return
+}
+
+func (me *SQLDB) setLastQueryTime() {
+	me.statinfo.LastQueryTime = time.Now()
+	me.reorg_proc.Lock()
+	defer me.reorg_proc.Unlock()
+	me.reorg_proc.statinfo_update = me.updateStatInfo
+	me.reorg()
+}
+
+func (me *SQLDB) setLastUpdateTime() {
+	me.statinfo.LastUpdateTime = time.Now()
+	me.reorg_proc.Lock()
+	defer me.reorg_proc.Unlock()
+	me.reorg_proc.statinfo_update = me.updateStatInfo
+	me.reorg()
+}
+
+func (me *SQLDB) updateStatInfo() {
+	statinfobs, e := msgpack.Encode(me.statinfo)
+	if e != nil {
+		logger.Error("UpdateStatInfo Error", e)
+	}
+	sql := "UPDATE __table_info__ SET value=? WHERE key=?"
+	_, e = me.exec(sql, "statinfo", string(statinfobs))
+	if e != nil {
+		logger.Error("UpdateStatInfo Error", e)
+	}
+}
+
+func (me *SQLDB) clearExpireData() {
+	// Format("2006-01-02 15:04:05")
+	deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE `+me.tableinfo.LatField+`<?`, me.tableinfo.TableName)
+	n, e := me.exec(deleteSql, time.Now().Add(-me.tableinfo.Ttl).UnixMilli())
+	if e != nil {
+		logger.Error("ClearExpireData Error", e)
+	}
+	logger.Debug("ClearExpireData", n)
+}
+
+func (me *SQLDB) ClearExpireData() {
+	if me.tableinfo.Ttl > 0 {
+		me.reorg_proc.Lock()
+		defer me.reorg_proc.Unlock()
+		me.reorg_proc.expire_clear = me.clearExpireData
+		me.reorg()
+	}
+}
+
+func (me *SQLDB) reorg() {
+	me.reorg_rc.CallLast2Only(func() {
+		me.reorg_proc.Lock()
+		datainfo_update := me.reorg_proc.datainfo_update
+		statinfo_update := me.reorg_proc.statinfo_update
+		expire_clear := me.reorg_proc.expire_clear
+		me.reorg_proc.datainfo_update = nil
+		me.reorg_proc.statinfo_update = nil
+		me.reorg_proc.expire_clear = nil
+		me.reorg_proc.Unlock()
+		if datainfo_update != nil {
+			datainfo_update()
+		}
+		if statinfo_update != nil {
+			statinfo_update()
+		}
+		if expire_clear != nil {
+			expire_clear()
+		}
+		time.Sleep(1 * time.Second)
+	})
+}
+
+func (me *SQLDB) execBufferedCommands() (count int, err error) {
+
+	for {
+		var uc *Command
+		me.bufferedCommandsMutex.Lock()
+		if len(me.bufferedCommands) > 0 {
+			uc = me.bufferedCommands[0]
+			me.bufferedCommands = me.bufferedCommands[1:]
+		}
+		me.bufferedCommandsMutex.Unlock()
+		if uc == nil {
+			return
+		}
+		n, e := me.execCommand(uc)
+		if e != nil {
+			err = e
+			if me.Status() != Inited {
+				// 重新初始化
+				me.bufferedCommandsMutex.Lock()
+				me.bufferedCommands = append(me.bufferedCommands, uc)
+				me.bufferedCommandsMutex.Unlock()
+				// err = nil
+				err = merrs.NewError(merrs.SSMaps{
+					{
+						"me.Status()": fmt.Sprint(me.Status()),
+					}}, e)
+				logger.Error(err)
+				err = nil
+			}
+			return
+		}
+		count += n
+	}
+}
+
+func (me *SQLDB) pushCommand(uc *Command) (n int, err error) {
+	select {
+	case status := <-me.chstatus:
+		me.chstatus <- status
+		switch status {
+		case Uninited, Initing:
+			me.bufferedCommandsMutex.Lock()
+			me.bufferedCommands = append(me.bufferedCommands, uc)
+			me.bufferedCommandsMutex.Unlock()
+			return 1, nil
+		case Inited:
+			me.bufferedCommandsMutex.Lock()
+			me.bufferedCommands = append(me.bufferedCommands, uc)
+			me.bufferedCommandsMutex.Unlock()
+			n, err = me.execBufferedCommands()
+			return
+		case Closed:
+			return
+		}
+	}
+	return
+}
+
+func (me *SQLDB) execCommand(uc *Command) (n int, err error) {
+	switch uc.CmdType {
+	case UCTInsert:
+		return me.insert(uc.Data, uc.OverwriteExists)
+	case UCTUpdate:
+		return me.update(uc.Data, uc.Conds)
+	case UCTDelete:
+		return me.delete(uc.Conds)
+	}
+	panic(merrs.NewError(fmt.Sprint("unsupport UpdateCommandType ", uc.CmdType), nil))
+}
+
+func (me *SQLDB) Prepare(sql string) (*Stmt, error) {
+	return me.prepare(sql)
+}
+
+func (me *SQLDB) prepare(sql string) (*Stmt, error) {
+	stmt, e := me.db.Prepare(sql)
+	if e != nil {
+		return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}})
+	}
+	return &Stmt{Stmt: stmt}, nil
+}
+
+func (me *SQLDB) exec(sql string, args ...any) (n int, err error) {
+	rslt, e := me.db.Exec(sql, args...)
+	if e != nil {
+		return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
+	}
+	if rslt == nil {
+		return 0, merrs.NewError("no result", merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
+	}
+	rn, e := rslt.RowsAffected()
+	if e != nil {
+		return 0, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
+	}
+	n = int(rn)
+	return
+}
+
+func (me *SQLDB) Select(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
+	var status Status
+	select {
+	case status = <-me.chstatus:
+		me.chstatus <- status
+		if status == Closed {
+			return nil, merrs.ClosedError.New("closed")
+		}
+		if status == Uninited || status == Initing {
+			return nil, merrs.UninitedError.New("uninited")
+		}
+	}
+	iter, err = me.query(ctx, sql, args...)
+	me.setLastQueryTime()
+	// PushCount(time.Now(), 1, me.keyspace, "select", me.tableinfo.TableName)
+	return
+}
+
+func (me *SQLDB) queryMaps(sql string, args ...any) (rows []map[string]any, err error) {
+	iter, e := me.query(nil, sql, args...)
+	if e != nil {
+		return nil, e
+	}
+	return iter.AllMaps()
+}
+
+func (me *SQLDB) query(ctx context.Context, sql string, args ...any) (iter *Iter, err error) {
+	defer func() {
+		x := recover()
+		if x != nil {
+			switch xv := x.(type) {
+			case error:
+				err = xv
+			default:
+				err = merrs.NewError(fmt.Sprint(x))
+			}
+		}
+	}()
+	if ctx == nil {
+		ctx = context.Background()
+	}
+	rows, e := me.db.QueryContext(ctx, sql, args...)
+	if e != nil {
+		return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
+	}
+	coltypes, e := rows.ColumnTypes()
+	if e != nil {
+		return nil, merrs.NewError(e, merrs.SSMaps{{"sql": sql}, {"args": fmt.Sprintf("%v", args)}})
+	}
+	iter = &Iter{ctx: ctx, rows: rows, cols: coltypes}
+	return
+}
+
+func (me *SQLDB) ids(data map[string]any) (idmap map[string]any) {
+	idmap = map[string]any{}
+	for _, k := range me.tableinfo.IDKeys {
+		idmap[k] = data[k]
+	}
+	return
+}
+
+func (me *SQLDB) exists(conds map[string]any) (exists bool, err error) {
+	values := []any{}
+	existsSql := fmt.Sprintf(`SELECT * FROM %s`, me.tableinfo.TableName)
+	if len(conds) > 0 {
+		existsSql += " WHERE "
+		keys := []string{}
+		for k, v := range conds {
+			keys = append(keys, k+"=?")
+			values = append(values, v)
+		}
+		existsSql += strings.Join(keys, " and ")
+	}
+	existsSql += " LIMIT 1"
+	logger.Tracef("Exists sql: %s, params=%v", existsSql, values)
+	iter, e := me.Select(nil, existsSql, values...)
+	if e != nil {
+		return false, merrs.NewError(e)
+	}
+	defer iter.Close()
+	if iter.rows.Next() {
+		return true, nil
+	}
+	return false, nil
+}
+
+func (me *SQLDB) Insert(
+	data map[string]any,
+	overwriteexists bool,
+) (
+	n int,
+	err error,
+) {
+	return me.pushCommand(&Command{CmdType: UCTInsert, Data: data, OverwriteExists: overwriteexists})
+}
+
+func (me *SQLDB) insert(
+	data map[string]any,
+	overwriteexists bool,
+) (
+	n int,
+	err error,
+) {
+	encodedata := map[string]any{}
+	for _, field := range me.tableinfo.FieldNames {
+		ftype := me.tableinfo.MapNameFields[field].Type
+		v, _ := data[field]
+		if field == me.tableinfo.LatField {
+			v = time.Now().UnixMilli()
+		} else {
+			v = SQLValueEncode(ftype, v)
+		}
+		encodedata[field] = v
+	}
+	values := []interface{}{}
+	upsertStmt := me.insertStmt
+	if overwriteexists {
+		// 覆盖已有数据,需要指定主键值
+		upsertStmt = me.upsertStmt
+		for _, field := range me.tableinfo.IDKeys {
+			values = append(values, encodedata[field])
+		}
+	}
+	for _, field := range me.tableinfo.FieldNames {
+		values = append(values, encodedata[field])
+	}
+	logger.Tracef("Insert %s data: values=%v", me.tableinfo.TableName, values)
+	rslt, err := upsertStmt.Exec(values...)
+	if err != nil {
+		if strings.Index(err.Error(), "UNIQUE constraint") >= 0 {
+			return 0, merrs.ExistError.NewError(err,
+				merrs.SSMaps{
+					{"TableName": me.tableinfo.TableName},
+					{"Values": fmt.Sprint(values)},
+					{"Status": fmt.Sprint(me.Status())},
+				})
+		}
+		return 0, merrs.NewError(err,
+			merrs.SSMaps{
+				{"TableName": me.tableinfo.TableName},
+				{"Values": fmt.Sprint(values)},
+				{"Status": fmt.Sprint(me.Status())},
+			})
+	}
+	if rslt == nil {
+		return 0, merrs.NewError(fmt.Sprintf("sqlite insert table %s no result", me.tableinfo.TableName))
+	}
+	rn, err := rslt.RowsAffected()
+	if err != nil {
+		return 0, merrs.NewError(err,
+			merrs.SSMaps{
+				{"TableName": me.tableinfo.TableName},
+				{"Values": fmt.Sprint(values)},
+				{"Status": fmt.Sprint(me.Status())},
+			})
+	}
+	n = int(rn)
+	if n > 0 {
+		me.setLastUpdateTime()
+	}
+	// PushCount(time.Now(), n, me.keyspace, "insert", me.tableinfo.TableName)
+	return
+}
+
+func (me *SQLDB) Update(
+	data map[string]any,
+	conds map[string]any,
+) (
+	n int,
+	err error,
+) {
+	return me.pushCommand(&Command{CmdType: UCTUpdate, Data: data, Conds: conds})
+}
+
+func (me *SQLDB) update(
+	data map[string]any,
+	conds map[string]any,
+) (
+	n int,
+	err error,
+) {
+	updatefields := []string{}
+	condfields := []string{}
+	values := []interface{}{}
+	for k, v := range data {
+		updatefields = append(updatefields, k)
+		values = append(values, v)
+	}
+	updatefields = append(updatefields, me.tableinfo.LatField)
+	values = append(values, time.Now().UnixMilli())
+	for k, v := range conds {
+		condfields = append(condfields, k)
+		values = append(values, v)
+	}
+	if len(values) == 0 {
+		return 0, nil
+	}
+	updateSql := fmt.Sprintf(`UPDATE %s SET "%s"=?`, me.tableinfo.TableName, strings.Join(updatefields, `"=?,"`))
+	if len(condfields) > 0 {
+		updateSql += fmt.Sprintf(` WHERE "%s"=?`, strings.Join(condfields, `"=? and "`))
+	}
+	logger.Tracef("Update sql: %s, params=%v", updateSql, values)
+	n, err = me.exec(updateSql, values...)
+	if err != nil {
+		return 0, merrs.NewError(fmt.Sprintf("sqlite update table %s error", me.tableinfo.TableName), err)
+	}
+	if n > 0 {
+		me.setLastUpdateTime()
+	}
+	// PushCount(time.Now(), n, me.keyspace, "update", me.tableinfo.TableName)
+	return
+}
+
+func (me *SQLDB) Delete(
+	conds map[string]any,
+) (
+	n int,
+	err error,
+) {
+	return me.pushCommand(&Command{CmdType: UCTDelete, Conds: conds})
+}
+
+func (me *SQLDB) delete(
+	conds map[string]any,
+) (
+	n int,
+	err error,
+) {
+	values := []interface{}{}
+	deleteSql := fmt.Sprintf(`DELETE FROM %s`, me.tableinfo.TableName)
+	if len(conds) > 0 {
+		fields := []string{}
+		for k, v := range conds {
+			fields = append(fields, `"`+k+`"=?`)
+			values = append(values, v)
+		}
+		deleteSql += " WHERE " + strings.Join(fields, " and ")
+	}
+	logger.Tracef("Delete sql: %s, params=%v", deleteSql, values)
+	n, err = me.exec(deleteSql, values...)
+	if err != nil {
+		return 0, merrs.NewError(fmt.Sprintf("sqlite delete table %s error", me.tableinfo.TableName), err)
+	}
+	if n > 0 {
+		me.setLastUpdateTime()
+	}
+	// PushCount(time.Now(), n, me.keyspace, "delete", me.tableinfo.TableName)
+	return
+}

+ 57 - 0
sqlite/sqltype.go

@@ -0,0 +1,57 @@
+package sqlite
+
+import "time"
+
+func SQLType(ftype string) string {
+	switch ftype {
+	case "map", "set", "list":
+		return "blob"
+	case "bucket":
+		return "blob"
+	case "relation":
+		return "blob"
+	}
+	return ftype
+}
+
+func defaultvalue(ftype string) any {
+	switch ftype {
+	case "text":
+		return ""
+	case "varchar":
+		return ""
+	case "string":
+		return ""
+	case "bool":
+		return false
+	case "boolean":
+		return false
+	case "int":
+		return 0
+	case "smallint":
+		return int16(0)
+	case "enum":
+		return int16(0)
+	case "bigint":
+		return int64(0)
+	case "float":
+		return float32(0)
+	case "double":
+		return float64(0)
+	case "date":
+		return "1970-01-01"
+	case "timestamp":
+		return time.Time{}.UnixNano() / 1e6
+	case "map":
+		return []byte{}
+	case "set":
+		return []byte{}
+	case "list":
+		return []byte{}
+	case "bucket":
+		return []byte{}
+	case "relation":
+		return []byte{}
+	}
+	return nil
+}

+ 306 - 0
sqlite/sqlvalue.go

@@ -0,0 +1,306 @@
+package sqlite
+
+import (
+	"fmt"
+	"strings"
+
+	"git.wecise.com/wecise/util/msgpack"
+	"github.com/spf13/cast"
+)
+
+func BlobMarshal(v interface{}) (s string) {
+	bs, e := msgpack.Encode(v)
+	if e != nil {
+		t, e := cast.ToStringE(v)
+		if e != nil {
+			s = fmt.Sprintf("%#v", v)
+		}
+		s = t
+	} else {
+		s = string(bs)
+	}
+	return
+}
+
+func SliceTypeRecognize(tx []interface{}) (v interface{}, t string) {
+	t = ""
+	for _, a := range tx {
+		switch a.(type) {
+		case string:
+			if t == "" {
+				t = "string"
+			}
+			if t != "string" {
+				t = ""
+				break
+			}
+		case int:
+			if t == "" {
+				t = "int"
+			}
+			if t != "int" {
+				t = ""
+				break
+			}
+		}
+	}
+	switch t {
+	case "string":
+		tss := make([]string, len(tx))
+		for i, a := range tx {
+			tss[i] = cast.ToString(a)
+		}
+		v = tss
+	case "int":
+		tis := make([]int, len(tx))
+		for i, a := range tx {
+			tis[i] = cast.ToInt(a)
+		}
+		v = tis
+	default:
+		v = tx
+	}
+	return
+}
+
+func MapTypeRecognize(tx map[string]interface{}) (v interface{}, t string) {
+	t = ""
+	tvs := map[string]interface{}{}
+	for k, a := range tx {
+		switch ta := a.(type) {
+		case []interface{}:
+			av, at := SliceTypeRecognize(ta)
+			if t == "" {
+				t = "[]" + at
+			}
+			if t != "[]"+at {
+				t = ""
+				break
+			}
+			tvs[k] = av
+		case []string:
+			if t == "" {
+				t = "[]string"
+			}
+			if t != "[]string" {
+				t = ""
+				break
+			}
+			tvs[k] = ta
+		case []int:
+			if t == "" {
+				t = "[]int"
+			}
+			if t != "[]int" {
+				t = ""
+				break
+			}
+			tvs[k] = ta
+		case string:
+			if t == "" {
+				t = "string"
+			}
+			if t != "string" {
+				t = ""
+				break
+			}
+			tvs[k] = ta
+		case int:
+			if t == "" {
+				t = "int"
+			}
+			if t != "int" {
+				t = ""
+				break
+			}
+			tvs[k] = ta
+		}
+	}
+	switch t {
+	case "[]string":
+		tss := make(map[string][]string, len(tx))
+		for i, a := range tvs {
+			tss[i] = cast.ToStringSlice(a)
+		}
+		v = tss
+	case "[]int":
+		tis := make(map[string][]int, len(tx))
+		for i, a := range tvs {
+			tis[i] = cast.ToIntSlice(a)
+		}
+		v = tis
+	case "string":
+		tss := make(map[string]string, len(tx))
+		for i, a := range tvs {
+			tss[i] = cast.ToString(a)
+		}
+		v = tss
+	case "int":
+		tis := make(map[string]int, len(tx))
+		for i, a := range tvs {
+			tis[i] = cast.ToInt(a)
+		}
+		v = tis
+	default:
+		v = tx
+	}
+	return
+}
+
+func MapsTypeRecognize(tx []map[string]interface{}) (v interface{}, t string) {
+	t = ""
+	tvs := []interface{}{}
+	for _, x := range tx {
+		av, at := MapTypeRecognize(x)
+		if t == "" {
+			t = at
+		}
+		if t != at {
+			t = ""
+			break
+		}
+		tvs = append(tvs, av)
+	}
+	switch t {
+	case "[]string":
+		tss := make([]map[string][]string, len(tx))
+		for i, a := range tvs {
+			tss[i] = a.(map[string][]string)
+		}
+		v = tss
+	case "[]int":
+		tis := make([]map[string][]int, len(tx))
+		for i, a := range tvs {
+			tis[i] = a.(map[string][]int)
+		}
+		v = tis
+	case "string":
+		tss := make([]map[string]string, len(tx))
+		for i, a := range tvs {
+			tss[i] = a.(map[string]string)
+		}
+		v = tss
+	case "int":
+		tis := make([]map[string]int, len(tx))
+		for i, a := range tvs {
+			tis[i] = a.(map[string]int)
+		}
+		v = tis
+	default:
+		v = tx
+	}
+	return
+}
+
+func BlobRecognize(x interface{}) (v interface{}, t string) {
+	v = x
+	switch tx := x.(type) {
+	case []map[string]interface{}:
+		v, t = MapsTypeRecognize(tx)
+	case map[string]interface{}:
+		v, t = MapTypeRecognize(tx)
+	case []interface{}:
+		v, t = SliceTypeRecognize(tx)
+	case *interface{}:
+		v, t = BlobRecognize(*tx)
+	default:
+		v = tx
+	}
+	return
+}
+
+func BlobUnmarshal(s string, x interface{}) (v interface{}) {
+	e := msgpack.Decode([]byte(s), x)
+	if e != nil {
+		return s
+	}
+	v = x
+	return
+}
+
+// 从 sqlite 读出的数据 到 go 语言中
+func SQLValueDecode(ftype_sampledata, value interface{}) (v interface{}) {
+	v = value
+	switch ftype := ftype_sampledata.(type) {
+	case string:
+		switch ftype {
+		case "BOOL", "BOOLEAN":
+			v = cast.ToBool(v)
+		case "BLOB":
+			s := cast.ToString(v)
+			var x interface{}
+			v = BlobUnmarshal(s, &x)
+			v, _ = BlobRecognize(v)
+		case "INT", "INTEGER":
+			switch tv := v.(type) {
+			case int8:
+				v = int(tv)
+			case int16:
+				v = int(tv)
+			case int32:
+				v = int(tv)
+			case int64:
+				v = int(tv)
+			case uint:
+				v = int(tv)
+			case uint8:
+				v = int(tv)
+			case uint16:
+				v = int(tv)
+			case uint32:
+				v = int(tv)
+			case uint64:
+				v = int(tv)
+			}
+		case "DATE":
+			// 兼容cassandra输出
+			v = cast.ToTime(v)
+		case "TIME", "DATETIME", "TIMESTAMP":
+			v = NSTimeStampValue(v)
+		}
+		return
+	}
+	s := cast.ToString(v)
+	v = BlobUnmarshal(s, ftype_sampledata)
+	return
+}
+
+// 从 go 语言数据 到 sqlite 数据库中
+func SQLValueEncode(ftype string, v interface{}) interface{} {
+	if v == nil {
+		return nil
+	}
+	switch strings.ToLower(ftype) {
+	case "int", "integer":
+		return cast.ToInt(v)
+	case "long", "bigint", "int64":
+		return cast.ToInt64(v)
+	case "int32":
+		return cast.ToInt32(v)
+	case "enum", "smallint", "int16":
+		return cast.ToInt16(v)
+	case "double", "float64":
+		return cast.ToFloat64(v)
+	case "float", "float32":
+		return cast.ToFloat32(v)
+	case "bool", "boolean":
+		return cast.ToBool(v)
+	case "string", "text", "varchar", "clob":
+		return cast.ToString(v)
+	case "date":
+		return MSTimeStampValue(v).Format("2006-01-02")
+	case "time", "datetime", "timestamp":
+		return MSTimeStampValue(v).UnixNano()
+	case "relation":
+		sv := BlobMarshal(v)
+		if sv == "{}" {
+			// 兼容cassandra输入
+			sv = "null"
+		}
+		return sv
+	case "blob", "map", "list", "set", "bucket":
+		fallthrough
+	default:
+		return BlobMarshal(v)
+	}
+}

+ 7 - 0
sqlite/stmt.go

@@ -0,0 +1,7 @@
+package sqlite
+
+import "database/sql"
+
+type Stmt struct {
+	*sql.Stmt
+}

+ 14 - 0
sqlite/tableinfo.go

@@ -0,0 +1,14 @@
+package sqlite
+
+import (
+	"time"
+)
+
+type DataInfo struct {
+}
+
+type StatInfo struct {
+	CreateTime     time.Time
+	LastQueryTime  time.Time
+	LastUpdateTime time.Time
+}

+ 79 - 0
sqlite/timestamp.go

@@ -0,0 +1,79 @@
+package sqlite
+
+import (
+	"time"
+
+	"github.com/spf13/cast"
+)
+
+// 输入数字为毫秒值或文本或time.Time
+func MSTimeStampValue(v interface{}) time.Time {
+	var ms int64
+	switch tv := v.(type) {
+	case time.Time:
+		return tv
+	case float32:
+		ms = int64(tv)
+	case float64:
+		ms = int64(tv)
+	case int:
+		ms = int64(tv)
+	case int8:
+		ms = int64(tv)
+	case int16:
+		ms = int64(tv)
+	case int32:
+		ms = int64(tv)
+	case int64:
+		ms = tv
+	case uint:
+		ms = int64(tv)
+	case uint8:
+		ms = int64(tv)
+	case uint16:
+		ms = int64(tv)
+	case uint32:
+		ms = int64(tv)
+	case uint64:
+		ms = int64(tv)
+	default:
+		return cast.ToTimeInDefaultLocation(v, time.Local)
+	}
+	return time.Unix(0, ms*1000000)
+}
+
+// 输入数字为纳秒值或文本或time.Time
+func NSTimeStampValue(v interface{}) time.Time {
+	var ns int64
+	switch tv := v.(type) {
+	case time.Time:
+		return tv
+	case float32:
+		ns = int64(tv)
+	case float64:
+		ns = int64(tv)
+	case int:
+		ns = int64(tv)
+	case int8:
+		ns = int64(tv)
+	case int16:
+		ns = int64(tv)
+	case int32:
+		ns = int64(tv)
+	case int64:
+		ns = tv
+	case uint:
+		ns = int64(tv)
+	case uint8:
+		ns = int64(tv)
+	case uint16:
+		ns = int64(tv)
+	case uint32:
+		ns = int64(tv)
+	case uint64:
+		ns = int64(tv)
+	default:
+		return cast.ToTimeInDefaultLocation(v, time.Local)
+	}
+	return time.Unix(0, ns)
+}