Under certain circumstances, the retention service can fail to delete shards from the store in a timely manner. When the shard groups are pruned based on age, this leaves orphaned shard files on the disk. The retention service will then not attempt to remove the obsolete shard files because the meta store does not know about them. This can cause excessive disk space usage for some users. This corrects that by requiring shards files be deleted before they can be removed from the meta store. fixes: #24529 (cherry picked from commitpull/24599/head v2.7.57bd3f89d18
) closes https://github.com/influxdata/influxdb/issues/24545 Co-authored-by: Geoffrey Wossum <gwossum@influxdata.com> (cherry picked from commit0dc48b1260
) closes https://github.com/influxdata/influxdb/issues/24546
parent
c169e98f30
commit
09a9607fd9
28
go.mod
28
go.mod
|
@ -63,16 +63,17 @@ require (
|
|||
go.etcd.io/bbolt v1.3.6
|
||||
go.uber.org/multierr v1.6.0
|
||||
go.uber.org/zap v1.16.0
|
||||
golang.org/x/crypto v0.14.0
|
||||
golang.org/x/sync v0.4.0
|
||||
golang.org/x/sys v0.13.0
|
||||
golang.org/x/text v0.13.0
|
||||
golang.org/x/crypto v0.16.0
|
||||
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611
|
||||
golang.org/x/sync v0.5.0
|
||||
golang.org/x/sys v0.15.0
|
||||
golang.org/x/text v0.14.0
|
||||
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
|
||||
golang.org/x/tools v0.14.1-0.20231011210224-b9b97d982b0a
|
||||
golang.org/x/tools v0.16.0
|
||||
google.golang.org/protobuf v1.30.0
|
||||
gopkg.in/yaml.v2 v2.4.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
honnef.co/go/tools v0.4.0
|
||||
honnef.co/go/tools v0.4.6
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -137,7 +138,7 @@ require (
|
|||
github.com/fsnotify/fsnotify v1.5.4 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.0 // indirect
|
||||
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect
|
||||
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493 // indirect
|
||||
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect
|
||||
github.com/go-sql-driver/mysql v1.6.0 // indirect
|
||||
github.com/goccy/go-json v0.9.11 // indirect
|
||||
github.com/gofrs/uuid v3.3.0+incompatible // indirect
|
||||
|
@ -152,9 +153,10 @@ require (
|
|||
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
|
||||
github.com/hashicorp/go-hclog v0.12.2 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/hashicorp/go-retryablehttp v0.6.4 // indirect
|
||||
github.com/hashicorp/go-rootcerts v1.0.0 // indirect
|
||||
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
|
||||
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
|
||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
||||
github.com/hashicorp/vault/sdk v0.1.8 // indirect
|
||||
|
@ -178,10 +180,11 @@ require (
|
|||
github.com/mattn/go-colorable v0.1.12 // indirect
|
||||
github.com/mattn/go-ieproxy v0.0.1 // indirect
|
||||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect
|
||||
github.com/miekg/dns v1.1.29 // indirect
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
|
||||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
|
||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
||||
github.com/mitchellh/mapstructure v1.1.2 // indirect
|
||||
github.com/mitchellh/mapstructure v1.2.2 // indirect
|
||||
github.com/moby/patternmatcher v0.5.0 // indirect
|
||||
github.com/moby/sys/sequential v0.5.0 // indirect
|
||||
github.com/moby/term v0.0.0-20221128092401-c43b287e0e0f // indirect
|
||||
|
@ -217,12 +220,11 @@ require (
|
|||
github.com/zeebo/xxh3 v1.0.2 // indirect
|
||||
go.opencensus.io v0.24.0 // indirect
|
||||
go.uber.org/atomic v1.7.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 // indirect
|
||||
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
|
||||
golang.org/x/mod v0.13.0 // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/mod v0.14.0 // indirect
|
||||
golang.org/x/net v0.19.0 // indirect
|
||||
golang.org/x/oauth2 v0.7.0 // indirect
|
||||
golang.org/x/term v0.13.0 // indirect
|
||||
golang.org/x/term v0.15.0 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
|
||||
gonum.org/v1/gonum v0.11.0 // indirect
|
||||
google.golang.org/api v0.114.0 // indirect
|
||||
|
|
61
go.sum
61
go.sum
|
@ -321,8 +321,8 @@ github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW
|
|||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 h1:Ujru1hufTHVb++eG6OuNDKMxZnGIvF6o/u8q/8h2+I4=
|
||||
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
|
||||
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493 h1:OTanQnFt0bi5iLFSdbEVA/idR6Q2WhCm+deb7ir2CcM=
|
||||
github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
|
||||
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 h1:gclg6gY70GLy3PbkQ1AERPfmLMMagS60DKF78eWwLn8=
|
||||
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24=
|
||||
github.com/go-chi/chi v4.1.0+incompatible h1:ETj3cggsVIY2Xao5ExCu6YhEh5MD6JTfcBzS37R260w=
|
||||
github.com/go-chi/chi v4.1.0+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ=
|
||||
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
|
||||
|
@ -496,8 +496,9 @@ github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVo
|
|||
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
|
||||
github.com/hashicorp/go-hclog v0.0.0-20180709165350-ff2cf002a8dd/go.mod h1:9bjs9uLqI8l75knNv3lV1kA55veR+WUPSiKIWcQHudI=
|
||||
github.com/hashicorp/go-hclog v0.8.0/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
|
||||
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
|
||||
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
|
||||
github.com/hashicorp/go-hclog v0.12.2 h1:F1fdYblUEsxKiailtkhCCG2g4bipEgaHiDc8vffNpD4=
|
||||
github.com/hashicorp/go-hclog v0.12.2/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ=
|
||||
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
|
||||
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
|
||||
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
|
||||
|
@ -507,8 +508,9 @@ github.com/hashicorp/go-plugin v1.0.0/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn
|
|||
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
|
||||
github.com/hashicorp/go-retryablehttp v0.6.4 h1:BbgctKO892xEyOXnGiaAwIoSq1QZ/SS4AhjoAh9DnfY=
|
||||
github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
|
||||
github.com/hashicorp/go-rootcerts v1.0.0 h1:Rqb66Oo1X/eSV1x66xbDccZjhJigjg0+e82kpwzSwCI=
|
||||
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
|
||||
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
|
||||
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
|
||||
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
|
||||
github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc=
|
||||
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
|
||||
|
@ -661,6 +663,7 @@ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
|
|||
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
|
||||
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
|
||||
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
|
||||
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
|
||||
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
|
||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||
|
@ -678,7 +681,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfr
|
|||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4=
|
||||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
|
||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||
github.com/miekg/dns v1.1.25 h1:dFwPR6SfLtrSwgDcIq2bcU/gVutB4sNApq2HBdqcakg=
|
||||
github.com/miekg/dns v1.1.29 h1:xHBEhR+t5RzcFJjBLJlax2daXOrTYtr9z4WdKEfWFzg=
|
||||
github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
|
||||
github.com/mileusna/useragent v0.0.0-20190129205925-3e331f0949a5 h1:pXqZHmHOz6LN+zbbUgqyGgAWRnnZEI40IzG3tMsXcSI=
|
||||
github.com/mileusna/useragent v0.0.0-20190129205925-3e331f0949a5/go.mod h1:JWhYAp2EXqUtsxTKdeGlY8Wp44M7VxThC9FEoNGi2IE=
|
||||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
|
||||
|
@ -696,8 +700,9 @@ github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUb
|
|||
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
|
||||
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
|
||||
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
github.com/mitchellh/mapstructure v1.2.2 h1:dxe5oCinTXiTIcfgmZecdCzPmAJKd46KsCWc35r0TV4=
|
||||
github.com/mitchellh/mapstructure v1.2.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
|
||||
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
|
||||
github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae h1:mQO+oxi0kpii/TX+ltfTCFuYkOjEn53JhaOObiMuvnk=
|
||||
github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae/go.mod h1:Iym28+kJVnC1hfQvv5MUtI6AiFFzvQjHcvI4RFTG/04=
|
||||
|
@ -1010,8 +1015,8 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
|
|||
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
|
||||
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
|
||||
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
|
||||
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
|
@ -1028,8 +1033,8 @@ golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u0
|
|||
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
||||
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps=
|
||||
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw=
|
||||
golang.org/x/exp v0.0.0-20220827204233-334a2380cb91/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE=
|
||||
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 h1:qCEDpW1G+vcj3Y7Fy52pEM1AWm3abj8WimGYejI3SC4=
|
||||
golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
|
||||
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a h1:Jw5wfR+h9mnIYH+OtGT2im5wV1YGGDora5vTv/aa5bE=
|
||||
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk=
|
||||
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
|
||||
|
@ -1070,8 +1075,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
|||
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
|
||||
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
|
||||
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
|
||||
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
|
||||
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
|
@ -1093,6 +1098,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
|
|||
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
|
@ -1126,8 +1132,8 @@ golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qx
|
|||
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
|
||||
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
|
@ -1154,8 +1160,8 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
|
|||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
|
||||
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
|
||||
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
@ -1180,7 +1186,9 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191115151921-52ab43148777/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
@ -1241,14 +1249,14 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
|
||||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
|
||||
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210503060354-a79de5458b56/go.mod h1:tfny5GFUkzUvx4ps4ajbZsCe5lw1metzhBm9T3x7oIY=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
|
||||
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
|
||||
golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4=
|
||||
golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
|
@ -1259,8 +1267,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
|||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
|
@ -1298,6 +1306,7 @@ golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtn
|
|||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
|
@ -1333,8 +1342,8 @@ golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
|||
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
|
||||
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
|
||||
golang.org/x/tools v0.14.1-0.20231011210224-b9b97d982b0a h1:4M9bLuM4viZiz37z5SxTCpxhKOD1KNfGgijIjIt+6WQ=
|
||||
golang.org/x/tools v0.14.1-0.20231011210224-b9b97d982b0a/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
|
||||
golang.org/x/tools v0.16.0 h1:GO788SKMRunPIBCXiQyo2AaexLstOrVhuAL5YwsckQM=
|
||||
golang.org/x/tools v0.16.0/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
@ -1531,8 +1540,8 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt
|
|||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
|
||||
honnef.co/go/tools v0.4.0 h1:lyXVV1c8wUBJRKqI8JgIpT8TW1VDagfYYaxbKa/HoL8=
|
||||
honnef.co/go/tools v0.4.0/go.mod h1:36ZgoUOrqOk1GxwHhyryEkq8FQWkUO2xGuSMhUCcdvA=
|
||||
honnef.co/go/tools v0.4.6 h1:oFEHCKeID7to/3autwsWfnuv69j3NsfcXbvJKuIcep8=
|
||||
honnef.co/go/tools v0.4.6/go.mod h1:+rnGS1THNh8zMwnd2oVOTL9QF6vmfyG6ZXBULae2uc0=
|
||||
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
|
||||
|
|
|
@ -80,6 +80,7 @@ type MetaClient interface {
|
|||
DropDatabase(name string) error
|
||||
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||
Database(name string) (di *meta.DatabaseInfo)
|
||||
DropShard(id uint64) error
|
||||
Databases() []meta.DatabaseInfo
|
||||
DeleteShardGroup(database, policy string, id uint64) error
|
||||
PrecreateShardGroups(now, cutoff time.Time) error
|
||||
|
@ -142,6 +143,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
|
|||
e.retentionService = retention.NewService(c.RetentionService)
|
||||
e.retentionService.TSDBStore = e.tsdbStore
|
||||
e.retentionService.MetaClient = e.metaClient
|
||||
e.retentionService.DropShardMetaRef = retention.OSSDropShardMetaRef(e.MetaClient())
|
||||
|
||||
e.precreatorService = precreator.NewService(c.PrecreatorConfig)
|
||||
e.precreatorService.MetaClient = e.metaClient
|
||||
|
|
|
@ -668,24 +668,11 @@ func (c *Client) TruncateShardGroups(t time.Time) error {
|
|||
|
||||
// PruneShardGroups remove deleted shard groups from the data store.
|
||||
func (c *Client) PruneShardGroups() error {
|
||||
var changed bool
|
||||
expiration := time.Now().Add(ShardGroupDeletedExpiration)
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
data := c.cacheData.Clone()
|
||||
for i, d := range data.Databases {
|
||||
for j, rp := range d.RetentionPolicies {
|
||||
var remainingShardGroups []ShardGroupInfo
|
||||
for _, sgi := range rp.ShardGroups {
|
||||
if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) {
|
||||
remainingShardGroups = append(remainingShardGroups, sgi)
|
||||
continue
|
||||
}
|
||||
changed = true
|
||||
}
|
||||
data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups
|
||||
}
|
||||
}
|
||||
changed := data.PruneShardGroups(expiration)
|
||||
if changed {
|
||||
return c.commit(data)
|
||||
}
|
||||
|
|
|
@ -1074,7 +1074,9 @@ func TestMetaClient_PruneShardGroups(t *testing.T) {
|
|||
|
||||
data := c.Data()
|
||||
data.Databases[1].RetentionPolicies[0].ShardGroups[0].DeletedAt = expiration
|
||||
data.Databases[1].RetentionPolicies[0].ShardGroups[0].Shards = nil
|
||||
data.Databases[1].RetentionPolicies[0].ShardGroups[1].DeletedAt = expiration
|
||||
data.Databases[1].RetentionPolicies[0].ShardGroups[1].Shards = nil
|
||||
|
||||
if err := c.SetData(&data); err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
|
@ -292,8 +292,11 @@ func (data *Data) DropShard(id uint64) {
|
|||
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...)
|
||||
|
||||
if len(shards) == 1 {
|
||||
// We just deleted the last shard in the shard group.
|
||||
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
|
||||
// We just deleted the last shard in the shard group, but make sure we don't overwrite the timestamp if it
|
||||
// was already deleted.
|
||||
if !data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Deleted() {
|
||||
data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -457,6 +460,26 @@ func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
|
|||
return ErrShardGroupNotFound
|
||||
}
|
||||
|
||||
// PruneShardGroups removes any shards deleted before expiration and that have no remaining owners.
|
||||
// Returns true if data is modified.
|
||||
func (data *Data) PruneShardGroups(expiration time.Time) bool {
|
||||
var changed bool
|
||||
for i, d := range data.Databases {
|
||||
for j, rp := range d.RetentionPolicies {
|
||||
var remainingShardGroups []ShardGroupInfo
|
||||
for _, sgi := range rp.ShardGroups {
|
||||
if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) || len(sgi.Shards) > 0 {
|
||||
remainingShardGroups = append(remainingShardGroups, sgi)
|
||||
continue
|
||||
}
|
||||
changed = true
|
||||
}
|
||||
data.Databases[i].RetentionPolicies[j].ShardGroups = remainingShardGroups
|
||||
}
|
||||
}
|
||||
return changed
|
||||
}
|
||||
|
||||
// CreateContinuousQuery adds a named continuous query to a database.
|
||||
func (data *Data) CreateContinuousQuery(database, name, query string) error {
|
||||
di := data.Database(database)
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
package helpers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
||||
)
|
||||
|
||||
// DataDeleteShardGroup deletes the shard group specified by database, policy, and id from targetData.
|
||||
// It does this by setting the shard group's DeletedAt time to now. We have to reimplement DeleteShardGroup
|
||||
// instead of using data's so that the DeletedAt time will be deterministic. We are also not testing
|
||||
// the functionality of DeleteShardGroup. We are testing if DeleteShardGroup gets called correctly.
|
||||
func DataDeleteShardGroup(targetData *meta.Data, now time.Time, database, policy string, id uint64) error {
|
||||
rpi, err := targetData.RetentionPolicy(database, policy)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
} else if rpi == nil {
|
||||
return meta.ErrRetentionPolicyNotFound
|
||||
}
|
||||
|
||||
// Find shard group by ID and set its deletion timestamp.
|
||||
for i := range rpi.ShardGroups {
|
||||
if rpi.ShardGroups[i].ID == id {
|
||||
rpi.ShardGroups[i].DeletedAt = now
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return meta.ErrShardGroupNotFound
|
||||
}
|
||||
|
||||
// DataNukeShardGroup unconditionally removes the shard group identified by targetDB, targetRP, and targetID
|
||||
// from targetData. There's no meta.Data method to directly remove a shard group, only to mark it deleted and
|
||||
// then prune it. We can't use the functionality we're testing to generate the expected result.
|
||||
func DataNukeShardGroup(targetData *meta.Data, targetDB, targetRP string, targetID uint64) error {
|
||||
rpi, err := targetData.RetentionPolicy(targetDB, targetRP)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if rpi == nil {
|
||||
return fmt.Errorf("no retention policy found for %q, %q, %d", targetDB, targetRP, targetID)
|
||||
}
|
||||
isTargetShardGroup := func(sgi meta.ShardGroupInfo) bool {
|
||||
return sgi.ID == targetID
|
||||
}
|
||||
if !slices.ContainsFunc(rpi.ShardGroups, isTargetShardGroup) {
|
||||
return fmt.Errorf("shard not found for %q, %q, %d", targetDB, targetRP, targetID)
|
||||
}
|
||||
rpi.ShardGroups = slices.DeleteFunc(rpi.ShardGroups, isTargetShardGroup)
|
||||
return nil
|
||||
}
|
|
@ -3,31 +3,46 @@ package retention // import "github.com/influxdata/influxdb/services/retention"
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/logger"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type MetaClient interface {
|
||||
Databases() []meta.DatabaseInfo
|
||||
DeleteShardGroup(database, policy string, id uint64) error
|
||||
DropShard(id uint64) error
|
||||
PruneShardGroups() error
|
||||
}
|
||||
|
||||
// Service represents the retention policy enforcement service.
|
||||
type Service struct {
|
||||
MetaClient interface {
|
||||
Databases() []meta.DatabaseInfo
|
||||
DeleteShardGroup(database, policy string, id uint64) error
|
||||
PruneShardGroups() error
|
||||
}
|
||||
MetaClient
|
||||
TSDBStore interface {
|
||||
ShardIDs() []uint64
|
||||
DeleteShard(shardID uint64) error
|
||||
}
|
||||
|
||||
// DropShardRef is a function that takes a shard ID and removes the
|
||||
// "reference" to it in the meta data. For OSS, this would be a DropShard
|
||||
// operation. For Enterprise, this would be a RemoveShardOwner operation.
|
||||
// Also provided is owners, the list of node IDs of the shard owners
|
||||
// according to the meta store. For OSS, owners will always be empty.
|
||||
// Enterprise can use owners to optimize out calls to RemoveShardOwner
|
||||
// if the current node doesn't actually own the shardID. This prevents
|
||||
// a lot of unnecessary RPC calls.
|
||||
DropShardMetaRef func(shardID uint64, owners []uint64) error
|
||||
|
||||
config Config
|
||||
wg sync.WaitGroup
|
||||
cancel context.CancelFunc
|
||||
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
|
@ -39,12 +54,23 @@ func NewService(c Config) *Service {
|
|||
}
|
||||
}
|
||||
|
||||
// OSSDropShardMetaRef creates a closure appropriate for OSS to use as DropShardMetaRef.
|
||||
func OSSDropShardMetaRef(mc MetaClient) func(uint64, []uint64) error {
|
||||
return func(shardID uint64, owners []uint64) error {
|
||||
return mc.DropShard(shardID)
|
||||
}
|
||||
}
|
||||
|
||||
// Open starts retention policy enforcement.
|
||||
func (s *Service) Open(ctx context.Context) error {
|
||||
if !s.config.Enabled || s.cancel != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.DropShardMetaRef == nil {
|
||||
return fmt.Errorf("invalid nil for retention service DropShardMetaRef")
|
||||
}
|
||||
|
||||
s.logger.Info("Starting retention policy enforcement service",
|
||||
logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
|
||||
|
||||
|
@ -114,86 +140,139 @@ func (s *Service) run(ctx context.Context) {
|
|||
return
|
||||
|
||||
case <-ticker.C:
|
||||
startTime := time.Now()
|
||||
log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check")
|
||||
|
||||
type deletionInfo struct {
|
||||
db string
|
||||
rp string
|
||||
}
|
||||
deletedShardIDs := make(map[uint64]deletionInfo)
|
||||
|
||||
// Mark down if an error occurred during this function so we can inform the
|
||||
// user that we will try again on the next interval.
|
||||
// Without the message, they may see the error message and assume they
|
||||
// have to do it manually.
|
||||
var retryNeeded bool
|
||||
dbs := s.MetaClient.Databases()
|
||||
for _, d := range dbs {
|
||||
for _, r := range d.RetentionPolicies {
|
||||
// Build list of already deleted shards.
|
||||
for _, g := range r.DeletedShardGroups() {
|
||||
for _, sh := range g.Shards {
|
||||
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
|
||||
}
|
||||
}
|
||||
|
||||
// Determine all shards that have expired and need to be deleted.
|
||||
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
|
||||
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
|
||||
log.Info("Failed to delete shard group",
|
||||
logger.Database(d.Name),
|
||||
logger.ShardGroup(g.ID),
|
||||
logger.RetentionPolicy(r.Name),
|
||||
zap.Error(err))
|
||||
retryNeeded = true
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("Deleted shard group",
|
||||
logger.Database(d.Name),
|
||||
logger.ShardGroup(g.ID),
|
||||
logger.RetentionPolicy(r.Name))
|
||||
|
||||
// Store all the shard IDs that may possibly need to be removed locally.
|
||||
for _, sh := range g.Shards {
|
||||
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove shards if we store them locally
|
||||
for _, id := range s.TSDBStore.ShardIDs() {
|
||||
if info, ok := deletedShardIDs[id]; ok {
|
||||
if err := s.TSDBStore.DeleteShard(id); err != nil {
|
||||
log.Info("Failed to delete shard",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp),
|
||||
zap.Error(err))
|
||||
retryNeeded = true
|
||||
continue
|
||||
}
|
||||
log.Info("Deleted shard",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp))
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.MetaClient.PruneShardGroups(); err != nil {
|
||||
log.Info("Problem pruning shard groups", zap.Error(err))
|
||||
retryNeeded = true
|
||||
}
|
||||
|
||||
if retryNeeded {
|
||||
log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
|
||||
}
|
||||
|
||||
logEnd()
|
||||
elapsed := time.Since(startTime)
|
||||
globalRetentionMetrics.checkDuration.Observe(elapsed.Seconds())
|
||||
s.DeletionCheck()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) DeletionCheck() {
|
||||
log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check")
|
||||
defer logEnd()
|
||||
|
||||
type deletionInfo struct {
|
||||
db string
|
||||
rp string
|
||||
owners []uint64
|
||||
}
|
||||
newDeletionInfo := func(db, rp string, si meta.ShardInfo) deletionInfo {
|
||||
owners := make([]uint64, len(si.Owners))
|
||||
for i, o := range si.Owners {
|
||||
owners[i] = o.NodeID
|
||||
}
|
||||
return deletionInfo{db: db, rp: rp, owners: owners}
|
||||
}
|
||||
deletedShardIDs := make(map[uint64]deletionInfo)
|
||||
|
||||
dropShardMetaRef := func(id uint64, info deletionInfo) error {
|
||||
if err := s.DropShardMetaRef(id, info.owners); err != nil {
|
||||
log.Error("Failed to drop shard meta reference",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mark down if an error occurred during this function so we can inform the
|
||||
// user that we will try again on the next interval.
|
||||
// Without the message, they may see the error message and assume they
|
||||
// have to do it manually.
|
||||
var retryNeeded bool
|
||||
dbs := s.MetaClient.Databases()
|
||||
for _, d := range dbs {
|
||||
for _, r := range d.RetentionPolicies {
|
||||
// Build list of already deleted shards.
|
||||
for _, g := range r.DeletedShardGroups() {
|
||||
for _, sh := range g.Shards {
|
||||
deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh)
|
||||
}
|
||||
}
|
||||
|
||||
// Determine all shards that have expired and need to be deleted.
|
||||
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
|
||||
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
|
||||
log.Info("Failed to delete shard group",
|
||||
logger.Database(d.Name),
|
||||
logger.ShardGroup(g.ID),
|
||||
logger.RetentionPolicy(r.Name),
|
||||
zap.Error(err))
|
||||
retryNeeded = true
|
||||
continue
|
||||
}
|
||||
|
||||
log.Info("Deleted shard group",
|
||||
logger.Database(d.Name),
|
||||
logger.ShardGroup(g.ID),
|
||||
logger.RetentionPolicy(r.Name))
|
||||
|
||||
// Store all the shard IDs that may possibly need to be removed locally.
|
||||
for _, sh := range g.Shards {
|
||||
deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove shards if we store them locally
|
||||
for _, id := range s.TSDBStore.ShardIDs() {
|
||||
if info, ok := deletedShardIDs[id]; ok {
|
||||
delete(deletedShardIDs, id)
|
||||
log.Info("Attempting deletion of shard from store",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp))
|
||||
if err := s.TSDBStore.DeleteShard(id); err != nil {
|
||||
log.Error("Failed to delete shard",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp),
|
||||
zap.Error(err))
|
||||
if errors.Is(err, tsdb.ErrShardNotFound) {
|
||||
// At first you wouldn't think this could happen, we're iterating over shards
|
||||
// in the store. However, if this has been a very long running operation the
|
||||
// shard could have been dropped from the store while we were working on other shards.
|
||||
log.Warn("Shard does not exist in store, continuing retention removal",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp))
|
||||
} else {
|
||||
retryNeeded = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
log.Info("Deleted shard",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp))
|
||||
if err := dropShardMetaRef(id, info); err != nil {
|
||||
// removeShardMetaReference already logged the error.
|
||||
retryNeeded = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for expired phantom shards that exist in the metadata but not in the store.
|
||||
for id, info := range deletedShardIDs {
|
||||
log.Error("Expired phantom shard detected during retention check, removing from metadata",
|
||||
logger.Database(info.db),
|
||||
logger.Shard(id),
|
||||
logger.RetentionPolicy(info.rp))
|
||||
if err := dropShardMetaRef(id, info); err != nil {
|
||||
// removeShardMetaReference already logged the error.
|
||||
retryNeeded = true
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.MetaClient.PruneShardGroups(); err != nil {
|
||||
log.Info("Problem pruning shard groups", zap.Error(err))
|
||||
retryNeeded = true
|
||||
}
|
||||
|
||||
if retryNeeded {
|
||||
log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package retention_test
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -10,11 +11,15 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/v2/internal"
|
||||
"github.com/influxdata/influxdb/v2/toml"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
||||
"github.com/influxdata/influxdb/v2/v1/services/retention"
|
||||
"github.com/influxdata/influxdb/v2/v1/services/retention/helpers"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"go.uber.org/zap/zaptest/observer"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
func TestService_OpenDisabled(t *testing.T) {
|
||||
|
@ -60,6 +65,201 @@ func TestService_OpenClose(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestRetention_DeletionCheck(t *testing.T) {
|
||||
cfg := retention.Config{
|
||||
Enabled: true,
|
||||
|
||||
// This test runs DeletionCheck manually for the test cases. It is about checking
|
||||
// the results of DeletionCheck, not if it is run properly on the timer.
|
||||
// Set a long check interval so the deletion check won't run on its own during the test.
|
||||
CheckInterval: toml.Duration(time.Hour * 24),
|
||||
}
|
||||
|
||||
now := time.Now().UTC()
|
||||
shardDuration := time.Hour * 24 * 14
|
||||
shardGroupDuration := time.Hour * 24
|
||||
foreverShard := uint64(1003) // a shard that can't be deleted
|
||||
phantomShard := uint64(1006)
|
||||
dataUT := &meta.Data{
|
||||
Users: []meta.UserInfo{},
|
||||
Databases: []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "servers",
|
||||
DefaultRetentionPolicy: "autogen",
|
||||
RetentionPolicies: []meta.RetentionPolicyInfo{
|
||||
{
|
||||
Name: "autogen",
|
||||
ReplicaN: 2,
|
||||
Duration: shardDuration,
|
||||
ShardGroupDuration: shardGroupDuration,
|
||||
ShardGroups: []meta.ShardGroupInfo{
|
||||
// Shard group 1 is deleted and expired group with a single shard.
|
||||
{
|
||||
ID: 1,
|
||||
StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 0*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
Shards: []meta.ShardInfo{
|
||||
{
|
||||
ID: 101,
|
||||
},
|
||||
},
|
||||
},
|
||||
// Shard group 2 is deleted and expired with no shards.
|
||||
// Note a shard group with no shards should not exist anyway.
|
||||
{
|
||||
ID: 2,
|
||||
StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 2*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
},
|
||||
// Shard group 3 is deleted and expired, but its shard can not be deleted.
|
||||
{
|
||||
ID: 3,
|
||||
StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 2*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
Shards: []meta.ShardInfo{
|
||||
{
|
||||
ID: foreverShard,
|
||||
},
|
||||
},
|
||||
},
|
||||
// Shard group 4 is deleted, but not expired with a single shard.
|
||||
{
|
||||
ID: 4,
|
||||
StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 0*shardGroupDuration),
|
||||
EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration),
|
||||
DeletedAt: now.Truncate(time.Hour * 24),
|
||||
Shards: []meta.ShardInfo{
|
||||
{
|
||||
ID: 104,
|
||||
},
|
||||
},
|
||||
},
|
||||
// Shard group 5 is active and should not be touched.
|
||||
{
|
||||
ID: 5,
|
||||
StartTime: now.Truncate(time.Hour * 24).Add(0 * shardGroupDuration),
|
||||
EndTime: now.Truncate(time.Hour * 24).Add(1 * shardGroupDuration),
|
||||
Shards: []meta.ShardInfo{
|
||||
{
|
||||
ID: 105,
|
||||
},
|
||||
},
|
||||
},
|
||||
// Shard group 6 is a deleted and expired shard group with a phantom shard that doesn't exist in the store.
|
||||
{
|
||||
ID: 6,
|
||||
StartTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 0*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
EndTime: now.Truncate(time.Hour * 24).Add(-2*shardDuration + 1*shardGroupDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
DeletedAt: now.Truncate(time.Hour * 24).Add(-1 * shardDuration).Add(meta.ShardGroupDeletedExpiration),
|
||||
Shards: []meta.ShardInfo{
|
||||
{
|
||||
ID: phantomShard,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
expData := dataUT.Clone()
|
||||
|
||||
databasesFn := func() []meta.DatabaseInfo {
|
||||
return dataUT.Databases
|
||||
}
|
||||
deleteShardGroupFn := func(database, policy string, id uint64) error {
|
||||
return helpers.DataDeleteShardGroup(dataUT, now, database, policy, id)
|
||||
}
|
||||
dropShardFn := func(id uint64) error {
|
||||
dataUT.DropShard(id)
|
||||
return nil
|
||||
}
|
||||
pruneShardGroupsFn := func() error {
|
||||
// PruneShardGroups is the core functionality we are testing. We must use meta.Data's version.
|
||||
dataUT.PruneShardGroups(now.Add(meta.ShardGroupDeletedExpiration))
|
||||
return nil
|
||||
}
|
||||
mc := &internal.MetaClientMock{
|
||||
DatabasesFn: databasesFn,
|
||||
DeleteShardGroupFn: deleteShardGroupFn,
|
||||
DropShardFn: dropShardFn,
|
||||
PruneShardGroupsFn: pruneShardGroupsFn,
|
||||
}
|
||||
|
||||
collectShards := func(d *meta.Data) map[uint64]struct{} {
|
||||
s := map[uint64]struct{}{}
|
||||
for _, db := range d.Databases {
|
||||
for _, rp := range db.RetentionPolicies {
|
||||
for _, sg := range rp.ShardGroups {
|
||||
for _, sh := range sg.Shards {
|
||||
s[sh.ID] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// All these shards are yours except phantomShard. Attempt no deletion there.
|
||||
shards := collectShards(dataUT)
|
||||
delete(shards, phantomShard)
|
||||
|
||||
shardIDs := func() []uint64 {
|
||||
return maps.Keys(shards)
|
||||
}
|
||||
deleteShard := func(shardID uint64) error {
|
||||
if _, ok := shards[shardID]; !ok {
|
||||
return tsdb.ErrShardNotFound
|
||||
}
|
||||
if shardID == foreverShard {
|
||||
return fmt.Errorf("unknown error deleting shard files for shard %d", shardID)
|
||||
}
|
||||
delete(shards, shardID)
|
||||
return nil
|
||||
}
|
||||
store := &internal.TSDBStoreMock{
|
||||
DeleteShardFn: deleteShard,
|
||||
ShardIDsFn: shardIDs,
|
||||
}
|
||||
|
||||
s := retention.NewService(cfg)
|
||||
s.MetaClient = mc
|
||||
s.TSDBStore = store
|
||||
s.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient)
|
||||
require.NoError(t, s.Open(context.Background()))
|
||||
s.DeletionCheck()
|
||||
|
||||
// Adjust expData to make it look like we expect.
|
||||
require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 1))
|
||||
require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 2))
|
||||
expData.DropShard(104)
|
||||
require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 6))
|
||||
|
||||
require.Equal(t, expData, dataUT)
|
||||
require.Equal(t, collectShards(expData), shards)
|
||||
|
||||
// Check that multiple duplicate calls to DeletionCheck don't make further changes.
|
||||
// This is mostly for our friend foreverShard.
|
||||
for i := 0; i < 10; i++ {
|
||||
s.DeletionCheck()
|
||||
require.Equal(t, expData, dataUT)
|
||||
require.Equal(t, collectShards(expData), shards)
|
||||
}
|
||||
|
||||
// Our heroic support team hos fixed the issue with foreverShard.
|
||||
foreverShard = math.MaxUint64
|
||||
s.DeletionCheck()
|
||||
require.NoError(t, helpers.DataNukeShardGroup(expData, "servers", "autogen", 3))
|
||||
require.Equal(t, expData, dataUT)
|
||||
require.Equal(t, collectShards(expData), shards)
|
||||
|
||||
require.NoError(t, s.Close())
|
||||
}
|
||||
|
||||
func TestService_CheckShards(t *testing.T) {
|
||||
now := time.Now()
|
||||
// Account for any time difference that could cause some of the logic in
|
||||
|
@ -71,43 +271,44 @@ func TestService_CheckShards(t *testing.T) {
|
|||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
data := []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
|
||||
DefaultRetentionPolicy: "rp0",
|
||||
RetentionPolicies: []meta.RetentionPolicyInfo{
|
||||
{
|
||||
Name: "rp0",
|
||||
ReplicaN: 1,
|
||||
Duration: time.Hour,
|
||||
ShardGroupDuration: time.Hour,
|
||||
ShardGroups: []meta.ShardGroupInfo{
|
||||
{
|
||||
ID: 1,
|
||||
StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour),
|
||||
EndTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 2},
|
||||
{ID: 3},
|
||||
data := meta.Data{
|
||||
Databases: []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
DefaultRetentionPolicy: "rp0",
|
||||
RetentionPolicies: []meta.RetentionPolicyInfo{
|
||||
{
|
||||
Name: "rp0",
|
||||
ReplicaN: 1,
|
||||
Duration: time.Hour,
|
||||
ShardGroupDuration: time.Hour,
|
||||
ShardGroups: []meta.ShardGroupInfo{
|
||||
{
|
||||
ID: 1,
|
||||
StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour),
|
||||
EndTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 2},
|
||||
{ID: 3},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: 4,
|
||||
StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
|
||||
EndTime: now.Truncate(time.Hour),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 5},
|
||||
{ID: 6},
|
||||
{
|
||||
ID: 4,
|
||||
StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
|
||||
EndTime: now.Truncate(time.Hour),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 5},
|
||||
{ID: 6},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: 7,
|
||||
StartTime: now.Truncate(time.Hour),
|
||||
EndTime: now.Truncate(time.Hour).Add(time.Hour),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 8},
|
||||
{ID: 9},
|
||||
{
|
||||
ID: 7,
|
||||
StartTime: now.Truncate(time.Hour),
|
||||
EndTime: now.Truncate(time.Hour).Add(time.Hour),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 8},
|
||||
{ID: 9},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -120,13 +321,13 @@ func TestService_CheckShards(t *testing.T) {
|
|||
config.CheckInterval = toml.Duration(10 * time.Millisecond)
|
||||
s := NewService(t, config)
|
||||
s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
return data
|
||||
return data.Databases
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
deletedShardGroups := make(map[string]struct{})
|
||||
s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error {
|
||||
for _, dbi := range data {
|
||||
for _, dbi := range data.Databases {
|
||||
if dbi.Name == database {
|
||||
for _, rpi := range dbi.RetentionPolicies {
|
||||
if rpi.Name == policy {
|
||||
|
@ -151,6 +352,25 @@ func TestService_CheckShards(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
dropShardDone := make(chan struct{})
|
||||
droppedShards := make(map[uint64]struct{})
|
||||
s.MetaClient.DropShardFn = func(id uint64) error {
|
||||
data.DropShard(id)
|
||||
if _, ok := droppedShards[id]; ok {
|
||||
t.Errorf("duplicate DropShard")
|
||||
}
|
||||
droppedShards[id] = struct{}{}
|
||||
if got, want := droppedShards, map[uint64]struct{}{
|
||||
2: struct{}{},
|
||||
3: struct{}{},
|
||||
}; reflect.DeepEqual(got, want) {
|
||||
close(dropShardDone)
|
||||
} else if len(got) > len(want) {
|
||||
t.Errorf("dropped too many shards")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
pruned := false
|
||||
closing := make(chan struct{})
|
||||
s.MetaClient.PruneShardGroupsFn = func() error {
|
||||
|
@ -165,11 +385,21 @@ func TestService_CheckShards(t *testing.T) {
|
|||
return nil
|
||||
}
|
||||
|
||||
activeShards := map[uint64]struct{}{
|
||||
2: struct{}{},
|
||||
3: struct{}{},
|
||||
5: struct{}{},
|
||||
6: struct{}{},
|
||||
}
|
||||
deletedShards := make(map[uint64]struct{})
|
||||
s.TSDBStore.ShardIDsFn = func() []uint64 {
|
||||
return []uint64{2, 3, 5, 6}
|
||||
return maps.Keys(activeShards)
|
||||
}
|
||||
s.TSDBStore.DeleteShardFn = func(shardID uint64) error {
|
||||
if _, ok := activeShards[shardID]; !ok {
|
||||
return tsdb.ErrShardNotFound
|
||||
}
|
||||
delete(activeShards, shardID)
|
||||
deletedShards[shardID] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
@ -185,6 +415,14 @@ func TestService_CheckShards(t *testing.T) {
|
|||
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case <-dropShardDone:
|
||||
timer.Stop()
|
||||
case <-timer.C:
|
||||
t.Errorf("timeout waiting for shard to be dropped")
|
||||
}
|
||||
|
||||
timer = time.NewTimer(100 * time.Millisecond)
|
||||
select {
|
||||
case <-done:
|
||||
timer.Stop()
|
||||
case <-timer.C:
|
||||
|
@ -242,30 +480,32 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{})
|
|||
var mu sync.Mutex
|
||||
shards := []uint64{3, 5, 8, 9, 11, 12}
|
||||
localShards := []uint64{3, 5, 8, 9, 11, 12}
|
||||
databases := []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
RetentionPolicies: []meta.RetentionPolicyInfo{
|
||||
{
|
||||
Name: "autogen",
|
||||
Duration: 24 * time.Hour,
|
||||
ShardGroupDuration: 24 * time.Hour,
|
||||
ShardGroups: []meta.ShardGroupInfo{
|
||||
{
|
||||
ID: 1,
|
||||
StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 3}, {ID: 9},
|
||||
data := meta.Data{
|
||||
Databases: []meta.DatabaseInfo{
|
||||
{
|
||||
Name: "db0",
|
||||
RetentionPolicies: []meta.RetentionPolicyInfo{
|
||||
{
|
||||
Name: "autogen",
|
||||
Duration: 24 * time.Hour,
|
||||
ShardGroupDuration: 24 * time.Hour,
|
||||
ShardGroups: []meta.ShardGroupInfo{
|
||||
{
|
||||
ID: 1,
|
||||
StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
EndTime: time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 3}, {ID: 9},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ID: 2,
|
||||
StartTime: time.Now().Add(-1 * time.Hour),
|
||||
EndTime: time.Now(),
|
||||
DeletedAt: time.Now(),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 11}, {ID: 12},
|
||||
{
|
||||
ID: 2,
|
||||
StartTime: time.Now().Add(-1 * time.Hour),
|
||||
EndTime: time.Now(),
|
||||
DeletedAt: time.Now(),
|
||||
Shards: []meta.ShardInfo{
|
||||
{ID: 11}, {ID: 12},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -284,7 +524,7 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{})
|
|||
s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return databases
|
||||
return data.Databases
|
||||
}
|
||||
|
||||
s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
|
||||
|
@ -308,11 +548,18 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{})
|
|||
}
|
||||
}
|
||||
shards = newShards
|
||||
databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC()
|
||||
data.Databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC()
|
||||
mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
s.MetaClient.DropShardFn = func(shardID uint64) error {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
data.DropShard(shardID)
|
||||
return nil
|
||||
}
|
||||
|
||||
s.MetaClient.PruneShardGroupsFn = func() error {
|
||||
// When this is called all shards that have been deleted from the meta
|
||||
// store (expired) should also have been deleted from disk.
|
||||
|
@ -401,5 +648,6 @@ func NewService(tb testing.TB, c retention.Config) *Service {
|
|||
|
||||
s.Service.MetaClient = s.MetaClient
|
||||
s.Service.TSDBStore = s.TSDBStore
|
||||
s.Service.DropShardMetaRef = retention.OSSDropShardMetaRef(s.Service.MetaClient)
|
||||
return s
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue