refactor(flux): convert the allocator into an interface (#23214)

This follows the changes from influxdata/flux#4539.
pull/23218/head
Jonathan A. Sternberg 2022-03-22 12:33:52 -05:00 committed by GitHub
parent 2c930fd127
commit 5e3ea7b94c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 156 additions and 182 deletions

View File

@ -6,10 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
context2 "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"html/template"
"io"
"io/ioutil"
@ -20,6 +16,11 @@ import (
"testing"
"time"
context2 "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/execute"
@ -810,11 +811,11 @@ func (s TestQueryProfiler) Name() string {
return fmt.Sprintf("query%d", s.start)
}
func (s TestQueryProfiler) GetSortedResult(q flux.Query, alloc *memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error) {
func (s TestQueryProfiler) GetSortedResult(q flux.Query, alloc memory.Allocator, desc bool, sortKeys ...string) (flux.Table, error) {
return nil, nil
}
func (s TestQueryProfiler) GetResult(q flux.Query, alloc *memory.Allocator) (flux.Table, error) {
func (s TestQueryProfiler) GetResult(q flux.Query, alloc memory.Allocator) (flux.Table, error) {
groupKey := execute.NewGroupKey(
[]flux.ColMeta{
{

View File

@ -32,7 +32,7 @@ build_test_harness() {
}
# Many tests targeting 3rd party databases are not yet supported in CI and should be filtered out.
DB_INTEGRATION_WRITE_TESTS=integration_sqlite_write_to,integration_vertica_write_to,integration_mssql_write_to,integration_mysql_write_to,integration_mariadb_write_to,integration_pg_write_to,integration_hdb_write_to
DB_INTEGRATION_WRITE_TESTS=integration_mqtt_pub,integration_sqlite_write_to,integration_vertica_write_to,integration_mssql_write_to,integration_mysql_write_to,integration_mariadb_write_to,integration_pg_write_to,integration_hdb_write_to
DB_INTEGRATION_READ_TESTS=integration_sqlite_read_from_seed,integration_sqlite_read_from_nonseed,integration_vertica_read_from_seed,integration_vertica_read_from_nonseed,integration_mssql_read_from_seed,integration_mssql_read_from_nonseed,integration_mariadb_read_from_seed,integration_mariadb_read_from_nonseed,integration_mysql_read_from_seed,integration_mysql_read_from_nonseed,integration_pg_read_from_seed,integration_pg_read_from_nonseed,integration_hdb_read_from_seed,integration_hdb_read_from_nonseed
DB_INTEGRATION_INJECTION_TESTS=integration_sqlite_injection,integration_hdb_injection,integration_pg_injection,integration_mysql_injection,integration_mariadb_injection,integration_mssql_injection
DB_TESTS="${DB_INTEGRATION_WRITE_TESTS},${DB_INTEGRATION_READ_TESTS},${DB_INTEGRATION_INJECTION_TESTS}"

2
go.mod
View File

@ -34,7 +34,7 @@ require (
github.com/hashicorp/vault/api v1.0.2
github.com/imdario/mergo v0.3.9 // indirect
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe
github.com/influxdata/flux v0.159.0
github.com/influxdata/flux v0.159.1-0.20220322154400-5e19bfa74b44
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839

4
go.sum
View File

@ -503,8 +503,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe h1:7j4SdN/BvQwN6WoUq7mv0kg5U9NhnFBxPGMafYRKym0=
github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
github.com/influxdata/flux v0.159.0 h1:Vdq/3/NfO6xl2q6COr8c3rKuywMIRrfxVIRrXGHZq/Q=
github.com/influxdata/flux v0.159.0/go.mod h1:dALQQHRj+70b+o/9RtaHAAXH3toMs2M58gfY66oEll8=
github.com/influxdata/flux v0.159.1-0.20220322154400-5e19bfa74b44 h1:Q6U31iE0HzFpzS+JRZRyFvp6TSDBU7bWpQ1N78yltCw=
github.com/influxdata/flux v0.159.1-0.20220322154400-5e19bfa74b44/go.mod h1:dALQQHRj+70b+o/9RtaHAAXH3toMs2M58gfY66oEll8=
github.com/influxdata/gosnowflake v1.6.9 h1:BhE39Mmh8bC+Rvd4QQsP2gHypfeYIH1wqW1AjGWxxrE=
github.com/influxdata/gosnowflake v1.6.9/go.mod h1:9W/BvCXOKx2gJtQ+jdi1Vudev9t9/UDOEHnlJZ/y1nU=
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=

View File

@ -8,33 +8,33 @@ import (
)
type StorageReader struct {
ReadFilterFn func(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadSeriesCardinalityFn func(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadFilterFn func(ctx context.Context, spec query.ReadFilterSpec, alloc memory.Allocator) (query.TableIterator, error)
ReadGroupFn func(ctx context.Context, spec query.ReadGroupSpec, alloc memory.Allocator) (query.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec query.ReadTagKeysSpec, alloc memory.Allocator) (query.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec query.ReadTagValuesSpec, alloc memory.Allocator) (query.TableIterator, error)
ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error)
ReadSeriesCardinalityFn func(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc memory.Allocator) (query.TableIterator, error)
SupportReadSeriesCardinalityFn func(ctx context.Context) bool
CloseFn func()
}
func (s *StorageReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (s *StorageReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc memory.Allocator) (query.TableIterator, error) {
return s.ReadFilterFn(ctx, spec, alloc)
}
func (s *StorageReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (s *StorageReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc memory.Allocator) (query.TableIterator, error) {
return s.ReadGroupFn(ctx, spec, alloc)
}
func (s *StorageReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (s *StorageReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc memory.Allocator) (query.TableIterator, error) {
return s.ReadTagKeysFn(ctx, spec, alloc)
}
func (s *StorageReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (s *StorageReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc memory.Allocator) (query.TableIterator, error) {
return s.ReadTagValuesFn(ctx, spec, alloc)
}
func (s *StorageReader) ReadSeriesCardinality(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (s *StorageReader) ReadSeriesCardinality(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc memory.Allocator) (query.TableIterator, error) {
return s.ReadSeriesCardinalityFn(ctx, spec, alloc)
}
@ -51,6 +51,6 @@ func (s *StorageReader) Close() {
}
}
func (s *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (s *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error) {
return s.ReadWindowAggregateFn(ctx, spec, alloc)
}

View File

@ -627,7 +627,7 @@ type Query struct {
compiler flux.Compiler
memoryManager *queryMemoryManager
alloc *memory.Allocator
alloc *memory.ResourceAllocator
}
func (q *Query) ProfilerResults() (flux.ResultIterator, error) {

View File

@ -41,7 +41,7 @@ var (
time.Sleep(time.Millisecond)
}
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
prev := time.Now()
for now := time.Now(); now.Equal(prev); now = time.Now() {
time.Sleep(time.Millisecond)
@ -213,7 +213,7 @@ func TestController_QueryRuntimeError(t *testing.T) {
time.Sleep(time.Millisecond)
}
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
prev := time.Now()
for now := time.Now(); now.Equal(prev); now = time.Now() {
time.Sleep(time.Millisecond)
@ -273,7 +273,7 @@ func TestController_QueryQueueError(t *testing.T) {
q, err := ctrl.Query(context.Background(), makeRequest(&mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Block until test is finished
<-done
},
@ -419,7 +419,7 @@ func TestController_ExecuteError(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
StartFn: func(ctx context.Context, alloc *memory.Allocator) (*mock.Query, error) {
StartFn: func(ctx context.Context, alloc memory.Allocator) (*mock.Query, error) {
return nil, errors.New("expected error")
},
}, nil
@ -563,7 +563,7 @@ func TestController_StartPanic(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
StartFn: func(ctx context.Context, alloc *memory.Allocator) (i *mock.Query, e error) {
StartFn: func(ctx context.Context, alloc memory.Allocator) (i *mock.Query, e error) {
panic("panic during start step")
},
}, nil
@ -601,7 +601,7 @@ func TestController_ShutdownWithRunningQuery(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
close(executing)
<-ctx.Done()
@ -656,7 +656,7 @@ func TestController_ShutdownWithTimeout(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// This should just block until the end of the test
// when we perform cleanup.
close(executing)
@ -705,7 +705,7 @@ func TestController_PerQueryMemoryLimit(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
defer func() {
if err, ok := recover().(error); ok && err != nil {
q.SetErr(err)
@ -757,7 +757,7 @@ func TestController_ConcurrencyQuota(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
select {
case <-q.Canceled:
default:
@ -828,7 +828,7 @@ func TestController_QueueSize(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
executing <- struct{}{}
// Block until test is finished
<-done
@ -888,7 +888,7 @@ func TestController_CancelDone_Unlimited(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -933,7 +933,7 @@ func TestController_DoneWithoutRead_Unlimited(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -985,7 +985,7 @@ func TestController_CancelDone(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -1032,7 +1032,7 @@ func TestController_DoneWithoutRead(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Ensure the query takes a little bit of time so the cancel actually cancels something.
t := time.NewTimer(time.Second)
defer t.Stop()
@ -1088,7 +1088,7 @@ func TestController_Error_MaxMemory(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to hit the memory limit.
for i := 0; i < 16; i++ {
size := config.MemoryBytesQuotaPerQuery / 16
@ -1146,7 +1146,7 @@ func TestController_NoisyNeighbor(t *testing.T) {
wellBehavedNeighbor := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory until we hit our initial memory limit so we should
// never request more memory.
for amount := int64(0); amount < config.InitialMemoryBytesQuotaPerQuery; amount += 16 {
@ -1163,7 +1163,7 @@ func TestController_NoisyNeighbor(t *testing.T) {
noisyNeighbor := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to use up what we can and be as noisy as possible.
// Turn up the stereo and party on.
for {
@ -1252,7 +1252,7 @@ func TestController_Error_NoRemainingMemory(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to use up what we can until denied.
for size := int64(0); ; size += 16 {
if err := alloc.Account(16); err != nil {
@ -1298,7 +1298,7 @@ func TestController_MemoryRelease(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate some amount of memory and never release it.
if err := alloc.Account(int(config.MemoryBytesQuotaPerQuery) / 2); err != nil {
q.SetErr(err)
@ -1347,7 +1347,7 @@ func TestController_IrregularMemoryQuota(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to hit the memory limit.
for size := 0; size < 768; size += 16 {
if err := alloc.Account(16); err != nil {
@ -1406,7 +1406,7 @@ func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) {
compiler := &mock.Compiler{
CompileFn: func(ctx context.Context) (flux.Program, error) {
return &mock.Program{
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) {
ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) {
// Allocate memory continuously to use up what we can and be as noisy as possible.
// Turn up the stereo and party on.
for size := 0; size < 1024; size += 16 {

View File

@ -47,7 +47,7 @@ func (c *Controller) createAllocator(q *Query) {
m: c.memory,
limit: c.memory.initialBytesQuotaPerQuery,
}
q.alloc = &memory.Allocator{
q.alloc = &memory.ResourceAllocator{
// Use an anonymous function to ensure the value is copied.
Limit: func(v int64) *int64 { return &v }(q.memoryManager.limit),
Manager: q.memoryManager,

View File

@ -39,7 +39,7 @@ type BucketsDecoder struct {
orgID platform2.ID
deps BucketDependencies
buckets []*platform.Bucket
alloc *memory.Allocator
alloc memory.Allocator
}
func (bd *BucketsDecoder) Connect(ctx context.Context) error {

View File

@ -220,9 +220,9 @@ type seriesCardinalityReader struct {
}
func (s seriesCardinalityReader) Read(ctx context.Context, f func(flux.Table) error, mem arrowmemory.Allocator) error {
alloc, ok := mem.(*memory.Allocator)
alloc, ok := mem.(memory.Allocator)
if !ok {
alloc = &memory.Allocator{
alloc = &memory.ResourceAllocator{
Allocator: mem,
}
}

View File

@ -34,7 +34,7 @@ type Source struct {
id execute.DatasetID
ts []execute.Transformation
alloc *memory.Allocator
alloc memory.Allocator
stats cursors.CursorStats
runner runner

View File

@ -37,27 +37,27 @@ func (mockTableIterator) Statistics() cursors.CursorStats {
type mockReader struct {
}
func (mockReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (mockReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &mockTableIterator{}, nil
}
func (mockReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (mockReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &mockTableIterator{}, nil
}
func (mockReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (mockReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &mockTableIterator{}, nil
}
func (mockReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (mockReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &mockTableIterator{}, nil
}
func (mockReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (mockReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &mockTableIterator{}, nil
}
func (mockReader) ReadSeriesCardinality(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (mockReader) ReadSeriesCardinality(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc memory.Allocator) (query.TableIterator, error) {
return &mockTableIterator{}, nil
}
@ -89,8 +89,8 @@ func (a mockAdministration) Bounds() *execute.Bounds {
return a.StreamBounds
}
func (mockAdministration) Allocator() *memory.Allocator {
return &memory.Allocator{}
func (mockAdministration) Allocator() memory.Allocator {
return memory.DefaultAllocator
}
func (mockAdministration) Parents() []execute.DatasetID {
@ -222,7 +222,7 @@ func TestReadWindowAggregateSource(t *testing.T) {
},
}
reader := &mock.StorageReader{
ReadWindowAggregateFn: func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
ReadWindowAggregateFn: func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error) {
if want, got := orgID, spec.OrganizationID; want != got {
t.Errorf("unexpected organization id -want/+got:\n\t- %s\n\t+ %s", want, got)
}

View File

@ -43,7 +43,7 @@ type DatabasesDecoder struct {
orgID platform2.ID
deps *DatabasesDependencies
databases []*platform.DBRPMapping
alloc *memory.Allocator
alloc memory.Allocator
}
func (bd *DatabasesDecoder) Connect(ctx context.Context) error {

View File

@ -15,14 +15,14 @@ import (
// StorageReader is an interface for reading tables from the storage subsystem.
type StorageReader interface {
ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error)
ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc *memory.Allocator) (TableIterator, error)
ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error)
ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc memory.Allocator) (TableIterator, error)
ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc memory.Allocator) (TableIterator, error)
ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc memory.Allocator) (TableIterator, error)
ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc *memory.Allocator) (TableIterator, error)
ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc *memory.Allocator) (TableIterator, error)
ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc memory.Allocator) (TableIterator, error)
ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc memory.Allocator) (TableIterator, error)
ReadSeriesCardinality(ctx context.Context, spec ReadSeriesCardinalitySpec, alloc *memory.Allocator) (TableIterator, error)
ReadSeriesCardinality(ctx context.Context, spec ReadSeriesCardinalitySpec, alloc memory.Allocator) (TableIterator, error)
SupportReadSeriesCardinality(ctx context.Context) bool
Close()

View File

@ -62,7 +62,7 @@ func NewReader(s storage.Store) query.StorageReader {
return &storeReader{s: s}
}
func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &filterIterator{
ctx: ctx,
s: r.s,
@ -72,7 +72,7 @@ func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec,
}, nil
}
func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &groupIterator{
ctx: ctx,
s: r.s,
@ -82,7 +82,7 @@ func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, a
}, nil
}
func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &windowAggregateIterator{
ctx: ctx,
s: r.s,
@ -92,7 +92,7 @@ func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWi
}, nil
}
func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &tagKeysIterator{
ctx: ctx,
bounds: spec.Bounds,
@ -103,7 +103,7 @@ func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpe
}, nil
}
func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc memory.Allocator) (query.TableIterator, error) {
return &tagValuesIterator{
ctx: ctx,
bounds: spec.Bounds,
@ -114,7 +114,7 @@ func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValue
}, nil
}
func (r *storeReader) ReadSeriesCardinality(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *storeReader) ReadSeriesCardinality(ctx context.Context, spec query.ReadSeriesCardinalitySpec, alloc memory.Allocator) (query.TableIterator, error) {
return &seriesCardinalityIterator{
ctx: ctx,
bounds: spec.Bounds,
@ -137,7 +137,7 @@ type filterIterator struct {
spec query.ReadFilterSpec
stats cursors.CursorStats
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats }
@ -254,7 +254,7 @@ type groupIterator struct {
spec query.ReadGroupSpec
stats cursors.CursorStats
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats }
@ -629,7 +629,7 @@ type windowAggregateIterator struct {
spec query.ReadWindowAggregateSpec
stats cursors.CursorStats
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func (wai *windowAggregateIterator) Statistics() cursors.CursorStats { return wai.stats }
@ -864,7 +864,7 @@ type tagKeysIterator struct {
s storage.Store
readSpec query.ReadTagKeysSpec
predicate *datatypes.Predicate
alloc *memory.Allocator
alloc memory.Allocator
}
func (ti *tagKeysIterator) Do(f func(flux.Table) error) error {
@ -949,7 +949,7 @@ type tagValuesIterator struct {
s storage.Store
readSpec query.ReadTagValuesSpec
predicate *datatypes.Predicate
alloc *memory.Allocator
alloc memory.Allocator
}
func (ti *tagValuesIterator) Do(f func(flux.Table) error) error {
@ -1026,7 +1026,7 @@ type seriesCardinalityIterator struct {
s storage.Store
readSpec query.ReadSeriesCardinalitySpec
predicate *datatypes.Predicate
alloc *memory.Allocator
alloc memory.Allocator
stats cursors.CursorStats
}

View File

@ -33,7 +33,7 @@ type floatTable struct {
table
mu sync.Mutex
cur cursors.FloatArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newFloatTable(
@ -45,7 +45,7 @@ func newFloatTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatTable {
t := &floatTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -129,7 +129,7 @@ func newFloatWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatWindowTable {
t := &floatWindowTable{
floatTable: floatTable{
@ -340,7 +340,7 @@ func newFloatWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatWindowSelectorTable {
t := &floatWindowSelectorTable{
floatTable: floatTable{
@ -441,7 +441,7 @@ func newFloatEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -677,7 +677,7 @@ func newFloatGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *floatGroupTable {
t := &floatGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -1014,7 +1014,7 @@ type integerTable struct {
table
mu sync.Mutex
cur cursors.IntegerArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newIntegerTable(
@ -1026,7 +1026,7 @@ func newIntegerTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerTable {
t := &integerTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -1111,7 +1111,7 @@ func newIntegerWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerWindowTable {
t := &integerWindowTable{
integerTable: integerTable{
@ -1323,7 +1323,7 @@ func newIntegerWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerWindowSelectorTable {
t := &integerWindowSelectorTable{
integerTable: integerTable{
@ -1424,7 +1424,7 @@ func newIntegerEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -1660,7 +1660,7 @@ func newIntegerGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *integerGroupTable {
t := &integerGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -1998,7 +1998,7 @@ type unsignedTable struct {
table
mu sync.Mutex
cur cursors.UnsignedArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newUnsignedTable(
@ -2010,7 +2010,7 @@ func newUnsignedTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedTable {
t := &unsignedTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -2094,7 +2094,7 @@ func newUnsignedWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedWindowTable {
t := &unsignedWindowTable{
unsignedTable: unsignedTable{
@ -2305,7 +2305,7 @@ func newUnsignedWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedWindowSelectorTable {
t := &unsignedWindowSelectorTable{
unsignedTable: unsignedTable{
@ -2406,7 +2406,7 @@ func newUnsignedEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -2642,7 +2642,7 @@ func newUnsignedGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *unsignedGroupTable {
t := &unsignedGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -2979,7 +2979,7 @@ type stringTable struct {
table
mu sync.Mutex
cur cursors.StringArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newStringTable(
@ -2991,7 +2991,7 @@ func newStringTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringTable {
t := &stringTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -3075,7 +3075,7 @@ func newStringWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringWindowTable {
t := &stringWindowTable{
stringTable: stringTable{
@ -3286,7 +3286,7 @@ func newStringWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringWindowSelectorTable {
t := &stringWindowSelectorTable{
stringTable: stringTable{
@ -3387,7 +3387,7 @@ func newStringEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -3623,7 +3623,7 @@ func newStringGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *stringGroupTable {
t := &stringGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -3904,7 +3904,7 @@ type booleanTable struct {
table
mu sync.Mutex
cur cursors.BooleanArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func newBooleanTable(
@ -3916,7 +3916,7 @@ func newBooleanTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanTable {
t := &booleanTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -4000,7 +4000,7 @@ func newBooleanWindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanWindowTable {
t := &booleanWindowTable{
booleanTable: booleanTable{
@ -4211,7 +4211,7 @@ func newBooleanWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanWindowSelectorTable {
t := &booleanWindowSelectorTable{
booleanTable: booleanTable{
@ -4312,7 +4312,7 @@ func newBooleanEmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanEmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -4548,7 +4548,7 @@ func newBooleanGroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *booleanGroupTable {
t := &booleanGroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),

View File

@ -28,7 +28,7 @@ type {{.name}}Table struct {
table
mu sync.Mutex
cur cursors.{{.Name}}ArrayCursor
alloc *memory.Allocator
alloc memory.Allocator
}
func new{{.Name}}Table(
@ -40,7 +40,7 @@ func new{{.Name}}Table(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}Table {
t := &{{.name}}Table{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
@ -125,7 +125,7 @@ func new{{.Name}}WindowTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}WindowTable {
t := &{{.name}}WindowTable{
{{.name}}Table: {{.name}}Table{
@ -337,7 +337,7 @@ func new{{.Name}}WindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}WindowSelectorTable {
t := &{{.name}}WindowSelectorTable{
{{.name}}Table: {{.name}}Table{
@ -438,7 +438,7 @@ func new{{.Name}}EmptyWindowSelectorTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}EmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
@ -674,7 +674,7 @@ func new{{.Name}}GroupTable(
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) *{{.name}}GroupTable {
t := &{{.name}}GroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),

View File

@ -33,7 +33,7 @@ type table struct {
cancelled, used int32
cache *tagsCache
alloc *memory.Allocator
alloc memory.Allocator
}
func newTable(
@ -43,7 +43,7 @@ func newTable(
cols []flux.ColMeta,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
alloc memory.Allocator,
) table {
return table{
done: done,

View File

@ -195,7 +195,7 @@ func NewStorageReader(tb testing.TB, setupFn SetupFunc) *StorageReader {
}
}
func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error) {
return r.StorageReader.ReadWindowAggregate(ctx, spec, alloc)
}
@ -215,7 +215,7 @@ func TestStorageReader_ReadFilter(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{
@ -291,11 +291,11 @@ func TestStorageReader_Table(t *testing.T) {
for _, tc := range []struct {
name string
newFn func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator
newFn func(ctx context.Context, alloc memory.Allocator) flux.TableIterator
}{
{
name: "ReadFilter",
newFn: func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator {
newFn: func(ctx context.Context, alloc memory.Allocator) flux.TableIterator {
ti, err := reader.ReadFilter(context.Background(), query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
@ -447,7 +447,7 @@ func TestStorageReader_ReadWindowAggregate(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
@ -547,7 +547,6 @@ func TestStorageReader_ReadWindowAggregate_ByStopTime(t *testing.T) {
},
},
} {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -562,7 +561,7 @@ func TestStorageReader_ReadWindowAggregate_ByStopTime(t *testing.T) {
Aggregates: []plan.ProcedureKind{
tt.aggregate,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -646,7 +645,6 @@ func TestStorageReader_ReadWindowAggregate_ByStartTime(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -661,7 +659,7 @@ func TestStorageReader_ReadWindowAggregate_ByStartTime(t *testing.T) {
Aggregates: []plan.ProcedureKind{
tt.aggregate,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -830,7 +828,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmpty(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -845,7 +842,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmpty(t *testing.T) {
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -930,7 +927,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStopTime(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -946,7 +942,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStopTime(t *testing.T) {
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1031,7 +1027,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T)
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1047,7 +1042,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T)
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1132,7 +1127,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStopTime(t *tes
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1149,7 +1143,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStopTime(t *tes
},
CreateEmpty: true,
ForceAggregate: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1234,7 +1228,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStartTime(t *te
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1251,7 +1244,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStartTime(t *te
},
CreateEmpty: true,
ForceAggregate: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1369,7 +1362,6 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) {
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1386,7 +1378,7 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) {
Aggregates: []plan.ProcedureKind{
tt.aggregate,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1504,7 +1496,6 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing
},
} {
t.Run(string(tt.aggregate), func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1522,7 +1513,7 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing
tt.aggregate,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1578,7 +1569,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
defer reader.Close()
t.Run("unwindowed mean", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1592,7 +1582,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1611,7 +1601,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
})
t.Run("windowed mean", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1625,7 +1614,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1671,7 +1660,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
})
t.Run("windowed mean with offset", func(t *testing.T) {
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1686,7 +1674,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.MeanKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1780,7 +1768,6 @@ func TestStorageReader_ReadWindowFirst(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1794,7 +1781,7 @@ func TestStorageReader_ReadWindowFirst(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.FirstKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -1891,7 +1878,6 @@ func TestStorageReader_WindowFirstOffset(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -1906,7 +1892,7 @@ func TestStorageReader_WindowFirstOffset(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.FirstKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2004,7 +1990,6 @@ func TestStorageReader_WindowSumOffset(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2019,7 +2004,7 @@ func TestStorageReader_WindowSumOffset(t *testing.T) {
Aggregates: []plan.ProcedureKind{
storageflux.SumKind,
},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2116,7 +2101,6 @@ func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2131,7 +2115,7 @@ func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) {
storageflux.FirstKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2256,7 +2240,6 @@ func TestStorageReader_WindowFirstOffsetCreateEmpty(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2272,7 +2255,7 @@ func TestStorageReader_WindowFirstOffsetCreateEmpty(t *testing.T) {
storageflux.FirstKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2400,7 +2383,6 @@ func TestStorageReader_WindowSumOffsetCreateEmpty(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2416,7 +2398,7 @@ func TestStorageReader_WindowSumOffsetCreateEmpty(t *testing.T) {
storageflux.SumKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2544,7 +2526,6 @@ func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2560,7 +2541,7 @@ func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) {
},
CreateEmpty: true,
TimeColumn: execute.DefaultStopColLabel,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2649,7 +2630,6 @@ func TestStorageReader_WindowFirstOffsetTimeColumn(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2666,7 +2646,7 @@ func TestStorageReader_WindowFirstOffsetTimeColumn(t *testing.T) {
},
CreateEmpty: true,
TimeColumn: execute.DefaultStopColLabel,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2755,7 +2735,6 @@ func TestStorageReader_WindowSumOffsetTimeColumn(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2772,7 +2751,7 @@ func TestStorageReader_WindowSumOffsetTimeColumn(t *testing.T) {
},
CreateEmpty: true,
TimeColumn: execute.DefaultStopColLabel,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -2865,7 +2844,6 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) {
})
defer reader.Close()
mem := &memory.Allocator{}
ti, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -2880,7 +2858,7 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) {
storageflux.FirstKind,
},
CreateEmpty: true,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -3098,7 +3076,7 @@ func TestStorageReader_ReadGroup(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{
@ -3229,7 +3207,6 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
for _, tt := range cases {
t.Run(tt.aggregate, func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -3239,7 +3216,7 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
GroupMode: query.GroupModeBy,
GroupKeys: []string{"t0"},
AggregateMethod: tt.aggregate,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -3301,7 +3278,6 @@ func TestStorageReader_ReadGroupNoAgg(t *testing.T) {
for _, tt := range cases {
t.Run("", func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -3310,7 +3286,7 @@ func TestStorageReader_ReadGroupNoAgg(t *testing.T) {
},
GroupMode: query.GroupModeBy,
GroupKeys: []string{"t1"},
}, mem)
}, memory.DefaultAllocator)
if err != nil {
t.Fatal(err)
}
@ -3434,7 +3410,7 @@ func TestStorageReader_ReadWindowAggregateMonths(t *testing.T) {
mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator)
defer mem.AssertSize(t, 0)
alloc := &memory.Allocator{
alloc := &memory.ResourceAllocator{
Allocator: mem,
}
got, err := reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
@ -3482,11 +3458,11 @@ func TestStorageReader_Backoff(t *testing.T) {
for _, tt := range []struct {
name string
read func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error)
read func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error)
}{
{
name: "ReadFilter",
read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {
read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) {
return reader.ReadFilter(context.Background(), query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
@ -3496,7 +3472,7 @@ func TestStorageReader_Backoff(t *testing.T) {
},
{
name: "ReadGroup",
read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {
read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) {
return reader.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -3510,7 +3486,7 @@ func TestStorageReader_Backoff(t *testing.T) {
},
{
name: "ReadWindowAggregate",
read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) {
read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) {
return reader.ReadWindowAggregate(context.Background(), query.ReadWindowAggregateSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
@ -3532,7 +3508,7 @@ func TestStorageReader_Backoff(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
// Read the table and learn what the maximum allocated
// value is. We don't want to exceed this.
mem := &memory.Allocator{}
mem := &memory.ResourceAllocator{}
tables, err := tt.read(reader, mem)
if err != nil {
t.Fatal(err)
@ -3559,7 +3535,7 @@ func TestStorageReader_Backoff(t *testing.T) {
// if the next buffer attempts to be allocated
// before the first.
limit := mem.MaxAllocated()
mem = &memory.Allocator{Limit: &limit}
mem = &memory.ResourceAllocator{Limit: &limit}
tables, err = tt.read(reader, mem)
if err != nil {
t.Fatal(err)
@ -3670,12 +3646,11 @@ func BenchmarkReadFilter(b *testing.B) {
return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr
}
benchmarkRead(b, setupFn, func(r *StorageReader) error {
mem := &memory.Allocator{}
tables, err := r.ReadFilter(context.Background(), query.ReadFilterSpec{
OrganizationID: r.Org,
BucketID: r.Bucket,
Bounds: r.Bounds,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
return err
}
@ -3775,7 +3750,6 @@ func BenchmarkReadGroup(b *testing.B) {
return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr
}
benchmarkRead(b, setupFn, func(r *StorageReader) error {
mem := &memory.Allocator{}
tables, err := r.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: r.Org,
@ -3785,7 +3759,7 @@ func BenchmarkReadGroup(b *testing.B) {
GroupMode: query.GroupModeBy,
GroupKeys: []string{"_start", "_stop", "t0"},
AggregateMethod: storageflux.MinKind,
}, mem)
}, memory.DefaultAllocator)
if err != nil {
return err
}

View File

@ -216,8 +216,7 @@ func newFakeResult() *fakeResult {
meta := []flux.ColMeta{{Label: "x", Type: flux.TInt}}
vals := []values.Value{values.NewInt(int64(1))}
gk := execute.NewGroupKey(meta, vals)
a := &memory.Allocator{}
b := execute.NewColListTableBuilder(gk, a)
b := execute.NewColListTableBuilder(gk, memory.DefaultAllocator)
i, _ := b.AddCol(meta[0])
b.AppendInt(i, int64(1))
t, err := b.Table()