chore: upgrade flux to v0.167.0 in 1.10 (#23350)

* chore: backport 8334dd0a23 to 1.x

* chore: upgrade flux to v0.167.0

* chore: update flux to latest version (#23249)

* chore: update flux to latest version

* fix: backport "convert allocator to interface"

* fix: construct `span` during dependency injection
pull/23399/head v1.10.0rc1
Brandon Pfeifer 2022-05-17 16:27:38 -04:00 committed by GitHub
parent ed80cc6a1c
commit 98c8db57da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 178 additions and 228 deletions

View File

@ -37,7 +37,7 @@ func (s *LocalBucketsProcedureSpec) Copy() plan.ProcedureSpec {
type BucketsDecoder struct {
deps StorageDependencies
alloc *memory.Allocator
alloc memory.Allocator
user meta.User
}

View File

@ -1,61 +1,44 @@
package influxdb_test
import "testing/expect"
option now = () => (2030-01-01T00:00:00Z)
testcase push_down_min_bare extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare()
testcase push_down_min_bare extends "flux/planner/group_min_test.group_min_bare" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}
testcase push_down_min_bare_host extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare_host()
testcase push_down_min_bare_host extends "flux/planner/group_min_test.group_min_bare_host" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}
testcase push_down_min_bare_field extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare_field()
testcase push_down_min_bare_field extends "flux/planner/group_min_test.group_min_bare_field" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}
testcase push_down_max_bare extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare()
testcase push_down_max_bare extends "flux/planner/group_max_test.group_max_bare" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}
testcase push_down_max_bare_host extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare_host()
testcase push_down_max_bare_host extends "flux/planner/group_max_test.group_max_bare_host" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}
testcase push_down_max_bare_field extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare_field()
testcase push_down_max_bare_field extends "flux/planner/group_max_test.group_max_bare_field" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}
testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_max_table_test.group_min_table()
testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test.group_min_table" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}
testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_max_table_test.group_max_table()
testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test.group_max_table" {
expect.planner(rules: ["PushDownGroupAggregateRule": 1])
super()
}

View File

@ -29,7 +29,7 @@ type Source struct {
id execute.DatasetID
ts []execute.Transformation
alloc *memory.Allocator
alloc memory.Allocator
stats cursors.CursorStats
label string

View File

@ -108,12 +108,12 @@ func (spec *ReadWindowAggregateSpec) Name() string {
}
type Reader interface {
ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error)
ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc *memory.Allocator) (TableIterator, error)
ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error)
ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc memory.Allocator) (TableIterator, error)
ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc memory.Allocator) (TableIterator, error)
ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc memory.Allocator) (TableIterator, error)
ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc *memory.Allocator) (TableIterator, error)
ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc *memory.Allocator) (TableIterator, error)
ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc memory.Allocator) (TableIterator, error)
ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc memory.Allocator) (TableIterator, error)
Close()
}

View File

