feat(upgrade): upgrade databases

pull/19710/head
vlastahajek 2020-10-01 17:36:23 +02:00 committed by Stuart Carnie
parent 7cd689d57e
commit 8763bb1af9
15 changed files with 866 additions and 19 deletions

View File

@ -48,6 +48,9 @@ type TestLauncher struct {
Stdin bytes.Buffer
Stdout bytes.Buffer
Stderr bytes.Buffer
// Flag to act as standard server: disk store, no-e2e testing flag
realServer bool
}
// NewTestLauncher returns a new instance of TestLauncher.
@ -70,6 +73,13 @@ func NewTestLauncher(flagger feature.Flagger) *TestLauncher {
return l
}
// NewTestLauncherServer returns a new instance of TestLauncher configured as real server (disk store, no e2e flag).
// It delegates to NewTestLauncher and only flips the realServer flag, which
// Run consults to decide whether to inject the in-memory-store/e2e arguments.
func NewTestLauncherServer(flagger feature.Flagger) *TestLauncher {
	l := NewTestLauncher(flagger)
	l.realServer = true
	return l
}
// RunTestLauncherOrFail initializes and starts the server.
func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, flagger feature.Flagger, args ...string) *TestLauncher {
tb.Helper()
@ -85,8 +95,10 @@ func RunTestLauncherOrFail(tb testing.TB, ctx context.Context, flagger feature.F
// Passed arguments will overwrite/add to the default ones.
func (tl *TestLauncher) Run(ctx context.Context, args ...string) error {
largs := make([]string, 0, len(args)+8)
largs = append(largs, "--store", "memory")
largs = append(largs, "--e2e-testing")
if !tl.realServer {
largs = append(largs, "--store", "memory")
largs = append(largs, "--e2e-testing")
}
largs = append(largs, "--testing-always-allow-setup")
largs = append(largs, "--bolt-path", filepath.Join(tl.Path, bolt.DefaultFilename))
largs = append(largs, "--engine-path", filepath.Join(tl.Path, "engine"))
@ -98,8 +110,10 @@ func (tl *TestLauncher) Run(ctx context.Context, args ...string) error {
// Shutdown stops the program and cleans up temporary paths.
func (tl *TestLauncher) Shutdown(ctx context.Context) error {
tl.Cancel()
tl.Launcher.Shutdown(ctx)
if tl.running {
tl.Cancel()
tl.Launcher.Shutdown(ctx)
}
return os.RemoveAll(tl.Path)
}

View File

@ -2,12 +2,183 @@ package upgrade
import (
"context"
"fmt"
"github.com/dustin/go-humanize"
"path/filepath"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"go.uber.org/zap"
)
// upgradeDatabases creates databases, buckets, retention policies and shard info according to 1.x meta and copies data
func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, orgID influxdb.ID, log *zap.Logger) (map[string][]string, error) {
return nil, errors.New("not implemented")
// upgradeDatabases migrates all 1.x databases found in v1.meta into the 2.x
// instance: for every database/retention-policy pair it creates a 2.x bucket,
// a backing v2 meta database (named after the bucket ID) with the RP mapped to
// the default retention policy name, a DBRP mapping, the shard groups/shards
// from the 1.x meta, and finally copies the data and WAL directories on disk.
//
// It returns a map from 1.x database name to the list of bucket IDs created
// for that database (one per retention policy).
//
// Before doing any work it verifies that the free space on the target disk is
// at least the combined size of the 1.x data and WAL directories.
// NOTE(review): on success this function also calls v2.close(), closing a
// handle it did not open — confirm the caller expects that.
func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opts *optionsV1, v2opts *optionsV2, orgID influxdb.ID, log *zap.Logger) (map[string][]string, error) {
	db2BucketIds := make(map[string][]string)

	targetDataPath := filepath.Join(v2opts.enginePath, "data")
	targetWalPath := filepath.Join(v2opts.enginePath, "wal")

	// Directories excluded from the copy: series-file and index directories
	// are rebuilt by 2.x, and anything starting with '_' is internal.
	// NOTE(review): the "_series" comparison is redundant — it is already
	// covered by the leading-underscore check.
	dirFilterFunc := func(path string) bool {
		base := filepath.Base(path)
		if base == "_series" ||
			(len(base) > 0 && base[0] == '_') || //skip internal databases
			base == "index" {
			return true
		}
		return false
	}

	if len(v1.meta.Databases()) == 0 {
		log.Info("No database found in the 1.x meta")
		return db2BucketIds, nil
	}

	// Check space
	log.Info("Checking space")
	size, err := DirSize(v1opts.dataDir)
	if err != nil {
		return nil, fmt.Errorf("error getting size of %s: %w", v1opts.dataDir, err)
	}
	size2, err := DirSize(v1opts.walDir)
	if err != nil {
		return nil, fmt.Errorf("error getting size of %s: %w", v1opts.walDir, err)
	}
	size += size2

	// Free space is measured on the volume holding the bolt DB; data/WAL are
	// assumed to land on the same volume.
	v2dir := filepath.Dir(v2opts.boltPath)
	diskInfo, err := fs.DiskUsage(v2dir)
	if err != nil {
		return nil, fmt.Errorf("error getting info of disk %s: %w", v2dir, err)
	}
	if options.verbose {
		log.Info("Disk space info",
			zap.String("Free space", humanize.Bytes(diskInfo.Free)),
			zap.String("Requested space", humanize.Bytes(size)))
	}
	if size > diskInfo.Free {
		return nil, fmt.Errorf("not enough space on target disk of %s: need %d, available %d ", v2dir, size, diskInfo.Free)
	}

	log.Info("Upgrading databases")
	// read each database / retention policy from v1.meta and create bucket db-name/rp-name
	// create database in v2.meta
	// copy shard info from v1.meta
	for _, db := range v1.meta.Databases() {
		// _internal is regenerated by 2.x; never migrate it.
		if db.Name == "_internal" {
			if options.verbose {
				log.Info("Skipping _internal ")
			}
			continue
		}
		if options.verbose {
			log.Info("Upgrading database ",
				zap.String("database", db.Name))
		}

		// db to buckets IDs mapping
		db2BucketIds[db.Name] = make([]string, 0, len(db.RetentionPolicies))

		for _, rp := range db.RetentionPolicies {
			sourcePath := filepath.Join(v1opts.dataDir, db.Name, rp.Name)

			bucket := &influxdb.Bucket{
				OrgID:               orgID,
				Type:                influxdb.BucketTypeUser,
				Name:                db.Name + "-" + rp.Name,
				Description:         fmt.Sprintf("Upgraded from v1 database %s with retention policy %s", db.Name, rp.Name),
				RetentionPolicyName: rp.Name,
				RetentionPeriod:     rp.Duration,
			}
			if options.verbose {
				log.Info("Creating bucket ",
					zap.String("Bucket", bucket.Name))
			}
			err = v2.bucketSvc.CreateBucket(ctx, bucket)
			if err != nil {
				return nil, fmt.Errorf("error creating bucket %s: %w", bucket.Name, err)
			}
			db2BucketIds[db.Name] = append(db2BucketIds[db.Name], bucket.ID.String())

			if options.verbose {
				log.Info("Creating database with retention policy",
					zap.String("database", bucket.ID.String()))
			}
			// The v2 meta database is named by the bucket ID; the 1.x RP is
			// installed under the default RP name.
			spec := rp.ToSpec()
			spec.Name = meta.DefaultRetentionPolicyName
			dbv2, err := v2.meta.CreateDatabaseWithRetentionPolicy(bucket.ID.String(), spec)
			if err != nil {
				return nil, fmt.Errorf("error creating database %s: %w", bucket.ID.String(), err)
			}

			mapping := &influxdb.DBRPMappingV2{
				Database:        db.Name,
				RetentionPolicy: rp.Name,
				Default:         db.DefaultRetentionPolicy == rp.Name,
				OrganizationID:  orgID,
				BucketID:        bucket.ID,
			}
			if options.verbose {
				log.Info("Creating mapping",
					zap.String("database", mapping.Database),
					zap.String("retention policy", mapping.RetentionPolicy),
					zap.String("orgID", mapping.OrganizationID.String()),
					zap.String("bucketID", mapping.BucketID.String()))
			}
			err = v2.dbrpSvc.Create(ctx, mapping)
			if err != nil {
				return nil, fmt.Errorf("error creating mapping %s/%s -> Org %s, bucket %s: %w", mapping.Database, mapping.RetentionPolicy, mapping.OrganizationID.String(), mapping.BucketID.String(), err)
			}

			// Re-create the 1.x shard groups (with their original shard IDs)
			// in the v2 meta store.
			shardsNum := 0
			for _, sg := range rp.ShardGroups {
				if options.verbose {
					log.Info("Creating shard group",
						zap.String("database", dbv2.Name),
						zap.String("retention policy", dbv2.DefaultRetentionPolicy),
						zap.Time("time", sg.StartTime))
				}
				shardsNum += len(sg.Shards)
				// NOTE(review): error message says "creating database" but
				// this creates a shard group — consider rewording.
				_, err := v2.meta.CreateShardGroupWithShards(dbv2.Name, dbv2.DefaultRetentionPolicy, sg.StartTime, sg.Shards)
				if err != nil {
					return nil, fmt.Errorf("error creating database %s: %w", bucket.ID.String(), err)
				}
			}

			//empty retention policy doesn't have data
			if shardsNum > 0 {
				targetPath := filepath.Join(targetDataPath, dbv2.Name, spec.Name)
				if options.verbose {
					log.Info("Copying data",
						zap.String("source", sourcePath),
						zap.String("target", targetPath))
				}
				err = CopyDir(sourcePath,
					targetPath,
					nil,
					dirFilterFunc,
					nil)
				if err != nil {
					return nil, fmt.Errorf("error copying v1 data from %s to %s: %w", sourcePath, targetPath, err)
				}

				sourcePath = filepath.Join(v1opts.walDir, db.Name, rp.Name)
				targetPath = filepath.Join(targetWalPath, dbv2.Name, spec.Name)
				if options.verbose {
					log.Info("Copying wal",
						zap.String("source", sourcePath),
						zap.String("target", targetPath))
				}
				err = CopyDir(sourcePath,
					targetPath,
					nil,
					dirFilterFunc,
					nil)
				if err != nil {
					return nil, fmt.Errorf("error copying v1 data from %s to %s: %w", sourcePath, targetPath, err)
				}
			} else {
				log.Warn("Empty retention policy")
			}
		}
	}

	err = v2.close()
	if err != nil {
		return nil, fmt.Errorf("error closing v2: %w", err)
	}
	return db2BucketIds, nil
}

View File

@ -0,0 +1,210 @@
package upgrade
import (
"context"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/influxdata/influxdb/v2/internal/testutil"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
// Default context.
var ctx = context.Background()
// TestUpgradeRealDB is an end-to-end upgrade test: it unzips a captured 1.x
// database (testdata/v1db.zip), runs the admin setup and upgradeDatabases
// against a real-server TestLauncher, then reopens the 2.x store and verifies
// orgs/users/buckets, meta databases, shard counts and on-disk layout, and
// finally boots the server and checks query results over HTTP.
func TestUpgradeRealDB(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "")
	require.Nil(t, err)
	defer os.RemoveAll(tmpdir)

	// Extract the canned 1.x data directory.
	err = testutil.Unzip("testdata/v1db.zip", tmpdir)
	require.Nil(t, err)

	tl := launcher.NewTestLauncherServer(nil)
	defer tl.ShutdownOrFail(t, ctx)

	boltPath := filepath.Join(tl.Path, bolt.DefaultFilename)
	enginePath := filepath.Join(tl.Path, "engine")

	v1opts := &optionsV1{dbDir: tmpdir + "/v1db"}
	err = v1opts.checkDirs()
	require.Nil(t, err)

	v1, err := newInfluxDBv1(v1opts)
	require.Nil(t, err)

	v2opts := &optionsV2{
		boltPath:   boltPath,
		enginePath: enginePath,
		userName:   "my-user",
		password:   "my-password",
		orgName:    "my-org",
		bucket:     "my-bucket",
		retention:  "7d",
		token:      "my-token",
	}

	v2, err := newInfluxDBv2(ctx, v2opts, zap.NewNop())
	require.Nil(t, err)

	// Run the non-interactive onboarding flow against the fresh 2.x store.
	options.target = *v2opts
	req, err := nonInteractive()
	require.Nil(t, err)

	resp, err := setupAdmin(ctx, v2, req)
	require.Nil(t, err)
	require.NotNil(t, resp)
	require.NotNil(t, resp.Org)
	require.NotNil(t, resp.Auth)
	require.NotNil(t, resp.User)
	require.NotNil(t, resp.Bucket)
	assert.Equal(t, "my-org", resp.Org.Name)
	assert.Equal(t, "my-user", resp.User.Name)
	assert.Equal(t, "my-bucket", resp.Bucket.Name)
	assert.Equal(t, "my-token", resp.Auth.Token)

	log, err := zap.NewDevelopment()
	require.Nil(t, err)

	// Perform the actual migration. upgradeDatabases closes v2 on success;
	// the explicit close below is therefore a double close — kept as-is.
	db2bids, err := upgradeDatabases(ctx, v1, v2, v1opts, v2opts, resp.Org.ID, log)
	require.Nil(t, err)

	err = v2.close()
	require.Nil(t, err)

	// One bucket per retention policy is expected for each 1.x database.
	require.Len(t, db2bids, 3)
	require.Len(t, db2bids["mydb"], 2)
	require.Len(t, db2bids["test"], 1)
	require.Len(t, db2bids["empty"], 1)

	// Reopen the 2.x store read-back style and verify what was written.
	v2, err = newInfluxDBv2(ctx, &optionsV2{boltPath: boltPath, enginePath: enginePath}, log)
	require.Nil(t, err)

	orgs, _, err := v2.ts.FindOrganizations(ctx, influxdb.OrganizationFilter{})
	require.Nil(t, err)
	require.NotNil(t, orgs)
	require.Len(t, orgs, 1)
	assert.Equal(t, "my-org", orgs[0].Name)
	tl.Org = orgs[0]

	users, _, err := v2.ts.FindUsers(ctx, influxdb.UserFilter{})
	require.Nil(t, err)
	require.NotNil(t, users)
	require.Len(t, users, 1)
	assert.Equal(t, "my-user", users[0].Name)
	tl.User = users[0]

	buckets, _, err := v2.ts.FindBuckets(ctx, influxdb.BucketFilter{})
	require.Nil(t, err)

	bucketNames := []string{"my-bucket", "_tasks", "_monitoring", "mydb-autogen", "mydb-1week", "test-autogen", "empty-autogen"}
	myDbAutogenBucketId := ""
	myDb1weekBucketId := ""
	testBucketId := ""
	emptyBucketId := ""

	require.NotNil(t, buckets)
	require.Len(t, buckets, len(bucketNames))

	// Collect the bucket IDs needed for the meta-database checks below.
	for _, b := range buckets {
		assert.Contains(t, bucketNames, b.Name)
		switch b.Name {
		case bucketNames[0]:
			tl.Bucket = b
		case bucketNames[3]:
			myDbAutogenBucketId = b.ID.String()
		case bucketNames[4]:
			myDb1weekBucketId = b.ID.String()
		case bucketNames[5]:
			testBucketId = b.ID.String()
		case bucketNames[6]:
			emptyBucketId = b.ID.String()
		}
	}

	// _internal must not have been migrated.
	assert.NoDirExists(t, filepath.Join(enginePath, "data", "_internal"))

	dbChecks := []struct {
		dbname    string
		shardsNum int
	}{
		{myDbAutogenBucketId, 3},
		{testBucketId, 5},
		{myDb1weekBucketId, 1},
		{emptyBucketId, 0},
	}

	// Per-database checks: shard counts, copied data dirs, and that the
	// filtered dirs (_series, index) were not copied.
	for _, check := range dbChecks {
		db := v2.meta.Database(check.dbname)
		require.NotNil(t, db)
		assert.Len(t, db.ShardInfos(), check.shardsNum)
		if check.shardsNum > 0 {
			assert.DirExists(t, filepath.Join(enginePath, "data", check.dbname, meta.DefaultRetentionPolicyName))
		}
		assert.NoDirExists(t, filepath.Join(enginePath, "data", check.dbname, "_series"))
		for _, si := range db.ShardInfos() {
			assert.NoDirExists(t, filepath.Join(enginePath, "data", check.dbname, strconv.FormatUint(si.ID, 10), "index"))
		}
	}

	auths, _, err := v2.kvService.FindAuthorizations(ctx, influxdb.AuthorizationFilter{})
	require.Nil(t, err)
	require.Len(t, auths, 1)
	tl.Auth = auths[0]

	err = v2.close()
	require.Nil(t, err)

	// start server
	err = tl.Run(ctx)
	require.Nil(t, err)

	// Verify migrated data through the 1.x-compatibility query endpoint.
	respBody := mustRunQuery(t, tl, "test", "select count(avg) from stat")
	assert.Contains(t, respBody, `["1970-01-01T00:00:00Z",5776]`)

	respBody = mustRunQuery(t, tl, "mydb", "select count(avg) from testv1")
	assert.Contains(t, respBody, `["1970-01-01T00:00:00Z",2882]`)

	respBody = mustRunQuery(t, tl, "mydb", "select count(i) from testv1")
	assert.Contains(t, respBody, `["1970-01-01T00:00:00Z",21]`)

	respBody = mustRunQuery(t, tl, "mydb", `select count(line) from mydb."1week".log`)
	assert.Contains(t, respBody, `["1970-01-01T00:00:00Z",1]`)
}
// mustRunQuery executes the InfluxQL query rawQ against database db via the
// launcher's 1.x-compatibility /query HTTP endpoint, authenticating with the
// launcher's token, and returns the raw response body as a string.
// Any error aborts the test immediately.
func mustRunQuery(t *testing.T, tl *launcher.TestLauncher, db, rawQ string) string {
	queryUrl, err := url.Parse(tl.URL() + "/query")
	require.Nil(t, err)

	params := queryUrl.Query()
	params.Set("db", db)
	params.Set("q", rawQ)
	queryUrl.RawQuery = params.Encode()

	req, err := http.NewRequest(http.MethodGet, queryUrl.String(), nil)
	require.Nil(t, err)
	req.Header.Set("Authorization", "Token "+tl.Auth.Token)

	resp, err := http.DefaultClient.Do(req)
	require.Nil(t, err)
	// Close the body so the transport can reuse the connection
	// (previously leaked).
	defer resp.Body.Close()

	respBody, err := ioutil.ReadAll(resp.Body)
	require.Nil(t, err)
	return string(respBody)
}

135
cmd/influxd/upgrade/fs.go Normal file
View File

@ -0,0 +1,135 @@
package upgrade
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
)
// DirSize returns total size in bytes of containing files
func DirSize(path string) (uint64, error) {
var size uint64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
size += uint64(info.Size())
}
return err
})
return size, err
}
// CopyFile copies the contents of the file named src to the file named
// by dst. The file will be created if it does not already exist. If the
// destination file exists, all it's contents will be replaced by the contents
// of the source file. The file mode will be copied from the source and
// the copied data is synced/flushed to stable storage.
func CopyFile(src, dst string) (err error) {
in, err := os.Open(src)
if err != nil {
return
}
defer in.Close()
si, err := os.Stat(src)
if err != nil {
return
}
out, err := os.OpenFile(dst, os.O_WRONLY|os.O_CREATE, si.Mode())
//out, err := os.Create(dst)
if err != nil {
return
}
defer func() {
if e := out.Close(); e != nil {
err = e
}
}()
_, err = io.Copy(out, in)
if err != nil {
return
}
err = out.Sync()
if err != nil {
return
}
return
}
// CopyDir recursively copies a directory tree, attempting to preserve permissions.
// Source directory must exist, destination directory must *not* exist.
// Symlinks are ignored and skipped.
// dirRenameFunc is a mapping function that transforms path to a new name. Returning the path specifies the directory should not be renamed.
// dirFilterFunc ignores all directories where dirFilterFunc(path) is true. Passing nil for dirFilterFunc includes all directories.
// fileFilterFunc ignores all files where fileFilterFunc(path) is true. Passing nil for fileFilterFunc includes all files.
func CopyDir(src string, dst string, dirRenameFunc func(path string) string, dirFilterFunc func(path string) bool, fileFilterFunc func(path string) bool) (err error) {
	src = filepath.Clean(src)
	dst = filepath.Clean(dst)

	if dirFilterFunc != nil && dirFilterFunc(src) {
		return
	}
	si, err := os.Stat(src)
	if err != nil {
		return err
	}
	if !si.IsDir() {
		return fmt.Errorf("source is not a directory")
	}

	_, err = os.Stat(dst)
	if err != nil && !os.IsNotExist(err) {
		return
	}
	if err == nil {
		return fmt.Errorf("destination '%s' already exists", dst)
	}

	err = os.MkdirAll(dst, si.Mode())
	if err != nil {
		return
	}

	entries, err := ioutil.ReadDir(src)
	if err != nil {
		return
	}

	for _, entry := range entries {
		srcPath := filepath.Join(src, entry.Name())
		entryName := entry.Name()
		if dirRenameFunc != nil {
			entryName = dirRenameFunc(entryName)
		}
		dstPath := filepath.Join(dst, entryName)

		if entry.IsDir() {
			err = CopyDir(srcPath, dstPath, dirRenameFunc, dirFilterFunc, fileFilterFunc)
			if err != nil {
				return
			}
		} else {
			// Skip symlinks.
			if entry.Mode()&os.ModeSymlink != 0 {
				continue
			}
			// Bug fix: filter on the file's own path (srcPath), not on the
			// containing directory (src) — the latter made per-file
			// filtering a no-op.
			if fileFilterFunc != nil && fileFilterFunc(srcPath) {
				continue
			}
			err = CopyFile(srcPath, dstPath)
			if err != nil {
				return
			}
		}
	}
	return
}

