657 lines
21 KiB
Go
657 lines
21 KiB
Go
package launcher_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
nethttp "net/http"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
|
|
"github.com/influxdata/influxdb/v2/http"
|
|
"github.com/influxdata/influxdb/v2/tsdb"
|
|
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
func TestStorage_WriteAndQuery(t *testing.T) {
|
|
l := launcher.NewTestLauncher()
|
|
l.RunOrFail(t, ctx)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
org1 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{
|
|
User: "USER-1",
|
|
Password: "PASSWORD-1",
|
|
Org: "ORG-01",
|
|
Bucket: "BUCKET",
|
|
})
|
|
org2 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{
|
|
User: "USER-2",
|
|
Password: "PASSWORD-1",
|
|
Org: "ORG-02",
|
|
Bucket: "BUCKET",
|
|
})
|
|
|
|
// Execute single write against the server.
|
|
l.WriteOrFail(t, org1, `m,k=v1 f=100i 946684800000000000`)
|
|
l.WriteOrFail(t, org2, `m,k=v2 f=200i 946684800000000000`)
|
|
|
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
|
|
|
|
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
|
|
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v1` + "\r\n\r\n"
|
|
if got := l.FluxQueryOrFail(t, org1.Org, org1.Auth.Token, qs); !cmp.Equal(got, exp) {
|
|
t.Errorf("unexpected query results -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
|
|
exp = `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
|
|
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,f,m,v2` + "\r\n\r\n"
|
|
if got := l.FluxQueryOrFail(t, org2.Org, org2.Auth.Token, qs); !cmp.Equal(got, exp) {
|
|
t.Errorf("unexpected query results -got/+exp\n%s", cmp.Diff(got, exp))
|
|
}
|
|
}
|
|
|
|
// Ensure the server will write all points possible with exception of
|
|
// - field type conflict
|
|
// - field too large
|
|
func TestStorage_PartialWrite(t *testing.T) {
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
// Initial write of integer.
|
|
l.WritePointsOrFail(t, `cpu value=1i 946684800000000000`)
|
|
|
|
// Write mixed-field types.
|
|
err := l.WritePoints("cpu value=2i 946684800000000001\ncpu value=3 946684800000000002\ncpu value=4i 946684800000000003")
|
|
require.Error(t, err)
|
|
|
|
// Write oversized field value.
|
|
err = l.WritePoints(fmt.Sprintf(`cpu str="%s" 946684800000000004`, strings.Repeat("a", tsdb.MaxFieldValueLength+1)))
|
|
require.Error(t, err)
|
|
|
|
// Write biggest field value.
|
|
l.WritePointsOrFail(t, fmt.Sprintf(`cpu str="%s" 946684800000000005`, strings.Repeat("a", tsdb.MaxFieldValueLength)))
|
|
|
|
// Ensure the valid points were written.
|
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z) |> keep(columns: ["_time","_field"])`
|
|
|
|
exp := `,result,table,_time,_field` + "\r\n" +
|
|
`,_result,0,2000-01-01T00:00:00.000000005Z,str` + "\r\n" + // str=max-length string
|
|
`,_result,1,2000-01-01T00:00:00Z,value` + "\r\n" + // value=1
|
|
`,_result,1,2000-01-01T00:00:00.000000001Z,value` + "\r\n" + // value=2
|
|
`,_result,1,2000-01-01T00:00:00.000000003Z,value` + "\r\n\r\n" // value=4
|
|
|
|
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
|
|
require.NoError(t, err)
|
|
require.Equal(t, exp, string(buf))
|
|
}
|
|
|
|
func TestStorage_DisableMaxFieldValueSize(t *testing.T) {
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t, func(o *launcher.InfluxdOpts) {
|
|
o.StorageConfig.Data.SkipFieldSizeValidation = true
|
|
})
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
// Write a normally-oversized field value.
|
|
l.WritePointsOrFail(t, fmt.Sprintf(`cpu str="%s" 946684800000000000`, strings.Repeat("a", tsdb.MaxFieldValueLength+1)))
|
|
|
|
// Check that the point can be queried.
|
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z) |> keep(columns: ["_value"])`
|
|
exp := `,result,table,_value` + "\r\n" +
|
|
fmt.Sprintf(`,_result,0,%s`, strings.Repeat("a", tsdb.MaxFieldValueLength+1)) + "\r\n\r\n"
|
|
|
|
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
|
|
require.NoError(t, err)
|
|
require.Equal(t, exp, string(buf))
|
|
}
|
|
|
|
func TestLauncher_WriteAndQuery(t *testing.T) {
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
// Execute single write against the server.
|
|
resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID), `m,k=v f=100i 946684800000000000`))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if err := resp.Body.Close(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if resp.StatusCode != nethttp.StatusNoContent {
|
|
t.Fatalf("unexpected status code: %d, body: %s, headers: %v", resp.StatusCode, body, resp.Header)
|
|
}
|
|
|
|
// Query server to ensure write persists.
|
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
|
|
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
|
|
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v` + "\r\n\r\n"
|
|
|
|
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error querying server: %v", err)
|
|
}
|
|
if diff := cmp.Diff(string(buf), exp); diff != "" {
|
|
t.Fatal(diff)
|
|
}
|
|
}
|
|
|
|
func TestLauncher_BucketDelete(t *testing.T) {
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
// Execute single write against the server.
|
|
resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID), `m,k=v f=100i 946684800000000000`))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if err := resp.Body.Close(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if resp.StatusCode != nethttp.StatusNoContent {
|
|
t.Fatalf("unexpected status code: %d, body: %s, headers: %v", resp.StatusCode, body, resp.Header)
|
|
}
|
|
|
|
// Query server to ensure write persists.
|
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
|
|
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
|
|
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v` + "\r\n\r\n"
|
|
|
|
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error querying server: %v", err)
|
|
}
|
|
if diff := cmp.Diff(string(buf), exp); diff != "" {
|
|
t.Fatal(diff)
|
|
}
|
|
|
|
// Verify the cardinality in the engine.
|
|
engine := l.Launcher.Engine()
|
|
if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(1); got != exp {
|
|
t.Fatalf("got %d, exp %d", got, exp)
|
|
}
|
|
|
|
// Delete the bucket.
|
|
if resp, err = nethttp.DefaultClient.Do(l.MustNewHTTPRequest("DELETE", fmt.Sprintf("/api/v2/buckets/%s", l.Bucket.ID), "")); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if body, err = io.ReadAll(resp.Body); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if err := resp.Body.Close(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if resp.StatusCode != nethttp.StatusNoContent {
|
|
t.Fatalf("unexpected status code: %d, body: %s, headers: %v", resp.StatusCode, body, resp.Header)
|
|
}
|
|
|
|
// Verify that the data has been removed from the storage engine.
|
|
if got, exp := engine.SeriesCardinality(ctx, l.Bucket.ID), int64(0); got != exp {
|
|
t.Fatalf("after bucket delete got %d, exp %d", got, exp)
|
|
}
|
|
|
|
databaseInfo := engine.MetaClient().Database(l.Bucket.ID.String())
|
|
assert.Nil(t, databaseInfo)
|
|
}
|
|
|
|
func TestLauncher_DeleteWithPredicate(t *testing.T) {
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
// Write data to server.
|
|
if resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID),
|
|
"cpu,region=us-east-1 v=1 946684800000000000\n"+
|
|
"cpu,region=us-west-1 v=1 946684800000000000\n"+
|
|
"mem,region=us-west-1 v=1 946684800000000000\n",
|
|
)); err != nil {
|
|
t.Fatal(err)
|
|
} else if err := resp.Body.Close(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Execute single write against the server.
|
|
s := http.DeleteService{
|
|
Addr: l.URL().String(),
|
|
Token: l.Auth.Token,
|
|
}
|
|
if err := s.DeleteBucketRangePredicate(context.Background(), http.DeleteRequest{
|
|
OrgID: l.Org.ID.String(),
|
|
BucketID: l.Bucket.ID.String(),
|
|
Start: "2000-01-01T00:00:00Z",
|
|
Stop: "2000-01-02T00:00:00Z",
|
|
Predicate: `_measurement="cpu" AND region="us-west-1"`,
|
|
}); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// Query server to ensure write persists.
|
|
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
|
|
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,region` + "\r\n" +
|
|
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,1,v,cpu,us-east-1` + "\r\n" +
|
|
`,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,1,v,mem,us-west-1` + "\r\n\r\n"
|
|
|
|
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
|
|
if err != nil {
|
|
t.Fatalf("unexpected error querying server: %v", err)
|
|
} else if diff := cmp.Diff(string(buf), exp); diff != "" {
|
|
t.Fatal(diff)
|
|
}
|
|
}
|
|
|
|
func TestLauncher_FluxCardinality(t *testing.T) {
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
// Run a query without any data on the server - should return 0 and not crash.
|
|
query := `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-01-01T00:00:00Z,
|
|
stop: 2000-01-02T00:00:00Z,
|
|
predicate: (r) => true
|
|
)`
|
|
|
|
exp := `,result,table,_value` + "\r\n" +
|
|
`,_result,0,0` + "\r\n\r\n"
|
|
|
|
body, err := http.SimpleQuery(l.URL(), query, l.Org.Name, l.Auth.Token)
|
|
require.NoError(t, err)
|
|
require.Equal(t, exp, string(body))
|
|
|
|
// Write data to server.
|
|
resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID),
|
|
"cpu,region=us-east-1 v=1 946684800000000000\n"+
|
|
"cpu,region=us-west-1 v=1 946684800000000000\n"+
|
|
"mem,region=us-west-1 v=1 946684800000000000\n"+
|
|
"mem,region=us-south-1 v=2 996684800000000000\n",
|
|
))
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
|
|
// Specific time values for tests bracketing shards with time ranges
|
|
mc := l.Engine().MetaClient()
|
|
sgs, err := mc.ShardGroupsByTimeRange(l.Bucket.ID.String(), meta.DefaultRetentionPolicyName, time.Unix(0, 946684800000000000), time.Unix(0, 996684800000000000))
|
|
require.NoError(t, err)
|
|
require.Equal(t, 2, len(sgs))
|
|
|
|
sg1Start := sgs[0].StartTime
|
|
sg2End := sgs[1].EndTime
|
|
sg2Start := sgs[1].StartTime
|
|
preSg1Start := sg1Start.Add(-1 * time.Minute)
|
|
|
|
lastPoint := time.Unix(0, 996684800000000000)
|
|
// a point in the middle of the later shard group, after the data but before
|
|
// the end of the group
|
|
afterLastPoint := lastPoint.Add(1 * time.Minute)
|
|
require.True(t, afterLastPoint.Before(sg2End))
|
|
require.True(t, afterLastPoint.After(sg2Start))
|
|
|
|
// similar, but before the data
|
|
beforeLastPoint := lastPoint.Add(-1 * time.Minute)
|
|
require.True(t, beforeLastPoint.Before(sg2End))
|
|
require.True(t, beforeLastPoint.After(sg2Start))
|
|
|
|
tests := []struct {
|
|
name string
|
|
query string
|
|
exp string
|
|
}{
|
|
{
|
|
name: "boolean literal predicate - true",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-01-01T00:00:00Z,
|
|
stop: 2000-01-02T00:00:00Z,
|
|
predicate: (r) => true
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,3` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "boolean literal predicate - false",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-01-01T00:00:00Z,
|
|
stop: 2000-01-02T00:00:00Z,
|
|
predicate: (r) => false
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,0` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "nil predicate",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-01-01T00:00:00Z,
|
|
stop: 2000-01-02T00:00:00Z,
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,3` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "nil predicate with large time range",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 1990-01-01T00:00:00Z,
|
|
stop: 2010-01-01T00:00:00Z,
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,4` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "single measurement match",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-01-01T00:00:00Z,
|
|
stop: 2000-01-02T00:00:00Z,
|
|
predicate: (r) => r._measurement == "cpu"
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,2` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "multiple measurement match",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-01-01T00:00:00Z,
|
|
stop: 2000-01-02T00:00:00Z,
|
|
predicate: (r) => r._measurement == "cpu" or r._measurement == "mem"
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,3` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "predicate matches nothing",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-01-01T00:00:00Z,
|
|
stop: 2000-01-02T00:00:00Z,
|
|
predicate: (r) => r._measurement == "cpu" and r._measurement == "mem"
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,0` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "time range matches nothing",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 2000-04-01T00:00:00Z,
|
|
stop: 2000-05-02T00:00:00Z,
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,0` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "large time range - all shards are within the window",
|
|
query: `import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 1990-01-01T00:00:00Z,
|
|
stop: 2010-01-01T00:00:00Z,
|
|
predicate: (r) => r._measurement == "cpu"
|
|
)`,
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,2` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "start range is inclusive",
|
|
query: fmt.Sprintf(`import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: %s,
|
|
stop: 2010-01-01T00:00:00Z,
|
|
predicate: (r) => r._measurement == "mem"
|
|
)`, time.Unix(0, 946684800000000000).Format(time.RFC3339Nano),
|
|
),
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,2` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "stop range is exclusive",
|
|
query: fmt.Sprintf(`import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: 1990-01-01T00:00:00Z,
|
|
stop: %s,
|
|
predicate: (r) => r._measurement == "mem"
|
|
)`, lastPoint.Format(time.RFC3339Nano),
|
|
),
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,1` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "one shard is entirely in the time range, other is partially, range includes data in partial shard",
|
|
query: fmt.Sprintf(`import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: %s,
|
|
stop: %s,
|
|
predicate: (r) => r._measurement == "mem"
|
|
)`, preSg1Start.Format(time.RFC3339Nano), afterLastPoint.Format(time.RFC3339Nano),
|
|
),
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,2` + "\r\n\r\n",
|
|
},
|
|
{
|
|
name: "one shard is entirely in the time range, other is partially, range does not include data in partial shard",
|
|
query: fmt.Sprintf(`import "influxdata/influxdb"
|
|
influxdb.cardinality(
|
|
bucket: "BUCKET",
|
|
start: %s,
|
|
stop: %s,
|
|
predicate: (r) => r._measurement == "mem"
|
|
)`, preSg1Start.Format(time.RFC3339Nano), beforeLastPoint.Format(time.RFC3339Nano),
|
|
),
|
|
exp: `,result,table,_value` + "\r\n" +
|
|
`,_result,0,1` + "\r\n\r\n",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
body, err := http.SimpleQuery(l.URL(), tt.query, l.Org.Name, l.Auth.Token)
|
|
require.NoError(t, err)
|
|
require.Equal(t, tt.exp, string(body))
|
|
}
|
|
}
|
|
|
|
func TestLauncher_UpdateRetentionPolicy(t *testing.T) {
|
|
durPtr := func(d time.Duration) *time.Duration {
|
|
return &d
|
|
}
|
|
|
|
testCases := []struct {
|
|
name string
|
|
initRp time.Duration
|
|
initSgd time.Duration
|
|
derivedSgd *time.Duration
|
|
newRp *time.Duration
|
|
newSgd *time.Duration
|
|
expectInitErr bool
|
|
expectUpdateErr bool
|
|
}{
|
|
{
|
|
name: "infinite to 1w",
|
|
derivedSgd: durPtr(humanize.Week),
|
|
newRp: durPtr(humanize.Week),
|
|
},
|
|
{
|
|
name: "1w to 1d",
|
|
initRp: humanize.Week,
|
|
derivedSgd: durPtr(humanize.Day),
|
|
newRp: durPtr(humanize.Day),
|
|
},
|
|
{
|
|
name: "1d to 1h",
|
|
initRp: humanize.Day,
|
|
derivedSgd: durPtr(time.Hour),
|
|
newRp: durPtr(time.Hour),
|
|
},
|
|
{
|
|
name: "infinite, update shard duration",
|
|
initSgd: humanize.Month,
|
|
derivedSgd: durPtr(humanize.Month),
|
|
newSgd: durPtr(humanize.Week),
|
|
},
|
|
{
|
|
name: "1w, update shard duration",
|
|
initRp: humanize.Week,
|
|
initSgd: humanize.Week,
|
|
newSgd: durPtr(time.Hour),
|
|
},
|
|
{
|
|
name: "1d, update shard duration",
|
|
initRp: humanize.Day,
|
|
initSgd: 3 * time.Hour,
|
|
newSgd: durPtr(1*time.Hour + 30*time.Minute),
|
|
},
|
|
{
|
|
name: "infinite, update both retention and shard duration",
|
|
derivedSgd: durPtr(humanize.Week),
|
|
newRp: durPtr(time.Hour),
|
|
newSgd: durPtr(time.Hour),
|
|
},
|
|
{
|
|
name: "init shard duration larger than RP",
|
|
initRp: time.Hour,
|
|
initSgd: humanize.Day,
|
|
expectInitErr: true,
|
|
},
|
|
{
|
|
name: "updated shard duration larger than RP",
|
|
initRp: humanize.Day,
|
|
initSgd: time.Hour,
|
|
newSgd: durPtr(humanize.Week),
|
|
expectUpdateErr: true,
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
tc := tc
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
bucketService := l.BucketService(t)
|
|
|
|
bucket := &influxdb.Bucket{
|
|
OrgID: l.Org.ID,
|
|
RetentionPeriod: tc.initRp,
|
|
ShardGroupDuration: tc.initSgd,
|
|
}
|
|
err := bucketService.CreateBucket(ctx, bucket)
|
|
if tc.expectInitErr {
|
|
require.Error(t, err)
|
|
return
|
|
}
|
|
require.NoError(t, err)
|
|
defer bucketService.DeleteBucket(ctx, bucket.ID)
|
|
|
|
bucket, err = bucketService.FindBucketByID(ctx, bucket.ID)
|
|
require.NoError(t, err)
|
|
|
|
expectedSgd := tc.initSgd
|
|
if tc.derivedSgd != nil {
|
|
expectedSgd = *tc.derivedSgd
|
|
}
|
|
require.Equal(t, tc.initRp, bucket.RetentionPeriod)
|
|
require.Equal(t, expectedSgd, bucket.ShardGroupDuration)
|
|
|
|
bucket, err = bucketService.UpdateBucket(ctx, bucket.ID, influxdb.BucketUpdate{
|
|
RetentionPeriod: tc.newRp,
|
|
ShardGroupDuration: tc.newSgd,
|
|
})
|
|
if tc.expectUpdateErr {
|
|
require.Error(t, err)
|
|
return
|
|
}
|
|
require.NoError(t, err)
|
|
|
|
bucket, err = bucketService.FindBucketByID(ctx, bucket.ID)
|
|
require.NoError(t, err)
|
|
|
|
expectedRp := tc.initRp
|
|
if tc.newRp != nil {
|
|
expectedRp = *tc.newRp
|
|
}
|
|
if tc.newSgd != nil {
|
|
expectedSgd = *tc.newSgd
|
|
}
|
|
require.Equal(t, expectedRp, bucket.RetentionPeriod)
|
|
require.Equal(t, expectedSgd, bucket.ShardGroupDuration)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestLauncher_OverlappingShards(t *testing.T) {
|
|
l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
|
|
defer l.ShutdownOrFail(t, ctx)
|
|
|
|
bkt := influxdb.Bucket{Name: "test", ShardGroupDuration: time.Hour, OrgID: l.Org.ID}
|
|
require.NoError(t, l.BucketService(t).CreateBucket(ctx, &bkt))
|
|
|
|
req := l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, bkt.ID),
|
|
"m,s=0 n=0 1626416520000000000\nm,s=0 n=1 1626420120000000000\n")
|
|
resp, err := nethttp.DefaultClient.Do(req)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
|
|
newDur := humanize.Day
|
|
_, err = l.BucketService(t).UpdateBucket(ctx, bkt.ID, influxdb.BucketUpdate{ShardGroupDuration: &newDur})
|
|
require.NoError(t, err)
|
|
|
|
req = l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, bkt.ID),
|
|
// NOTE: The 3rd point's timestamp is chronologically earlier than the other two points, but it
|
|
// must come after the others in the request to trigger the overlapping-shard bug. If it comes
|
|
// first in the request, the bug is avoided because:
|
|
// 1. The point-writer sees there is no shard for the earlier point, and creates a new 24h shard-group
|
|
// 2. The new 24 group covers the timestamps of the remaining 2 points, so the writer doesn't bother looking
|
|
// for existing shards that also cover the timestamp
|
|
// 3. With only 1 shard mapped to the 3 points, there is no overlap to trigger the bug
|
|
"m,s=0 n=0 1626416520000000000\nm,s=0 n=1 1626420120000000000\nm,s=1 n=1 1626412920000000000\n")
|
|
resp, err = nethttp.DefaultClient.Do(req)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
|
|
query := `from(bucket:"test") |> range(start:2000-01-01T00:00:00Z,stop:2050-01-01T00:00:00Z)` +
|
|
` |> drop(columns:["_start","_stop"])`
|
|
exp := `,result,table,_time,_value,_field,_measurement,s` + "\r\n" +
|
|
`,_result,0,2021-07-16T06:22:00Z,0,n,m,0` + "\r\n" +
|
|
`,_result,0,2021-07-16T07:22:00Z,1,n,m,0` + "\r\n" +
|
|
`,_result,1,2021-07-16T05:22:00Z,1,n,m,1` + "\r\n\r\n"
|
|
|
|
buf, err := http.SimpleQuery(l.URL(), query, l.Org.Name, l.Auth.Token)
|
|
require.NoError(t, err)
|
|
require.Equal(t, exp, string(buf))
|
|
}
|