@ -7,6 +7,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/dependencies/dependenciestest"
"github.com/influxdata/flux/dependency"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/interpreter"
@ -760,7 +761,8 @@ m,tag1=c _value=4 41`),
tc.want.tables,
nil,
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
ctx := deps.Inject(context.Background())
ctx, span := dependency.Inject(context.Background())
defer span.Finish()
newT, err := influxdb.NewToTransformation(ctx, d, c, tc.spec, deps.StorageDeps)
if err != nil {
t.Error(err)

View File

@ -71,7 +71,7 @@ type DatabasesDecoder struct {
deps *influxdb.StorageDependencies
databases []meta.DatabaseInfo
user meta.User
alloc *memory.Allocator
alloc memory.Allocator
}
func (bd *DatabasesDecoder) Connect(ctx context.Context) error {

6
go.mod
View File

@ -16,7 +16,7 @@ require (
github.com/golang/mock v1.5.0
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.5.7
github.com/influxdata/flux v0.159.0
github.com/influxdata/flux v0.167.0
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256
github.com/influxdata/pkg-config v0.2.11
@ -89,7 +89,6 @@ require (
github.com/benbjohnson/immutable v0.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect
github.com/c-bata/go-prompt v0.2.2 // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/deepmap/oapi-codegen v1.6.0 // indirect
github.com/denisenkom/go-mssqldb v0.10.0 // indirect
@ -123,17 +122,14 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect
github.com/lib/pq v1.0.0 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-runewidth v0.0.3 // indirect
github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect
github.com/philhofer/fwd v1.0.0 // indirect
github.com/pierrec/lz4/v4 v4.1.11 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/procfs v0.0.11 // indirect
github.com/segmentio/kafka-go v0.2.0 // indirect

20
go.sum
View File

@ -96,7 +96,6 @@ github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1M
github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/HdrHistogram/hdrhistogram-go v1.1.0 h1:6dpdDPTRoo78HxAJ6T1HfMiKSnqhgRRqzCuPshRkQ7I=
github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITgsTc=
@ -235,7 +234,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -285,7 +283,6 @@ github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoD
github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8=
github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 h1:nLPjjvpUAODOR6vY/7o0hBIk8iTr19Fvmf8aFx/kC7A=
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@ -582,8 +579,8 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY=
github.com/influxdata/flux v0.159.0 h1:Vdq/3/NfO6xl2q6COr8c3rKuywMIRrfxVIRrXGHZq/Q=
github.com/influxdata/flux v0.159.0/go.mod h1:dALQQHRj+70b+o/9RtaHAAXH3toMs2M58gfY66oEll8=
github.com/influxdata/flux v0.167.0 h1:U8xyjJz6uL1mfmKEio0DYxwUMglbk3jJdFiA0v5nsAc=
github.com/influxdata/flux v0.167.0/go.mod h1:eNApXyjdyUdCNs6LxUQRBHxjUVqK1XrJrlMPhIQSQpA=
github.com/influxdata/gosnowflake v1.6.9 h1:BhE39Mmh8bC+Rvd4QQsP2gHypfeYIH1wqW1AjGWxxrE=
github.com/influxdata/gosnowflake v1.6.9/go.mod h1:9W/BvCXOKx2gJtQ+jdi1Vudev9t9/UDOEHnlJZ/y1nU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
@ -662,9 +659,8 @@ github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
@ -710,7 +706,6 @@ github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4f
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.22/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.29 h1:xHBEhR+t5RzcFJjBLJlax2daXOrTYtr9z4WdKEfWFzg=
github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
@ -747,7 +742,6 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU=
@ -940,7 +934,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/zeebo/xxh3 v0.13.0/go.mod h1:AQY73TOrhF3jNsdiM9zZOb8MThrYbZONHj7ryDBaLpg=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
@ -1008,7 +1001,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI=
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -1028,7 +1020,6 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20211028214138-64b4c8e87d1a/go.mod h1:a3o/VtDNHN+dCVLEpzjjUHOzR+Ln3DHX056ZPzoZGGA=
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 h1:/5Bs7sWi0i3rOVO5KnM55OwugpsD4bRW1zywKoZjbkI=
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
@ -1065,7 +1056,6 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.1-0.20210830214625-1b1db11ec8f4/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 h1:LQmS1nU0twXLA96Kt7U9qtHJEbBk3z6Q0V4UXjZkpr4=
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -1123,7 +1113,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210505024714-0287a6fb4125/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
@ -1235,7 +1224,6 @@ golang.org/x/sys v0.0.0-20210601080250-7ecdf8ef093b/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM=
@ -1339,7 +1327,6 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8=
golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -1493,7 +1480,6 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=

View File

@ -20,7 +20,7 @@ func NewFluxControllerMock() *FluxControllerMock {
if err != nil {
return nil, err
}
alloc := &memory.Allocator{}
alloc := &memory.ResourceAllocator{}
return p.Start(ctx, alloc)
},
}

View File

@ -10,31 +10,31 @@ import (
// Reader is a mock implementation of flux/stdlib/influxdata/influxdb.Reader
type Reader struct {
ReadFilterFn func(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadWindowAggregateFn func(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error)
ReadFilterFn func(ctx context.Context, spec influxdb.ReadFilterSpec, alloc memory.Allocator) (influxdb.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec influxdb.ReadGroupSpec, alloc memory.Allocator) (influxdb.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc memory.Allocator) (influxdb.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc memory.Allocator) (influxdb.TableIterator, error)
ReadWindowAggregateFn func(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc memory.Allocator) (influxdb.TableIterator, error)
CloseFn func()
}
func (m Reader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (m Reader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadFilterFn(ctx, spec, alloc)
}
func (m Reader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (m Reader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadGroupFn(ctx, spec, alloc)
}
func (m Reader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (m Reader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadWindowAggregateFn(ctx, spec, alloc)
}
func (m Reader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (m Reader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadTagKeysFn(ctx, spec, alloc)
}
func (m Reader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (m Reader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc memory.Allocator) (influxdb.TableIterator, error) {
return m.ReadTagValuesFn(ctx, spec, alloc)
}

View File

@ -27,6 +27,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/dependency"
"github.com/influxdata/flux/execute/table"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/memory"
@ -230,11 +231,10 @@ func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Qu
defer span.Finish()
// The controller injects the dependencies for each incoming request.
for _, dep := range c.dependencies {
ctx = dep.Inject(ctx)
}
q, err := c.query(ctx, compiler)
ctx, deps := dependency.Inject(ctx, c.dependencies...)
q, err := c.query(ctx, compiler, deps)
if err != nil {
deps.Finish()
return q, err
}
@ -243,8 +243,8 @@ func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Qu
// query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
q, err := c.createQuery(ctx, compiler.CompilerType())
func (c *Controller) query(ctx context.Context, compiler flux.Compiler, deps *dependency.Span) (flux.Query, error) {
q, err := c.createQuery(ctx, compiler, deps)
if err != nil {
return nil, handleFluxError(err)
}
@ -264,7 +264,7 @@ func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Qu
return q, nil
}
func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Query, error) {
func (c *Controller) createQuery(ctx context.Context, compiler flux.Compiler, deps *dependency.Span) (*Query, error) {
c.queriesMu.RLock()
if c.shutdown {
c.queriesMu.RUnlock()
@ -287,7 +287,7 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu
labelValues[i] = str
compileLabelValues[i] = str
}
compileLabelValues[len(compileLabelValues)-1] = string(ct)
compileLabelValues[len(compileLabelValues)-1] = string(compiler.CompilerType())
cctx, cancel := context.WithCancel(ctx)
parentSpan, parentCtx := tracing.StartSpanFromContextWithPromMetrics(
@ -307,6 +307,8 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu
parentSpan: parentSpan,
cancel: cancel,
doneCh: make(chan struct{}),
deps: deps,
compiler: compiler,
}
// Lock the queries mutex for the rest of this method.
@ -593,12 +595,14 @@ type Query struct {
done sync.Once
doneCh chan struct{}
program flux.Program
exec flux.Query
results chan flux.Result
program flux.Program
exec flux.Query
results chan flux.Result
compiler flux.Compiler
memoryManager *queryMemoryManager
alloc *memory.Allocator
alloc *memory.ResourceAllocator
deps *dependency.Span
}
func (q *Query) ProfilerResults() (flux.ResultIterator, error) {
@ -694,6 +698,9 @@ func (q *Query) Done() {
}
q.stats.RuntimeErrors = errMsgs
// Clean up the dependencies.
q.deps.Finish()
// Mark the query as finished so it is removed from the query map.
q.c.finish(q)

View File

@ -37,7 +37,7 @@ var (
mockCompiler = &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
q.ResultsCh <- &executetest.Result{}
},
}, nil
@ -197,7 +197,7 @@ func TestController_QueryRuntimeError(t *testing.T) {
// ensure we have non-zero compile time
time.Sleep(1 * time.Millisecond)
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
q.SetErr(errors.New("runtime error"))
},
}, nil
@ -256,7 +256,7 @@ func TestController_QueryQueueError(t *testing.T) {
q, err := ctrl.Query(context.Background(), &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Block until test is finished
<-done
},
@ -405,7 +405,7 @@ func TestController_ExecuteError(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
StartFn: func(ctx context.Context, alloc *memory.Allocator) (*mock.Query, error) {
StartFn: func(ctx context.Context, alloc memory.Allocator) (*mock.Query, error) {
return nil, errors.New("expected error")
},
}, nil
@ -550,7 +550,7 @@ func TestController_StartPanic(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
StartFn: func(ctx context.Context, alloc *memory.Allocator) (i *mock.Query, e error) {
StartFn: func(ctx context.Context, alloc memory.Allocator) (i *mock.Query, e error) {
panic("panic during start step")
},
}, nil
@ -589,7 +589,7 @@ func TestController_ShutdownWithRunningQuery(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
close(executing)
<-ctx.Done()
@ -644,7 +644,7 @@ func TestController_ShutdownWithTimeout(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// This should just block until the end of the test
// when we perform cleanup.
close(executing)
@ -694,7 +694,7 @@ func TestController_PerQueryMemoryLimit(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
defer func() {
if err, ok := recover().(error); ok && err != nil {
q.SetErr(err)
@ -746,7 +746,7 @@ func TestController_ConcurrencyQuota(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
select {
case <-q.Canceled:
default:
@ -817,7 +817,7 @@ func TestController_QueueSize(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
executing <- struct{}{}
// Block until test is finished
<-done
@ -877,7 +877,7 @@ func TestController_CancelDone_Unlimited(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -922,7 +922,7 @@ func TestController_DoneWithoutRead_Unlimited(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -974,7 +974,7 @@ func TestController_CancelDone(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -1021,7 +1021,7 @@ func TestController_DoneWithoutRead(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -1077,7 +1077,7 @@ func TestController_Error_MaxMemory(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to hit the memory limit.
for i := 0; i < 16; i++ {
size := config.MemoryBytesQuotaPerQuery / 16
@ -1135,7 +1135,7 @@ func TestController_NoisyNeighbor(t *testing.T) {
wellBehavedNeighbor := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory until we hit our initial memory limit so we should
// never request more memory.
for amount := int64(0); amount < config.InitialMemoryBytesQuotaPerQuery; amount += 16 {
@ -1152,7 +1152,7 @@ func TestController_NoisyNeighbor(t *testing.T) {
noisyNeighbor := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to use up what we can and be as noisy as possible.
// Turn up the stereo and party on.
for {
@ -1241,7 +1241,7 @@ func TestController_Error_NoRemainingMemory(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to use up what we can until denied.
for size := int64(0); ; size += 16 {
if err := alloc.Account(16); err != nil {
@ -1287,7 +1287,7 @@ func TestController_MemoryRelease(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate some amount of memory and never release it.
if err := alloc.Account(int(config.MemoryBytesQuotaPerQuery) / 2); err != nil {
q.SetErr(err)
@ -1336,7 +1336,7 @@ func TestController_IrregularMemoryQuota(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to hit the memory limit.
for size := 0; size < 768; size += 16 {
if err := alloc.Account(16); err != nil {
@ -1395,7 +1395,7 @@ func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to use up what we can and be as noisy as possible.
// Turn up the stereo and party on.
for size := 0; size < 1024; size += 16 {

View File

@ -47,7 +47,7 @@ func (c *Controller) createAllocator(q *Query) {
m: c.memory,
limit: c.memory.initialBytesQuotaPerQuery,
}
q.alloc = &memory.Allocator{
q.alloc = &memory.ResourceAllocator{
// Use an anonymous function to ensure the value is copied.
Limit: func(v int64) *int64 { return &v }(q.memoryManager.limit),
Manager: q.memoryManager,

View File

@ -62,7 +62,7 @@ func NewReader(s storage.Store) query.Reader {
return &storeReader{s: s}
}
func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &filterIterator{
ctx: ctx,
s: r.s,
@ -72,7 +72,7 @@ func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec,
}, nil
}
func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &groupIterator{
ctx: ctx,
s: r.s,
@ -82,7 +82,7 @@ func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, a
}, nil
}
func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &windowAggregateIterator{
ctx: ctx,
s: r.s,
@ -92,7 +92,7 @@ func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWi
}, nil
}
func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &tagKeysIterator{
ctx: ctx,
bounds: spec.Bounds,
@ -103,7 +103,7 @@ func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpe
}, nil
}
func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &tagValuesIterator{
ctx: ctx,
bounds: spec.Bounds,
@ -122,7 +122,7 @@ type filterIterator struct {
spec query.ReadFilterSpec
stats cursors.CursorStats
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats }
@ -239,7 +239,7 @@ type groupIterator struct {
spec query.ReadGroupSpec
stats cursors.CursorStats
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats }
@ -612,7 +612,7 @@ type windowAggregateIterator struct {
spec query.ReadWindowAggregateSpec
stats cursors.CursorStats
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func (wai *windowAggregateIterator) Statistics() cursors.CursorStats { return wai.stats }
@ -847,7 +847,7 @@ type tagKeysIterator struct {
s storage.Store
readSpec query.ReadTagKeysSpec
predicate *datatypes.Predicate
alloc *memory.Allocator
alloc memory.Allocator
}
func (ti *tagKeysIterator) Do(f func(flux.Table) error) error {
@ -932,7 +932,7 @@ type tagValuesIterator struct {
s storage.Store
readSpec query.ReadTagValuesSpec
predicate *datatypes.Predicate
alloc *memory.Allocator
alloc memory.Allocator
}
func (ti *tagValuesIterator) Do(f func(flux.Table) error) error {

View File

@ -33,7 +33,7 @@ type floatTable struct {
table
mu sync.Mutex
cur cursors.FloatArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newFloatTable(
@ -45,7 +45,7 @@ func newFloatTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatTable {
t := &floatTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -127,7 +127,7 @@ func newFloatWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatWindowTable {
t := &floatWindowTable{
floatTable: floatTable{
@ -334,7 +334,7 @@ func newFloatWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatWindowSelectorTable {
t := &floatWindowSelectorTable{
floatTable: floatTable{
@ -435,7 +435,7 @@ func newFloatEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -671,7 +671,7 @@ func newFloatGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatGroupTable {
t := &floatGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -1008,7 +1008,7 @@ type integerTable struct {
table
mu sync.Mutex
cur cursors.IntegerArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newIntegerTable(
@ -1020,7 +1020,7 @@ func newIntegerTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerTable {
t := &integerTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -1103,7 +1103,7 @@ func newIntegerWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerWindowTable {
t := &integerWindowTable{
integerTable: integerTable{
@ -1311,7 +1311,7 @@ func newIntegerWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerWindowSelectorTable {
t := &integerWindowSelectorTable{
integerTable: integerTable{
@ -1412,7 +1412,7 @@ func newIntegerEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -1648,7 +1648,7 @@ func newIntegerGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerGroupTable {
t := &integerGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -1986,7 +1986,7 @@ type unsignedTable struct {
table
mu sync.Mutex
cur cursors.UnsignedArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newUnsignedTable(
@ -1998,7 +1998,7 @@ func newUnsignedTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedTable {
t := &unsignedTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -2080,7 +2080,7 @@ func newUnsignedWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedWindowTable {
t := &unsignedWindowTable{
unsignedTable: unsignedTable{
@ -2287,7 +2287,7 @@ func newUnsignedWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedWindowSelectorTable {
t := &unsignedWindowSelectorTable{
unsignedTable: unsignedTable{
@ -2388,7 +2388,7 @@ func newUnsignedEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -2624,7 +2624,7 @@ func newUnsignedGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedGroupTable {
t := &unsignedGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -2961,7 +2961,7 @@ type stringTable struct {
table
mu sync.Mutex
cur cursors.StringArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newStringTable(
@ -2973,7 +2973,7 @@ func newStringTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringTable {
t := &stringTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -3055,7 +3055,7 @@ func newStringWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringWindowTable {
t := &stringWindowTable{
stringTable: stringTable{
@ -3262,7 +3262,7 @@ func newStringWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringWindowSelectorTable {
t := &stringWindowSelectorTable{
stringTable: stringTable{
@ -3363,7 +3363,7 @@ func newStringEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -3599,7 +3599,7 @@ func newStringGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringGroupTable {
t := &stringGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -3880,7 +3880,7 @@ type booleanTable struct {
table
mu sync.Mutex
cur cursors.BooleanArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newBooleanTable(
@ -3892,7 +3892,7 @@ func newBooleanTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanTable {
t := &booleanTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -3974,7 +3974,7 @@ func newBooleanWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanWindowTable {
t := &booleanWindowTable{
booleanTable: booleanTable{
@ -4181,7 +4181,7 @@ func newBooleanWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanWindowSelectorTable {
t := &booleanWindowSelectorTable{
booleanTable: booleanTable{
@ -4282,7 +4282,7 @@ func newBooleanEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -4518,7 +4518,7 @@ func newBooleanGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanGroupTable {
t := &booleanGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),

View File

@ -27,7 +27,7 @@ type {{.name}}Table struct {
table
mu sync.Mutex
cur cursors.{{.Name}}ArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func new{{.Name}}Table(
@ -39,7 +39,7 @@ func new{{.Name}}Table(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}Table {
t := &{{.name}}Table{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -122,7 +122,7 @@ func new{{.Name}}WindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}WindowTable {
t := &{{.name}}WindowTable{
{{.name}}Table: {{.name}}Table{
@ -330,7 +330,7 @@ func new{{.Name}}WindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}WindowSelectorTable {
t := &{{.name}}WindowSelectorTable{
{{.name}}Table: {{.name}}Table{
@ -431,7 +431,7 @@ func new{{.Name}}EmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}EmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -667,7 +667,7 @@ func new{{.Name}}GroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}GroupTable {
t := &{{.name}}GroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),

View File

@ -33,7 +33,7 @@ type table struct {
cancelled, used int32
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func newTable(
@ -43,7 +43,7 @@ func newTable(
cols []flux.ColMeta,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) table {
return table{
done: done,

View File

@ -161,7 +161,7 @@ func NewStorageReader(tb testing.TB, setupFn SetupFunc) *StorageReader {
}
}
func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc memory.Allocator) (influxdb.TableIterator, error) {
return r.Reader.ReadWindowAggregate(ctx, spec, alloc)
}
@ -181,7 +181,7 @@ func TestStorageReader_ReadFilter(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
ti, err := reader.ReadFilter(context.Background(), influxdb.ReadFilterSpec{
@ -257,11 +257,11 @@ func TestStorageReader_Table(t *testing.T) {
for _, tc := range []struct {
name string
newFn func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator
newFn func(ctx context.Context, alloc memory.Allocator) flux.TableIterator
}{
{
name: "ReadFilter",
newFn: func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator {
newFn: func(ctx context.Context, alloc memory.Allocator) flux.TableIterator {
ti, err := reader.ReadFilter(context.Background(), influxdb.ReadFilterSpec{
Database: reader.Database,
RetentionPolicy: reader.RetentionPolicy,
@ -413,7 +413,7 @@ func TestStorageReader_ReadWindowAggregate(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
@ -513,7 +513,6 @@ func TestStorageReader_ReadWindowAggregate_ByStopTime(t *testing.T) {
},
},
} {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -528,7 +527,7 @@ func TestStorageReader_ReadWindowAggregate_ByStopTime(t *testing.T) {
Aggregates: []plan.ProcedureKind{
tt.aggregate,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -612,7 +611,6 @@ func TestStorageReader_ReadWindowAggregate_ByStartTime(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -627,7 +625,7 @@ func TestStorageReader_ReadWindowAggregate_ByStartTime(t *testing.T) {
Aggregates: []plan.ProcedureKind{
tt.aggregate,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -796,7 +794,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmpty(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -811,7 +808,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmpty(t *testing.T) {
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -896,7 +893,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStopTime(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -912,7 +908,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStopTime(t *testing.T) {
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -997,7 +993,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T)
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1013,7 +1008,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T)
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1131,7 +1126,6 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1148,7 +1142,7 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) {
Aggregates: []plan.ProcedureKind{
tt.aggregate,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1266,7 +1260,6 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1284,7 +1277,7 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1340,7 +1333,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
defer reader.Close()
t.Run("unwindowed mean", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1354,7 +1346,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1373,7 +1365,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
})
t.Run("windowed mean", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1387,7 +1378,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1433,7 +1424,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
})
t.Run("windowed mean with offset", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1448,7 +1438,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1542,7 +1532,6 @@ func TestStorageReader_ReadWindowFirst(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1556,7 +1545,7 @@ func TestStorageReader_ReadWindowFirst(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.FirstKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1653,7 +1642,6 @@ func TestStorageReader_WindowFirstOffset(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1668,7 +1656,7 @@ func TestStorageReader_WindowFirstOffset(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.FirstKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1766,7 +1754,6 @@ func TestStorageReader_WindowSumOffset(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1781,7 +1768,7 @@ func TestStorageReader_WindowSumOffset(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.SumKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1878,7 +1865,6 @@ func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -1893,7 +1879,7 @@ func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) {
storageflux.FirstKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2018,7 +2004,6 @@ func TestStorageReader_WindowFirstOffsetCreateEmpty(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2034,7 +2019,7 @@ func TestStorageReader_WindowFirstOffsetCreateEmpty(t *testing.T) {
storageflux.FirstKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2162,7 +2147,6 @@ func TestStorageReader_WindowSumOffsetCreateEmpty(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2178,7 +2162,7 @@ func TestStorageReader_WindowSumOffsetCreateEmpty(t *testing.T) {
storageflux.SumKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2306,7 +2290,6 @@ func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2322,7 +2305,7 @@ func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) {
},
CreateEmpty: true,
TimeColumn: execute.DefaultStopColLabel,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2411,7 +2394,6 @@ func TestStorageReader_WindowFirstOffsetTimeColumn(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2428,7 +2410,7 @@ func TestStorageReader_WindowFirstOffsetTimeColumn(t *testing.T) {
},
CreateEmpty: true,
TimeColumn: execute.DefaultStopColLabel,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2517,7 +2499,6 @@ func TestStorageReader_WindowSumOffsetTimeColumn(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2534,7 +2515,7 @@ func TestStorageReader_WindowSumOffsetTimeColumn(t *testing.T) {
},
CreateEmpty: true,
TimeColumn: execute.DefaultStopColLabel,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2627,7 +2608,6 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2642,7 +2622,7 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) {
storageflux.FirstKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2824,7 +2804,7 @@ func TestStorageReader_ReadGroup(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
@ -2917,7 +2897,6 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
for _, tt := range cases {
t.Run(tt.aggregate, func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2927,7 +2906,7 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
GroupMode: influxdb.GroupModeBy,
GroupKeys: []string{"t0"},
AggregateMethod: tt.aggregate,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2989,7 +2968,6 @@ func TestStorageReader_ReadGroupNoAgg(t *testing.T) {
for _, tt := range cases {
t.Run(tt.aggregate, func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -2998,7 +2976,7 @@ func TestStorageReader_ReadGroupNoAgg(t *testing.T) {
},
GroupMode: influxdb.GroupModeBy,
GroupKeys: []string{"t1"},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -3123,7 +3101,7 @@ func TestStorageReader_ReadWindowAggregateMonths(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
@ -3172,11 +3150,11 @@ func TestStorageReader_Backoff(t *testing.T) {
for _, tt := range []struct {
name string
read func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error)
read func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error)
}{
{
name: "ReadFilter",
read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {
read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) {
return reader.ReadFilter(context.Background(), influxdb.ReadFilterSpec{
Database: reader.Database,
RetentionPolicy: reader.RetentionPolicy,
@ -3186,7 +3164,7 @@ func TestStorageReader_Backoff(t *testing.T) {
},
{
name: "ReadGroup",
read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {
read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) {
return reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -3200,7 +3178,7 @@ func TestStorageReader_Backoff(t *testing.T) {
},
{
name: "ReadWindowAggregate",
read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {
read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) {
return reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: reader.Database,
@ -3222,7 +3200,7 @@ func TestStorageReader_Backoff(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
// Read the table and learn what the maximum allocated
// value is. We don't want to exceed this.
mem := &memory.Allocator{}
mem := &memory.ResourceAllocator{}
tables, err := tt.read(reader, mem)
if err != nil {
t.Fatal(err)
@ -3249,7 +3227,7 @@ func TestStorageReader_Backoff(t *testing.T) {
// if the next buffer attempts to be allocated
// before the first.
limit := mem.MaxAllocated()
mem = &memory.Allocator{Limit: &limit}
mem = &memory.ResourceAllocator{Limit: &limit}
tables, err = tt.read(reader, mem)
if err != nil {
t.Fatal(err)
@ -3360,12 +3338,11 @@ func BenchmarkReadFilter(b *testing.B) {
return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr
}
benchmarkRead(b, setupFn, func(r *StorageReader) error {
mem := &memory.Allocator{}
tables, err := r.ReadFilter(context.Background(), influxdb.ReadFilterSpec{
Database: r.Database,
RetentionPolicy: r.RetentionPolicy,
Bounds: r.Bounds,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
return err
}
@ -3465,7 +3442,6 @@ func BenchmarkReadGroup(b *testing.B) {
return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr
}
benchmarkRead(b, setupFn, func(r *StorageReader) error {
mem := &memory.Allocator{}
tables, err := r.ReadGroup(context.Background(), influxdb.ReadGroupSpec{
ReadFilterSpec: influxdb.ReadFilterSpec{
Database: r.Database,
@ -3475,7 +3451,7 @@ func BenchmarkReadGroup(b *testing.B) {
GroupMode: influxdb.GroupModeBy,
GroupKeys: []string{"_start", "_stop", "t0"},
AggregateMethod: storageflux.MinKind,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
return err
}

View File

@ -32,7 +32,7 @@ build_test_harness() {
}
# Many tests targeting 3rd party databases are not yet supported in CI and should be filtered out.
DB_INTEGRATION_WRITE_TESTS=integration_sqlite_write_to,integration_vertica_write_to,integration_mssql_write_to,integration_mysql_write_to,integration_mariadb_write_to,integration_pg_write_to,integration_hdb_write_to
DB_INTEGRATION_WRITE_TESTS=integration_mqtt_pub,integration_sqlite_write_to,integration_vertica_write_to,integration_mssql_write_to,integration_mysql_write_to,integration_mariadb_write_to,integration_pg_write_to,integration_hdb_write_to
DB_INTEGRATION_READ_TESTS=integration_sqlite_read_from_seed,integration_sqlite_read_from_nonseed,integration_vertica_read_from_seed,integration_vertica_read_from_nonseed,integration_mssql_read_from_seed,integration_mssql_read_from_nonseed,integration_mariadb_read_from_seed,integration_mariadb_read_from_nonseed,integration_mysql_read_from_seed,integration_mysql_read_from_nonseed,integration_pg_read_from_seed,integration_pg_read_from_nonseed,integration_hdb_read_from_seed,integration_hdb_read_from_nonseed
DB_INTEGRATION_INJECTION_TESTS="integration_sqlite_injection,integration_hdb_injection,integration_pg_injection,integration_mysql_injection,integration_mariadb_injection,integration_mssql_injection"
DB_TESTS="${DB_INTEGRATION_WRITE_TESTS},${DB_INTEGRATION_READ_TESTS},${DB_INTEGRATION_INJECTION_TESTS}"