View File

@ -0,0 +1,97 @@
package upgrade
import (
"bytes"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
)
// TestCopyDirAndDirSize builds a small directory tree with known file sizes
// and permissions, checks that DirSize reports the exact total (1600 bytes),
// then copies the tree with CopyDir using a directory filter and verifies
// that all files arrive with their original size/mode and that the filtered
// "skip" directory is excluded.
func TestCopyDirAndDirSize(t *testing.T) {
	tmpdir, err := ioutil.TempDir("", "tcd")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpdir)

	// Tree layout: nested dirs with varied permissions, one dir named "skip"
	// that the filter below must exclude from the copy.
	err = os.MkdirAll(filepath.Join(tmpdir, "1", "1", "1"), 0700)
	if err != nil {
		t.Fatal(err)
	}
	err = os.MkdirAll(filepath.Join(tmpdir, "1", "2", "1"), 0770)
	if err != nil {
		t.Fatal(err)
	}
	err = os.MkdirAll(filepath.Join(tmpdir, "1", "2", "skip"), 0770)
	if err != nil {
		t.Fatal(err)
	}
	err = os.MkdirAll(filepath.Join(tmpdir, "2", "1", "1"), 0777)
	if err != nil {
		t.Fatal(err)
	}

	// File sizes sum to 1600 bytes (including the one under "skip").
	mustCreateFile(t, filepath.Join(tmpdir, "1", "1.bin"), 300, 0600)
	mustCreateFile(t, filepath.Join(tmpdir, "1", "1", "1", "1.bin"), 250, 0600)
	mustCreateFile(t, filepath.Join(tmpdir, "1", "1", "1", "2.bin"), 350, 0400)
	mustCreateFile(t, filepath.Join(tmpdir, "1", "2", "1", "1.bin"), 200, 0640)
	mustCreateFile(t, filepath.Join(tmpdir, "1", "2", "skip", "1.bin"), 200, 0640)
	mustCreateFile(t, filepath.Join(tmpdir, "2", "1", "1", "1.bin"), 200, 0644)
	mustCreateFile(t, filepath.Join(tmpdir, "2", "1", "1", "2.bin"), 100, 0640)

	size, err := DirSize(tmpdir)
	if err != nil {
		t.Fatal(err)
	}
	assert.Equal(t, uint64(1600), size)

	targetDir, err := ioutil.TempDir("", "tcd")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(targetDir)

	// CopyDir requires a non-existent destination.
	targetDir = filepath.Join(targetDir, "x")
	err = CopyDir(tmpdir, targetDir, nil, func(path string) bool {
		base := filepath.Base(path)
		return base == "skip"
	},
		nil)
	if err != nil {
		t.Fatal(err)
	}

	assetFileExistHasSizeAndPerm(t, filepath.Join(targetDir, "1", "1.bin"), 300, 0600)
	assetFileExistHasSizeAndPerm(t, filepath.Join(targetDir, "1", "1", "1", "1.bin"), 250, 0600)
	assetFileExistHasSizeAndPerm(t, filepath.Join(targetDir, "1", "1", "1", "2.bin"), 350, 0400)
	assetFileExistHasSizeAndPerm(t, filepath.Join(targetDir, "1", "2", "1", "1.bin"), 200, 0640)
	assert.NoFileExists(t, filepath.Join(targetDir, "1", "2", "skip", "1.bin"))
	assetFileExistHasSizeAndPerm(t, filepath.Join(targetDir, "2", "1", "1", "1.bin"), 200, 0644)
	assetFileExistHasSizeAndPerm(t, filepath.Join(targetDir, "2", "1", "1", "2.bin"), 100, 0640)
}
// assetFileExistHasSizeAndPerm reports a test error unless the file at path
// exists with exactly the given size and permission bits.
func assetFileExistHasSizeAndPerm(t *testing.T, path string, size int, mode os.FileMode) {
	t.Helper()
	fi, err := os.Stat(path)
	if err != nil {
		t.Error(err)
		return
	}
	assert.Equal(t, int64(size), fi.Size(), path)
	assert.Equal(t, mode, fi.Mode()&0xFFF, path)
}
func mustCreateFile(t *testing.T, path string, size int, mode os.FileMode) {
t.Helper()
var buff bytes.Buffer
for i := 0; i < size; i++ {
b := byte(rand.Int31n(256))
buff.Write([]byte{b})
}
err := ioutil.WriteFile(path, buff.Bytes(), mode)
if err != nil {
t.Fatal(err)
}
}

