365 lines
10 KiB
Go
365 lines
10 KiB
Go
package upgrade
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"testing"
|
|
|
|
"github.com/BurntSushi/toml"
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/google/go-cmp/cmp"
|
|
"github.com/influxdata/influx-cli/v2/clients"
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/bolt"
|
|
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
|
|
"github.com/influxdata/influxdb/v2/internal/testutil"
|
|
"github.com/influxdata/influxdb/v2/kit/cli"
|
|
"github.com/influxdata/influxdb/v2/v1/services/meta"
|
|
"github.com/spf13/cobra"
|
|
"github.com/spf13/viper"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zaptest"
|
|
)
|
|
|
|
func TestPathValidations(t *testing.T) {
|
|
tmpdir := t.TempDir()
|
|
|
|
v1Dir := filepath.Join(tmpdir, "v1db")
|
|
v2Dir := filepath.Join(tmpdir, "v2db")
|
|
|
|
boltPath := filepath.Join(v2Dir, bolt.DefaultFilename)
|
|
configsPath := filepath.Join(v2Dir, "configs")
|
|
enginePath := filepath.Join(v2Dir, "engine")
|
|
|
|
err := os.MkdirAll(filepath.Join(enginePath, "db"), 0777)
|
|
require.Nil(t, err)
|
|
|
|
sourceOpts := &optionsV1{
|
|
dbDir: v1Dir,
|
|
configFile: "",
|
|
}
|
|
sourceOpts.populateDirs()
|
|
|
|
targetOpts := &optionsV2{
|
|
boltPath: boltPath,
|
|
cliConfigsPath: configsPath,
|
|
enginePath: enginePath,
|
|
}
|
|
|
|
err = sourceOpts.validatePaths()
|
|
require.NotNil(t, err, "Must fail")
|
|
require.Contains(t, err.Error(), "1.x DB dir")
|
|
|
|
err = os.MkdirAll(filepath.Join(v1Dir, "meta"), 0777)
|
|
require.Nil(t, err)
|
|
|
|
err = sourceOpts.validatePaths()
|
|
require.NotNil(t, err, "Must fail")
|
|
require.Contains(t, err.Error(), "1.x meta.db")
|
|
|
|
err = os.WriteFile(filepath.Join(v1Dir, "meta", "meta.db"), []byte{1}, 0777)
|
|
require.Nil(t, err)
|
|
|
|
err = sourceOpts.validatePaths()
|
|
require.Nil(t, err)
|
|
|
|
err = targetOpts.validatePaths()
|
|
require.NotNil(t, err, "Must fail")
|
|
require.Contains(t, err.Error(), "2.x engine")
|
|
|
|
err = os.Remove(filepath.Join(enginePath, "db"))
|
|
require.Nil(t, err)
|
|
|
|
err = os.WriteFile(configsPath, []byte{1}, 0777)
|
|
require.Nil(t, err)
|
|
|
|
err = targetOpts.validatePaths()
|
|
require.NotNil(t, err, "Must fail")
|
|
require.Contains(t, err.Error(), "2.x CLI configs")
|
|
}
|
|
|
|
func TestClearTargetPaths(t *testing.T) {
|
|
tmpdir := t.TempDir()
|
|
|
|
v2Dir := filepath.Join(tmpdir, "v2db")
|
|
boltPath := filepath.Join(v2Dir, bolt.DefaultFilename)
|
|
configsPath := filepath.Join(v2Dir, "configs")
|
|
enginePath := filepath.Join(v2Dir, "engine")
|
|
cqPath := filepath.Join(v2Dir, "cqs")
|
|
configPath := filepath.Join(v2Dir, "config")
|
|
|
|
err := os.MkdirAll(filepath.Join(enginePath, "db"), 0777)
|
|
require.NoError(t, err)
|
|
err = os.WriteFile(boltPath, []byte{1}, 0777)
|
|
require.NoError(t, err)
|
|
err = os.WriteFile(configsPath, []byte{1}, 0777)
|
|
require.NoError(t, err)
|
|
err = os.WriteFile(cqPath, []byte{1}, 0777)
|
|
require.NoError(t, err)
|
|
err = os.WriteFile(configPath, []byte{1}, 0777)
|
|
require.NoError(t, err)
|
|
|
|
targetOpts := &optionsV2{
|
|
boltPath: boltPath,
|
|
cliConfigsPath: configsPath,
|
|
enginePath: enginePath,
|
|
configPath: configPath,
|
|
cqPath: cqPath,
|
|
}
|
|
|
|
err = targetOpts.validatePaths()
|
|
require.Error(t, err)
|
|
err = targetOpts.clearPaths()
|
|
require.NoError(t, err)
|
|
err = targetOpts.validatePaths()
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func TestDbURL(t *testing.T) {
|
|
|
|
type testCase struct {
|
|
name string
|
|
conf string
|
|
want string
|
|
}
|
|
|
|
var testCases = []testCase{
|
|
{
|
|
name: "default",
|
|
conf: "[meta]\n[data]\n[http]\n",
|
|
want: "http://localhost:8086",
|
|
},
|
|
{
|
|
name: "custom but same as default",
|
|
conf: "[meta]\n[data]\n[http]\nbind-address=\":8086\"\nhttps-enabled=false",
|
|
want: "http://localhost:8086",
|
|
},
|
|
{
|
|
name: "custom no host",
|
|
conf: "[meta]\n[data]\n[http]\nbind-address=\":8186\"\nhttps-enabled=true",
|
|
want: "https://localhost:8186",
|
|
},
|
|
{
|
|
name: "custom with host",
|
|
conf: "[meta]\n[data]\n[http]\nbind-address=\"10.0.0.1:8086\"\nhttps-enabled=true",
|
|
want: "https://10.0.0.1:8086",
|
|
},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
tc := tc
|
|
t.Run(tc.name, func(t *testing.T) {
|
|
t.Parallel()
|
|
var c configV1
|
|
_, err := toml.Decode(tc.conf, &c)
|
|
require.NoError(t, err)
|
|
if diff := cmp.Diff(tc.want, c.dbURL()); diff != "" {
|
|
t.Fatal(diff)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestUpgradeRealDB(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
tmpdir := t.TempDir()
|
|
|
|
err := testutil.Unzip(filepath.Join("testdata", "v1db.zip"), tmpdir)
|
|
require.NoError(t, err)
|
|
|
|
v1ConfigPath := filepath.Join(tmpdir, "v1.conf")
|
|
v1Config, err := os.Create(v1ConfigPath)
|
|
require.NoError(t, err)
|
|
defer v1Config.Close()
|
|
|
|
var pathsep = "/"
|
|
if runtime.GOOS == "windows" {
|
|
// Turn '\' into '\\' so it's escaped properly.
|
|
pathsep = "\\\\"
|
|
tmpdir = strings.ReplaceAll(tmpdir, "\\", "\\\\")
|
|
}
|
|
_, err = v1Config.WriteString(fmt.Sprintf(`reporting-disabled = true
|
|
[meta]
|
|
dir = "%[1]s%[2]sv1db%[2]smeta"
|
|
|
|
[data]
|
|
dir = "%[1]s%[2]sv1db%[2]sdata"
|
|
wal-dir = "%[1]s%[2]sv1db%[2]swal"
|
|
|
|
[coordinator]
|
|
max-concurrent-queries = 0
|
|
`,
|
|
tmpdir, pathsep))
|
|
require.NoError(t, err)
|
|
v1Config.Close()
|
|
|
|
tl := launcher.NewTestLauncherServer()
|
|
boltPath := filepath.Join(tl.Path, bolt.DefaultFilename)
|
|
enginePath := filepath.Join(tl.Path, "engine")
|
|
cqPath := filepath.Join(tl.Path, "cq.txt")
|
|
cliConfigPath := filepath.Join(tl.Path, "influx-configs")
|
|
configPath := filepath.Join(tl.Path, "config.toml")
|
|
|
|
v1opts := &optionsV1{configFile: v1ConfigPath}
|
|
v2opts := &optionsV2{
|
|
boltPath: boltPath,
|
|
enginePath: enginePath,
|
|
cqPath: cqPath,
|
|
cliConfigsPath: cliConfigPath,
|
|
configPath: configPath,
|
|
userName: "my-user",
|
|
password: "my-password",
|
|
orgName: "my-org",
|
|
bucket: "my-bucket",
|
|
retention: "7d",
|
|
token: "my-token",
|
|
}
|
|
|
|
opts := &options{source: *v1opts, target: *v2opts, force: true}
|
|
log := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))
|
|
err = runUpgradeE(ctx, clients.CLI{}, opts, log)
|
|
require.NoError(t, err)
|
|
|
|
v := viper.New()
|
|
v.SetConfigFile(configPath)
|
|
require.NoError(t, v.ReadInConfig())
|
|
lOpts := launcher.NewOpts(v)
|
|
cliOpts := lOpts.BindCliOpts()
|
|
|
|
cmd := cobra.Command{
|
|
Use: "test",
|
|
Run: func(*cobra.Command, []string) {
|
|
tl.RunOrFail(t, ctx, func(o *launcher.InfluxdOpts) {
|
|
*o = *lOpts
|
|
})
|
|
defer tl.ShutdownOrFail(t, ctx)
|
|
|
|
orgs, _, err := tl.OrganizationService().FindOrganizations(ctx, influxdb.OrganizationFilter{})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, orgs)
|
|
require.Len(t, orgs, 1)
|
|
require.Equal(t, "my-org", orgs[0].Name)
|
|
|
|
users, _, err := tl.UserService().FindUsers(ctx, influxdb.UserFilter{})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, users)
|
|
require.Len(t, users, 1)
|
|
require.Equal(t, "my-user", users[0].Name)
|
|
|
|
tokenNames := []string{"reader", "writer", "readerwriter"}
|
|
compatTokens, _, err := tl.Launcher.AuthorizationV1Service().FindAuthorizations(ctx, influxdb.AuthorizationFilter{})
|
|
require.NoError(t, err)
|
|
require.NotNil(t, compatTokens)
|
|
require.Len(t, compatTokens, len(tokenNames))
|
|
|
|
buckets, _, err := tl.Launcher.BucketService().FindBuckets(ctx, influxdb.BucketFilter{})
|
|
require.NoError(t, err)
|
|
|
|
bucketNames := []string{"my-bucket", "_tasks", "_monitoring", "mydb/autogen", "mydb/1week", "test/autogen", "empty/autogen"}
|
|
myDbAutogenBucketId := ""
|
|
myDb1weekBucketId := ""
|
|
testBucketId := ""
|
|
emptyBucketId := ""
|
|
|
|
require.NotNil(t, buckets)
|
|
require.Len(t, buckets, len(bucketNames))
|
|
|
|
for _, b := range buckets {
|
|
require.Contains(t, bucketNames, b.Name)
|
|
switch b.Name {
|
|
case bucketNames[0]:
|
|
tl.Bucket = b
|
|
case bucketNames[3]:
|
|
myDbAutogenBucketId = b.ID.String()
|
|
case bucketNames[4]:
|
|
myDb1weekBucketId = b.ID.String()
|
|
case bucketNames[5]:
|
|
testBucketId = b.ID.String()
|
|
case bucketNames[6]:
|
|
emptyBucketId = b.ID.String()
|
|
}
|
|
require.NotZero(t, b.ShardGroupDuration)
|
|
}
|
|
require.NoDirExists(t, filepath.Join(enginePath, "data", "_internal"))
|
|
|
|
// Ensure retention policy from the setup request passed through to the bucket.
|
|
require.Equal(t, humanize.Week, tl.Bucket.RetentionPeriod)
|
|
|
|
dbChecks := []struct {
|
|
dbname string
|
|
shardsNum int
|
|
}{
|
|
{myDbAutogenBucketId, 3},
|
|
{testBucketId, 5},
|
|
{myDb1weekBucketId, 1},
|
|
{emptyBucketId, 0},
|
|
}
|
|
|
|
for _, check := range dbChecks {
|
|
db := tl.Launcher.Engine().MetaClient().Database(check.dbname)
|
|
require.NotNil(t, db)
|
|
require.Len(t, db.ShardInfos(), check.shardsNum)
|
|
if check.shardsNum > 0 {
|
|
require.DirExists(t, filepath.Join(enginePath, "data", check.dbname, meta.DefaultRetentionPolicyName))
|
|
}
|
|
}
|
|
|
|
auths, _, err := tl.Launcher.AuthorizationService().FindAuthorizations(ctx, influxdb.AuthorizationFilter{})
|
|
require.NoError(t, err)
|
|
require.Len(t, auths, 1)
|
|
|
|
respBody := mustRunQuery(t, tl, "test", "select count(avg) from stat", auths[0].Token)
|
|
require.Contains(t, respBody, `["1970-01-01T00:00:00Z",5776]`)
|
|
|
|
respBody = mustRunQuery(t, tl, "mydb", "select count(avg) from testv1", auths[0].Token)
|
|
require.Contains(t, respBody, `["1970-01-01T00:00:00Z",2882]`)
|
|
|
|
respBody = mustRunQuery(t, tl, "mydb", "select count(i) from testv1", auths[0].Token)
|
|
require.Contains(t, respBody, `["1970-01-01T00:00:00Z",21]`)
|
|
|
|
respBody = mustRunQuery(t, tl, "mydb", `select count(line) from mydb."1week".log`, auths[0].Token)
|
|
require.Contains(t, respBody, `["1970-01-01T00:00:00Z",1]`)
|
|
|
|
cqBytes, err := os.ReadFile(cqPath)
|
|
require.NoError(t, err)
|
|
cqs := string(cqBytes)
|
|
|
|
require.Contains(t, cqs, "CREATE CONTINUOUS QUERY other_cq ON test BEGIN SELECT mean(foo) INTO test.autogen.foo FROM empty.autogen.foo GROUP BY time(1h) END")
|
|
require.Contains(t, cqs, "CREATE CONTINUOUS QUERY cq_3 ON test BEGIN SELECT mean(bar) INTO test.autogen.bar FROM test.autogen.foo GROUP BY time(1m) END")
|
|
require.Contains(t, cqs, "CREATE CONTINUOUS QUERY cq ON empty BEGIN SELECT mean(example) INTO empty.autogen.mean FROM empty.autogen.raw GROUP BY time(1h) END")
|
|
},
|
|
}
|
|
require.NoError(t, cli.BindOptions(v, &cmd, cliOpts))
|
|
require.NoError(t, cmd.Execute())
|
|
}
|
|
|
|
func mustRunQuery(t *testing.T, tl *launcher.TestLauncher, db, rawQ, token string) string {
|
|
queryUrl := *tl.URL()
|
|
queryUrl.Path = "/query"
|
|
|
|
params := queryUrl.Query()
|
|
params.Set("db", db)
|
|
params.Set("q", rawQ)
|
|
queryUrl.RawQuery = params.Encode()
|
|
|
|
req, err := http.NewRequest(http.MethodGet, queryUrl.String(), nil)
|
|
require.Nil(t, err)
|
|
|
|
req.Header.Set("Authorization", "Token "+token)
|
|
resp, err := http.DefaultClient.Do(req)
|
|
require.Nil(t, err)
|
|
|
|
respBody, err := io.ReadAll(resp.Body)
|
|
require.Nil(t, err)
|
|
|
|
return string(respBody)
|
|
}
|