BIN
cmd/influxd/upgrade/testdata/v1db.zip vendored Normal file

Binary file not shown.

View File

@ -346,7 +346,7 @@ func runUpgradeE(*cobra.Command, []string) error {
options.target.token = or.Auth.Token
db2BucketIds, err := upgradeDatabases(ctx, v1, v2, or.Org.ID, log)
db2BucketIds, err := upgradeDatabases(ctx, v1, v2, &options.source, &options.target, or.Org.ID, log)
if err != nil {
//remove all files
log.Info("Database upgrade error, removing data")

View File

@ -0,0 +1,66 @@
package testutil
import (
"archive/zip"
"fmt"
"io"
"os"
"path/filepath"
"strings"
)
// Unzip will extract a zip archive into dest
func Unzip(src string, dest string) error {
r, err := zip.OpenReader(src)
if err != nil {
return err
}
defer r.Close()
for _, f := range r.File {
// Store filename/path for returning and using later on
fpath := filepath.Join(dest, f.Name)
// Check for ZipSlip. More Info: http://bit.ly/2MsjAWE
if !strings.HasPrefix(fpath, filepath.Clean(dest)+string(os.PathSeparator)) {
return fmt.Errorf("%s: illegal file path", fpath)
}
if f.FileInfo().IsDir() {
// Make Folder
if err := os.MkdirAll(fpath, os.ModePerm); err != nil {
return err
}
continue
}
if err = os.MkdirAll(filepath.Dir(fpath), os.ModePerm); err != nil {
return err
}
outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
rc, err := f.Open()
if err != nil {
return err
}
_, err = io.Copy(outFile, rc)
if err != nil {
return err
}
if err := outFile.Close(); err != nil {
return err
}
if err := rc.Close(); err != nil {
return err
}
}
return nil
}

View File

@ -1,6 +1,8 @@
package fs
import "fmt"
import (
"fmt"
)
// A FileExistsError is returned when an operation cannot be completed due to a
// file already existing.
@ -15,3 +17,11 @@ func newFileExistsError(path string) FileExistsError {
func (e FileExistsError) Error() string {
return fmt.Sprintf("operation not allowed, file %q exists", e.path)
}
// DiskStatus is returned by DiskUsage
type DiskStatus struct {
	All   uint64 // total size of the filesystem, in bytes
	Used  uint64 // bytes in use, computed as All - Free
	Free  uint64 // total free bytes
	Avail uint64 // free bytes available to the caller (on unix: f_bavail, i.e. unprivileged users)
}

View File

@ -5,6 +5,8 @@ package fs
import (
"os"
"syscall"
unix "golang.org/x/sys/unix"
)
// SyncDir flushes any file renames to the filesystem.
@ -66,3 +68,18 @@ func CreateFile(newpath string) (*os.File, error) {
return os.Create(newpath)
}
// DiskUsage returns disk usage of disk of path
func DiskUsage(path string) (*DiskStatus, error) {
	var stat unix.Statfs_t
	if err := unix.Statfs(path, &stat); err != nil {
		return nil, err
	}
	// Convert block counts to bytes using the filesystem block size.
	bs := uint64(stat.Bsize)
	status := &DiskStatus{
		All:   stat.Blocks * bs,
		Free:  stat.Bfree * bs,
		Avail: stat.Bavail * bs,
	}
	status.Used = status.All - status.Free
	return status, nil
}

View File

@ -1,6 +1,10 @@
package fs
import "os"
import (
"os"
"syscall"
"unsafe"
)
func SyncDir(dirName string) error {
return nil
@ -50,3 +54,23 @@ func CreateFile(newpath string) (*os.File, error) {
return os.Create(newpath)
}
// DiskUsage returns disk usage of disk of path
func DiskUsage(path string) (*DiskStatus, error) {
	var disk DiskStatus
	// NOTE(review): the DLL is loaded and the procedure resolved on every
	// call; hoisting these to package-level vars would avoid repeated work.
	// MustLoadDLL/MustFindProc panic on failure, but kernel32.dll is always
	// present on Windows.
	h := syscall.MustLoadDLL("kernel32.dll")
	c := h.MustFindProc("GetDiskFreeSpaceExW")
	p, err := syscall.UTF16PtrFromString(path)
	if err != nil {
		return nil, err
	}
	// GetDiskFreeSpaceExW(dir, freeBytesAvailableToCaller, totalBytes,
	// totalFreeBytes) — the three out-params are written directly into the
	// uint64 fields of disk. Proc.Call's err return is only meaningful when
	// r1 == 0 (failure).
	r1, _, err := c.Call(uintptr(unsafe.Pointer(p)),
		uintptr(unsafe.Pointer(&disk.Avail)),
		uintptr(unsafe.Pointer(&disk.All)),
		uintptr(unsafe.Pointer(&disk.Free)))
	if r1 == 0 {
		return nil, err
	}
	disk.Used = disk.All - disk.Free
	return &disk, nil
}

View File

@ -709,8 +709,8 @@ func (c *Client) PruneShardGroups() error {
return nil
}
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// CreateShardGroupWithShards creates a shard group on a database and policy for a given timestamp and assign shards to the shard group
func (c *Client) CreateShardGroupWithShards(database, policy string, timestamp time.Time, shards []ShardInfo) (*ShardGroupInfo, error) {
// Check under a read-lock
c.mu.RLock()
if sg, _ := c.cacheData.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
@ -728,7 +728,7 @@ func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time)
return sg, nil
}
sgi, err := createShardGroup(data, database, policy, timestamp)
sgi, err := createShardGroup(data, database, policy, timestamp, shards...)
if err != nil {
return nil, err
}
@ -740,13 +740,17 @@ func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time)
return sgi, nil
}
func createShardGroup(data *Data, database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// CreateShardGroup creates a shard group on a database and policy for a given
// timestamp. Shards are assigned automatically (it delegates to
// CreateShardGroupWithShards with no pre-existing shard infos).
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
	return c.CreateShardGroupWithShards(database, policy, timestamp, nil)
}
func createShardGroup(data *Data, database, policy string, timestamp time.Time, shards ...ShardInfo) (*ShardGroupInfo, error) {
// It is the responsibility of the caller to check if it exists before calling this method.
if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
return nil, ErrShardGroupExists
}
if err := data.CreateShardGroup(database, policy, timestamp); err != nil {
if err := data.CreateShardGroup(database, policy, timestamp, shards...); err != nil {
return nil, err
}

View File

@ -1103,6 +1103,58 @@ func TestMetaClient_PruneShardGroups(t *testing.T) {
}
}
// TestMetaClient_CreateShardGroupWithShards verifies that creating a shard
// group with pre-assigned shard infos preserves the given shard IDs and
// owners, and that MaxShardID is raised to the largest supplied shard ID.
func TestMetaClient_CreateShardGroupWithShards(t *testing.T) {
	t.Parallel()

	d, c := newClient()
	defer d()
	defer c.Close()

	if _, err := c.CreateDatabase("db0"); err != nil {
		t.Fatal(err)
	}

	// Two pre-assigned shards: ID 1 with one owner, ID 3 with no owners.
	shards := []meta.ShardInfo{
		{1, []meta.ShardOwner{{1}}},
		{3, nil},
	}

	// create a shard group.
	tmin := time.Now()
	sg, err := c.CreateShardGroupWithShards("db0", "autogen", tmin, shards)
	if err != nil {
		t.Fatal(err)
	} else if sg == nil {
		t.Fatalf("expected ShardGroup")
	}

	// MaxShardID must track the highest supplied shard ID (3).
	if c.Data().MaxShardID != 3 {
		t.Log("MaxShardID is not 3: ", c.Data().MaxShardID)
		t.Fail()
	}

	// Test pre-creating shard groups.
	dur := sg.EndTime.Sub(sg.StartTime) + time.Nanosecond
	tmax := tmin.Add(dur)
	groups, err := c.ShardGroupsByTimeRange("db0", "autogen", tmin, tmax)
	if err != nil {
		t.Fatal(err)
	} else if len(groups) != 1 {
		t.Fatalf("wrong number of shard groups: %d", len(groups))
	} else if len(groups[0].Shards) != 2 {
		t.Fatalf("wrong number of shards: %d", len(groups[0].Shards))
	} else if groups[0].Shards[0].ID != 1 {
		t.Fatalf("wrong id of shard 0: %d", groups[0].Shards[0].ID)
	} else if len(groups[0].Shards[0].Owners) != 1 {
		t.Fatalf("wrong number of shard 0 owners: %d", len(groups[0].Shards[0].Owners))
	} else if groups[0].Shards[0].Owners[0].NodeID != 1 {
		t.Fatalf("wrong number of shard 0 owner 0 nodeID: %d", groups[0].Shards[0].Owners[0].NodeID)
	} else if groups[0].Shards[1].ID != 3 {
		t.Fatalf("wrong id of shard 1: %d", groups[0].Shards[1].ID)
	} else if groups[0].Shards[1].Owners != nil {
		t.Fatalf("wrong content of shard 1 owners: %v", groups[0].Shards[1].Owners)
	}
}
func TestMetaClient_PersistClusterIDAfterRestart(t *testing.T) {
t.Parallel()

View File

@ -355,7 +355,7 @@ func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.
}
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time, shards ...ShardInfo) error {
// Find retention policy.
rpi, err := data.RetentionPolicy(database, policy)
if err != nil {
@ -380,9 +380,19 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
}
data.MaxShardID++
sgi.Shards = []ShardInfo{
{ID: data.MaxShardID},
if len(shards) > 0 {
sgi.Shards = make([]ShardInfo, len(shards))
for i, si := range shards {
sgi.Shards[i] = si
if si.ID > data.MaxShardID {
data.MaxShardID = si.ID
}
}
} else {
data.MaxShardID++
sgi.Shards = []ShardInfo{
{ID: data.MaxShardID},
}
}
// Retention policy has a new shard group, so update the policy. Shard
@ -1132,6 +1142,16 @@ func DefaultRetentionPolicyInfo() *RetentionPolicyInfo {
return NewRetentionPolicyInfo(DefaultRetentionPolicyName)
}
// ToSpec returns RetentionPolicySpec instance with the same data as in RetentionPolicyInfo
// Note: ReplicaN and Duration point directly into rpi's fields, so the spec
// aliases the receiver — mutating rpi after the call changes the spec.
func (rpi *RetentionPolicyInfo) ToSpec() *RetentionPolicySpec {
	return &RetentionPolicySpec{
		Name:               rpi.Name,
		ReplicaN:           &rpi.ReplicaN,
		Duration:           &rpi.Duration,
		ShardGroupDuration: rpi.ShardGroupDuration,
	}
}
// Apply applies a specification to the retention policy info.
func (rpi *RetentionPolicyInfo) Apply(spec *RetentionPolicySpec) *RetentionPolicyInfo {
rp := &RetentionPolicyInfo{

View File

@ -384,6 +384,33 @@ func TestShardGroupInfo_Contains(t *testing.T) {
}
}
// TestRetentionPolicyInfo_ToSpec checks that ToSpec copies every field of a
// RetentionPolicyInfo into the spec, with ReplicaN and Duration exposed as
// non-nil pointers holding the same values.
func TestRetentionPolicyInfo_ToSpec(t *testing.T) {
	rp := &meta.RetentionPolicyInfo{
		Name:               "bar",
		ReplicaN:           1,
		Duration:           24 * time.Hour,
		ShardGroupDuration: time.Hour,
	}

	spec := rp.ToSpec()
	if spec == nil {
		t.Fatal("invalid spec")
	} else if spec.Name != rp.Name {
		t.Fatalf("invalid name: %s", spec.Name)
	} else if spec.ReplicaN == nil {
		t.Fatalf("invalid ReplicaN")
	} else if *spec.ReplicaN != rp.ReplicaN {
		t.Fatalf("invalid ReplicaN: %d", *spec.ReplicaN)
	} else if spec.Duration == nil {
		t.Fatalf("invalid Duration")
	} else if *spec.Duration != rp.Duration {
		t.Fatalf("invalid Duration: %s", spec.Duration.String())
	} else if spec.ShardGroupDuration != rp.ShardGroupDuration {
		t.Fatalf("invalid ShardGroupDuration: %s", spec.ShardGroupDuration.String())
	}
}
func randString(n int) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
b := make([]rune, n)