diff --git a/Gopkg.lock b/Gopkg.lock index aebc84871a..57db089179 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -447,16 +447,23 @@ [[projects]] branch = "master" - digest = "1:423e244b01a2d7c36072f68f589643a41b814c1868865c2009b7bee49983ab04" + digest = "1:38f69bc2178087c2021679cc2d8076c057b5590b913459118faa14ef02f27832" name = "github.com/influxdata/platform" packages = [ ".", "logger", "models", "pkg/binaryutil", + "pkg/bloom", "pkg/bytesutil", + "pkg/data/gen", + "pkg/encoding/simple8b", "pkg/escape", + "pkg/file", + "pkg/limiter", + "pkg/metrics", "pkg/mmap", + "pkg/pool", "pkg/rhh", "pkg/slices", "pkg/snowflake", @@ -467,11 +474,14 @@ "telegraf/plugins", "telegraf/plugins/inputs", "telegraf/plugins/outputs", + "toml", "tsdb", "tsdb/cursors", + "tsdb/tsi1", + "tsdb/tsm1", ] pruneopts = "UT" - revision = "dc5616e3f9ed409490e465a02ee3f1da50ad2e13" + revision = "da7e0f615a0fe464c0c754da58095e0ea32bc83b" [[projects]] branch = "master" @@ -1073,6 +1083,7 @@ "github.com/influxdata/flux/values", "github.com/influxdata/influxql", "github.com/influxdata/platform/models", + "github.com/influxdata/platform/pkg/data/gen", "github.com/influxdata/platform/query/functions/inputs/storage", "github.com/influxdata/platform/storage/reads", "github.com/influxdata/platform/storage/reads/datatypes", diff --git a/Gopkg.toml b/Gopkg.toml index 528067ac84..aa84233530 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -1,3 +1,5 @@ +required = ["github.com/influxdata/platform/pkg/data/gen"] + [[constraint]] name = "collectd.org" version = "0.3.0" diff --git a/cmd/influx_tools/export/exporter.go b/cmd/influx_tools/export/exporter.go index 8dfdc19df3..9942ea0399 100644 --- a/cmd/influx_tools/export/exporter.go +++ b/cmd/influx_tools/export/exporter.go @@ -138,7 +138,7 @@ func (e *exporter) loadShardGroups() error { min := time.Unix(0, models.MinNanoTime) max := time.Unix(0, models.MaxNanoTime) - groups, err := e.metaClient.ShardGroupsByTimeRange(e.db, e.rp, min, max) + groups, err := e.metaClient.NodeShardGroupsByTimeRange(e.db, e.rp, min, max) if err != nil { return err } diff --git a/cmd/influx_tools/generate/exec/command.go b/cmd/influx_tools/generate/exec/command.go new file mode 100644 index 0000000000..6219a1a744 --- /dev/null +++ b/cmd/influx_tools/generate/exec/command.go @@ -0,0 +1,163 @@ +package exec + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "math" + "os" + "time" + + "github.com/influxdata/influxdb/cmd/influx_tools/generate" + "github.com/influxdata/influxdb/cmd/influx_tools/internal/profile" + "github.com/influxdata/influxdb/cmd/influx_tools/server" + "github.com/influxdata/influxdb/services/meta" + "github.com/influxdata/platform/pkg/data/gen" +) + +// Command represents the program execution for "gen-exec". +type Command struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + deps Dependencies + server server.Interface + filter SeriesGeneratorFilter + + configPath string + printOnly bool + noTSI bool + concurrency int + spec generate.Spec + + profile profile.Config +} + +type SeriesGeneratorFilter func(sgi meta.ShardGroupInfo, g SeriesGenerator) SeriesGenerator + +type Dependencies struct { + Server server.Interface + + // SeriesGeneratorFilter wraps g with a SeriesGenerator that + // returns a subset of keys from g + SeriesGeneratorFilter SeriesGeneratorFilter +} + +// NewCommand returns a new instance of Command.
+func NewCommand(deps Dependencies) *Command { + return &Command{ + Stdin: os.Stdin, + Stdout: os.Stdout, + Stderr: os.Stderr, + server: deps.Server, + filter: deps.SeriesGeneratorFilter, + } +} + +func (cmd *Command) Run(args []string) (err error) { + err = cmd.parseFlags(args) + if err != nil { + return err + } + + err = cmd.server.Open(cmd.configPath) + if err != nil { + return err + } + + plan, err := cmd.spec.Plan(cmd.server) + if err != nil { + return err + } + + plan.PrintPlan(cmd.Stdout) + + if cmd.printOnly { + return nil + } + + if err = plan.InitFileSystem(cmd.server.MetaClient()); err != nil { + return err + } + + return cmd.exec(plan) +} + +func (cmd *Command) parseFlags(args []string) error { + fs := flag.NewFlagSet("gen-exec", flag.ContinueOnError) + fs.StringVar(&cmd.configPath, "config", "", "Config file") + fs.BoolVar(&cmd.printOnly, "print", false, "Print data spec only") + fs.BoolVar(&cmd.noTSI, "no-tsi", false, "Skip building TSI index") + fs.IntVar(&cmd.concurrency, "c", 1, "Number of shards to generate concurrently") + fs.StringVar(&cmd.profile.CPU, "cpuprofile", "", "Collect a CPU profile") + fs.StringVar(&cmd.profile.Memory, "memprofile", "", "Collect a memory profile") + cmd.spec.AddFlags(fs) + + if err := fs.Parse(args); err != nil { + return err + } + + if cmd.spec.Database == "" { + return errors.New("database is required") + } + + if cmd.spec.Retention == "" { + return errors.New("retention policy is required") + } + + return nil +} + +func (cmd *Command) exec(p *generate.Plan) error { + groups := p.ShardGroups() + gens := make([]SeriesGenerator, len(groups)) + for i := range gens { + var ( + name []byte + keys []string + tv []gen.CountableSequence + ) + + name = []byte("m0") + tv = make([]gen.CountableSequence, len(p.Tags)) + setTagVals(p.Tags, tv) + keys = make([]string, len(p.Tags)) + setTagKeys("tag", keys) + + sgi := groups[i] + vg := gen.NewIntegerConstantValuesSequence(p.PointsPerSeriesPerShard, sgi.StartTime, p.ShardDuration/time.Duration(p.PointsPerSeriesPerShard), 1) + gens[i] = NewSeriesGenerator(name, []byte("v0"), vg, gen.NewTagsValuesSequenceKeysValues(keys, tv)) + if cmd.filter != nil { + gens[i] = cmd.filter(sgi, gens[i]) + } + } + + stop := cmd.profile.Start() + defer stop() + + start := time.Now().UTC() + defer func() { + elapsed := time.Since(start) + fmt.Println() + fmt.Printf("Total time: %0.1f seconds\n", elapsed.Seconds()) + }() + + g := Generator{Concurrency: cmd.concurrency, BuildTSI: !cmd.noTSI} + return g.Run(context.Background(), p.Database, p.ShardPath(), p.NodeShardGroups(), gens) +} + +func setTagVals(tags []int, tv []gen.CountableSequence) { + for j := range tags { + tv[j] = gen.NewCounterByteSequenceCount(tags[j]) + } +} + +func setTagKeys(prefix string, keys []string) { + tw := int(math.Ceil(math.Log10(float64(len(keys))))) + tf := fmt.Sprintf("%s%%0%dd", prefix, tw) + for i := range keys { + keys[i] = fmt.Sprintf(tf, i) + } +} diff --git a/cmd/influx_tools/generate/exec/generator.go b/cmd/influx_tools/generate/exec/generator.go new file mode 100644 index 0000000000..c70bb7ff59 --- /dev/null +++ b/cmd/influx_tools/generate/exec/generator.go @@ -0,0 +1,187 @@ +package exec + +import ( + "context" + "fmt" + "path" + "path/filepath" + "strconv" + "sync" + + "github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist" + "github.com/influxdata/influxdb/cmd/influx_tools/internal/shard" + "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/services/meta" + "github.com/influxdata/influxdb/tsdb" +
"github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/influxdata/influxdb/tsdb/index/tsi1" +) + +type Generator struct { + Concurrency int + BuildTSI bool + + sfile *tsdb.SeriesFile +} + +func (g *Generator) Run(ctx context.Context, database, shardPath string, groups []meta.ShardGroupInfo, gens []SeriesGenerator) (err error) { + limit := make(chan struct{}, g.Concurrency) + for i := 0; i < g.Concurrency; i++ { + limit <- struct{}{} + } + + var ( + wg sync.WaitGroup + errs errlist.ErrorList + ch = make(chan error, len(groups)) + ) + + dbPath := path.Dir(shardPath) + g.sfile = tsdb.NewSeriesFile(filepath.Join(dbPath, tsdb.SeriesFileDirectory)) + if err := g.sfile.Open(); err != nil { + return err + } + defer g.sfile.Close() + g.sfile.DisableCompactions() + + wg.Add(len(groups)) + for i := 0; i < len(groups); i++ { + go func(n int) { + <-limit + defer func() { + wg.Done() + limit <- struct{}{} + }() + + sgi := &groups[n] + if len(sgi.Shards) > 1 { + ch <- fmt.Errorf("multiple shards for the same owner %v", sgi.Shards[0].Owners) + return + } + + id := sgi.Shards[0].ID + + var ( + idx seriesIndex + ti *tsi1.Index + ) + if g.BuildTSI { + ti = tsi1.NewIndex(g.sfile, database, tsi1.WithPath(filepath.Join(shardPath, strconv.Itoa(int(id)), "index"))) + if err := ti.Open(); err != nil { + ch <- fmt.Errorf("error opening TSI1 index %d: %s", id, err.Error()) + return + } + idx = ti + } else { + idx = &seriesFileAdapter{sf: g.sfile, buf: make([]byte, 0, 2048)} + } + + if err := g.writeShard(idx, gens[n], id, shardPath); err != nil { + ch <- fmt.Errorf("error writing shard %d: %s", id, err.Error()) + } + + if ti != nil { + ti.Compact() + ti.Wait() + if err := ti.Close(); err != nil { + ch <- fmt.Errorf("error compacting TSI1 index %d: %s", id, err.Error()) + } + } + }(i) + } + wg.Wait() + + close(ch) + for e := range ch { + errs.Add(e) + } + + parts := g.sfile.Partitions() + wg.Add(len(parts)) + ch = make(chan error, len(parts)) + for i := range parts { + go func(n int) { + <-limit + defer func() { + wg.Done() + limit <- struct{}{} + }() + + p := parts[n] + c := tsdb.NewSeriesPartitionCompactor() + if err := c.Compact(p); err != nil { + ch <- fmt.Errorf("error compacting series partition %d: %s", n, err.Error()) + } + }(i) + } + wg.Wait() + + close(ch) + for e := range ch { + errs.Add(e) + } + + return errs.Err() +} + +// seriesBatchSize specifies the number of series keys passed to the index. 
+const seriesBatchSize = 1000 + +func (g *Generator) writeShard(idx seriesIndex, sg SeriesGenerator, id uint64, path string) error { + sw := shard.NewWriter(id, path) + defer sw.Close() + + var ( + keys [][]byte + names [][]byte + tags []models.Tags + ) + + for sg.Next() { + seriesKey := sg.Key() + keys = append(keys, seriesKey) + names = append(names, sg.Name()) + tags = append(tags, sg.Tags()) + + if len(keys) == seriesBatchSize { + if err := idx.CreateSeriesListIfNotExists(keys, names, tags); err != nil { + return err + } + keys = keys[:0] + names = names[:0] + tags = tags[:0] + } + + vg := sg.ValuesGenerator() + + key := tsm1.SeriesFieldKeyBytes(string(seriesKey), string(sg.Field())) + for vg.Next() { + sw.WriteV(key, vg.Values()) + } + + if err := sw.Err(); err != nil { + return err + } + } + + if len(keys) > 0 { + if err := idx.CreateSeriesListIfNotExists(keys, names, tags); err != nil { + return err + } + } + return nil +} + +type seriesIndex interface { + CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags) error +} + +type seriesFileAdapter struct { + sf *tsdb.SeriesFile + buf []byte +} + +func (s *seriesFileAdapter) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsSlice []models.Tags) (err error) { + _, err = s.sf.CreateSeriesListIfNotExists(names, tagsSlice) + return +} diff --git a/cmd/influx_tools/generate/exec/series_generator.go b/cmd/influx_tools/generate/exec/series_generator.go new file mode 100644 index 0000000000..7aed0629cd --- /dev/null +++ b/cmd/influx_tools/generate/exec/series_generator.go @@ -0,0 +1,89 @@ +package exec + +import ( + "github.com/influxdata/influxdb/models" + "github.com/influxdata/platform/pkg/data/gen" +) + +type SeriesGenerator interface { + // Next advances the series generator to the next series key. + Next() bool + + // Key returns the series key. + // The returned value may be cached. + Key() []byte + + // Name returns the name of the measurement. + // The returned value may be cached. + Name() []byte + + // Tags returns the tag set. + // The returned value may be cached. + Tags() models.Tags + + // Field returns the name of the field. + // The returned value may be cached. + Field() []byte + + // ValuesGenerator returns a values sequence for the current series.
+ ValuesGenerator() gen.ValuesSequence +} + +type cache struct { + key []byte + tags models.Tags +} + +type seriesGenerator struct { + name []byte + tags gen.TagsSequence + field []byte + vg gen.ValuesSequence + + c cache +} + +func NewSeriesGenerator(name []byte, field []byte, vg gen.ValuesSequence, tags gen.TagsSequence) SeriesGenerator { + return &seriesGenerator{ + name: name, + field: field, + vg: vg, + tags: tags, + } +} + +func (g *seriesGenerator) Next() bool { + if g.tags.Next() { + g.c = cache{} + g.vg.Reset() + return true + } + + return false +} + +func (g *seriesGenerator) Key() []byte { + if len(g.c.key) == 0 { + g.c.key = models.MakeKey(g.name, g.tags.Value()) + } + return g.c.key +} + +func (g *seriesGenerator) Name() []byte { + return g.name +} + +func (g *seriesGenerator) Tags() models.Tags { + if len(g.c.tags) == 0 { + g.c.tags = g.tags.Value().Clone() + } + return g.c.tags +} + +func (g *seriesGenerator) Field() []byte { + return g.field +} + +func (g *seriesGenerator) ValuesGenerator() gen.ValuesSequence { + return g.vg +} diff --git a/cmd/influx_tools/generate/init/command.go b/cmd/influx_tools/generate/init/command.go new file mode 100644 index 0000000000..134b943842 --- /dev/null +++ b/cmd/influx_tools/generate/init/command.go @@ -0,0 +1,79 @@ +package init + +import ( + "errors" + "flag" + "io" + "os" + + "github.com/influxdata/influxdb/cmd/influx_tools/generate" + "github.com/influxdata/influxdb/cmd/influx_tools/server" +) + +// Command represents the program execution for "gen-init". +type Command struct { + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + server server.Interface + + configPath string + printOnly bool + spec generate.Spec +} + +// NewCommand returns a new instance of Command. +func NewCommand(server server.Interface) *Command { + return &Command{ + Stdin: os.Stdin, + Stdout: os.Stdout, + Stderr: os.Stderr, + server: server, + } +} + +func (cmd *Command) Run(args []string) (err error) { + err = cmd.parseFlags(args) + if err != nil { + return err + } + + err = cmd.server.Open(cmd.configPath) + if err != nil { + return err + } + + plan, err := cmd.spec.Plan(cmd.server) + if err != nil { + return err + } + + plan.PrintPlan(cmd.Stdout) + + if !cmd.printOnly { + return plan.InitMetadata(cmd.server.MetaClient()) + } + + return nil +} + +func (cmd *Command) parseFlags(args []string) error { + fs := flag.NewFlagSet("gen-init", flag.ContinueOnError) + fs.StringVar(&cmd.configPath, "config", "", "Config file") + fs.BoolVar(&cmd.printOnly, "print", false, "Print data spec only") + cmd.spec.AddFlags(fs) + + if err := fs.Parse(args); err != nil { + return err + } + + if cmd.spec.Database == "" { + return errors.New("database is required") + } + + if cmd.spec.Retention == "" { + return errors.New("retention policy is required") + } + + return nil +} diff --git a/cmd/influx_tools/generate/plan.go b/cmd/influx_tools/generate/plan.go new file mode 100644 index 0000000000..26a36b0cd8 --- /dev/null +++ b/cmd/influx_tools/generate/plan.go @@ -0,0 +1,230 @@ +package generate + +import ( + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "text/tabwriter" + "time" + + "github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist" + "github.com/influxdata/influxdb/cmd/influx_tools/server" + "github.com/influxdata/influxdb/services/meta" + "github.com/pkg/errors" +) + +type Plan struct { + Database string + Retention string + ReplicaN int + StartTime time.Time + ShardCount int + ShardDuration time.Duration + Tags TagCardinalities +
PointsPerSeriesPerShard int + DatabasePath string + + info *meta.DatabaseInfo + groups []meta.ShardGroupInfo +} + +func (p *Plan) String() string { + sb := new(strings.Builder) + p.PrintPlan(sb) + return sb.String() +} + +func (p *Plan) PrintPlan(w io.Writer) { + tw := tabwriter.NewWriter(w, 25, 4, 2, ' ', 0) + fmt.Fprintf(tw, "Data Path\t%s\n", p.ShardPath()) + fmt.Fprintf(tw, "Tag cardinalities\t%s\n", p.Tags) + fmt.Fprintf(tw, "Points per series per shard\t%d\n", p.PointsPerSeriesPerShard) + fmt.Fprintf(tw, "Total points per shard\t%d\n", p.Tags.Cardinality()*p.PointsPerSeriesPerShard) + fmt.Fprintf(tw, "Total series\t%d\n", p.Tags.Cardinality()) + fmt.Fprintf(tw, "Total points\t%d\n", p.Tags.Cardinality()*p.ShardCount*p.PointsPerSeriesPerShard) + fmt.Fprintf(tw, "Shard Count\t%d\n", p.ShardCount) + fmt.Fprintf(tw, "Database\t%s/%s (Shard duration: %s)\n", p.Database, p.Retention, p.ShardDuration) + fmt.Fprintf(tw, "Start time\t%s\n", p.StartTime) + fmt.Fprintf(tw, "End time\t%s\n", p.EndTime()) + tw.Flush() +} + +func (p *Plan) ShardPath() string { + return filepath.Join(p.DatabasePath, p.Retention) +} + +// TimeSpan returns the total duration spanned by the data set. +func (p *Plan) TimeSpan() time.Duration { + return p.ShardDuration * time.Duration(p.ShardCount) +} + +func (p *Plan) EndTime() time.Time { + return p.StartTime.Add(p.TimeSpan()) +} + +func (p *Plan) InitMetadata(client server.MetaClient) (err error) { + if err = client.DropDatabase(p.Database); err != nil { + return err + } + + rp := meta.RetentionPolicySpec{ + Name: p.Retention, + ShardGroupDuration: p.ShardDuration, + ReplicaN: &p.ReplicaN, + } + info, err := client.CreateDatabaseWithRetentionPolicy(p.Database, &rp) + if err != nil { + return err + } + + return p.createShardGroupMetadata(client, info.DefaultRetentionPolicy) +} + +// InitFileSystem initializes the file system structure, cleaning up +// existing files and re-creating the appropriate shard directories.
+func (p *Plan) InitFileSystem(client server.MetaClient) error { + var err error + if err = os.RemoveAll(p.DatabasePath); err != nil { + return err + } + + minT, maxT := p.TimeRange() + + groups, err := client.NodeShardGroupsByTimeRange(p.Database, p.Retention, minT, maxT) + if err != nil { + return err + } + + p.groups = groups + + for i := 0; i < len(groups); i++ { + sgi := &groups[i] + if len(sgi.Shards) > 1 { + return fmt.Errorf("multiple shards for the same owner %v", sgi.Shards[0].Owners) + } + + if err = os.MkdirAll(filepath.Join(p.ShardPath(), strconv.Itoa(int(sgi.Shards[0].ID))), 0777); err != nil { + return err + } + } + + p.info = client.Database(p.Database) + + return nil +} + +// NodeShardGroups returns ShardGroupInfo with Shards limited to the current node +func (p *Plan) NodeShardGroups() []meta.ShardGroupInfo { + return p.groups +} + +func (p *Plan) ShardGroups() []meta.ShardGroupInfo { + return p.info.RetentionPolicy(p.info.DefaultRetentionPolicy).ShardGroups +} + +func (p *Plan) createShardGroupMetadata(client server.MetaClient, rp string) error { + ts := p.StartTime.Truncate(p.ShardDuration).UTC() + + var err error + groups := make([]*meta.ShardGroupInfo, p.ShardCount) + for i := 0; i < p.ShardCount; i++ { + groups[i], err = client.CreateShardGroup(p.Database, rp, ts) + if err != nil { + return err + } + ts = ts.Add(p.ShardDuration) + } + + return nil +} + +func (p *Plan) TimeRange() (start, end time.Time) { + start = p.StartTime.Truncate(p.ShardDuration).UTC() + end = start.Add(time.Duration(p.ShardDuration.Nanoseconds() * int64(p.ShardCount))) + return start, end +} + +func (p *Plan) Validate() error { + // build default values + def := &planDefaults{} + WalkPlan(def, p) + + // validate + val := &planValidator{} + WalkPlan(val, p) + return val.Err() +} + +type Visitor interface { + Visit(node Node) Visitor +} + +type Node interface{ node() } + +func (*Plan) node() {} + +func WalkPlan(v Visitor, node Node) { + if v = v.Visit(node); v == nil { + return + } + + switch n := node.(type) { + case *Plan: + + default: + panic(fmt.Sprintf("WalkConfig: unexpected node type %T", n)) + } +} + +type planValidator struct { + errs errlist.ErrorList +} + +func (v *planValidator) Visit(node Node) Visitor { + switch n := node.(type) { + case *Plan: + if n.DatabasePath == "" { + v.errs.Add(errors.New("missing DataPath")) + } + + if n.StartTime.Add(n.TimeSpan()).After(time.Now()) { + v.errs.Add(fmt.Errorf("start time must be ≤ %s", time.Now().Truncate(n.ShardDuration).UTC().Add(-n.TimeSpan()))) + } + } + + return v +} +func (v *planValidator) Err() error { + return v.errs.Err() +} + +type planDefaults struct{} + +func (v *planDefaults) Visit(node Node) Visitor { + switch n := node.(type) { + case *Plan: + if n.DatabasePath == "" { + n.DatabasePath = "${HOME}/.influxdb/data" + } + if n.Database == "" { + n.Database = "db" + } + if n.Retention == "" { + n.Retention = "autogen" + } + if n.ShardDuration == 0 { + n.ShardDuration = 24 * time.Hour + } + if n.ShardCount == 0 { + n.ShardCount = 1 + } + if n.StartTime.IsZero() { + n.StartTime = time.Now().Truncate(n.ShardDuration).Add(-n.TimeSpan()) + } + } + + return v +} diff --git a/cmd/influx_tools/generate/spec.go b/cmd/influx_tools/generate/spec.go new file mode 100644 index 0000000000..975cca6211 --- /dev/null +++ b/cmd/influx_tools/generate/spec.go @@ -0,0 +1,92 @@ +package generate + +import ( + "flag" + "fmt" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/influxdata/influxdb/cmd/influx_tools/server" +) + +type 
TagCardinalities []int + +func (t TagCardinalities) String() string { + s := make([]string, 0, len(t)) + for i := 0; i < len(t); i++ { + s = append(s, strconv.Itoa(t[i])) + } + return fmt.Sprintf("[%s]", strings.Join(s, ",")) +} + +func (t TagCardinalities) Cardinality() int { + n := 1 + for i := range t { + n *= t[i] + } + return n +} + +func (t *TagCardinalities) Set(tags string) error { + *t = (*t)[:0] + for _, s := range strings.Split(tags, ",") { + v, err := strconv.Atoi(s) + if err != nil { + return fmt.Errorf("cannot parse tag cardinality: %s", s) + } + *t = append(*t, v) + } + return nil +} + +type Spec struct { + StartTime string + Database string + Retention string + ReplicaN int + ShardCount int + ShardDuration time.Duration + Tags TagCardinalities + PointsPerSeriesPerShard int +} + +func (a *Spec) AddFlags(fs *flag.FlagSet) { + fs.StringVar(&a.StartTime, "start-time", "", "Start time") + fs.StringVar(&a.Database, "db", "db", "Name of database to create") + fs.StringVar(&a.Retention, "rp", "rp", "Name of retention policy") + fs.IntVar(&a.ReplicaN, "rf", 1, "Replication factor") + fs.IntVar(&a.ShardCount, "shards", 1, "Number of shards to create") + fs.DurationVar(&a.ShardDuration, "shard-duration", 24*time.Hour, "Shard duration (default 24h)") + a.Tags = []int{10, 10, 10} + fs.Var(&a.Tags, "t", "Tag cardinality") + fs.IntVar(&a.PointsPerSeriesPerShard, "p", 100, "Points per series per shard") +} + +func (a *Spec) Plan(server server.Interface) (*Plan, error) { + plan := &Plan{ + Database: a.Database, + Retention: a.Retention, + ReplicaN: a.ReplicaN, + ShardCount: a.ShardCount, + ShardDuration: a.ShardDuration, + Tags: a.Tags, + PointsPerSeriesPerShard: a.PointsPerSeriesPerShard, + DatabasePath: filepath.Join(server.TSDBConfig().Dir, a.Database), + } + + if a.StartTime != "" { + if t, err := time.Parse(time.RFC3339, a.StartTime); err != nil { + return nil, err + } else { + plan.StartTime = t.UTC() + } + } + + if err := plan.Validate(); err != nil { + return nil, err + } + + return plan, nil +} diff --git a/cmd/influx_tools/help/help.go b/cmd/influx_tools/help/help.go index dcff693b04..016c10edfe 100644 --- a/cmd/influx_tools/help/help.go +++ b/cmd/influx_tools/help/help.go @@ -35,6 +35,8 @@ The commands are: export reshapes existing shards to a new shard duration compact-shard fully compacts the specified shard + gen-init creates database and retention policy metadata + gen-exec generates data help display this help message Use "influx-tools command -help" for more information about a command. 
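
Not part of the patch: a minimal, hypothetical sketch of how the `Spec` flags added above translate into the numbers that `Plan.PrintPlan` reports. It only uses `TagCardinalities` from the new `generate` package; the flag values (`-t 10,10,10`, `-shards 3`, `-p 100`) are made up for illustration, and the arithmetic mirrors the formulas in `PrintPlan` (series per shard = product of the tag cardinalities, total points = series × shards × points-per-series-per-shard).

```go
package main

import (
	"fmt"

	"github.com/influxdata/influxdb/cmd/influx_tools/generate"
)

func main() {
	var tags generate.TagCardinalities
	// Equivalent to passing "-t 10,10,10" to gen-init or gen-exec.
	if err := tags.Set("10,10,10"); err != nil {
		panic(err)
	}

	shards, pointsPerSeriesPerShard := 3, 100 // "-shards 3 -p 100"
	series := tags.Cardinality()              // 10*10*10 = 1000 series per shard

	fmt.Println("tag cardinalities: ", tags)  // [10,10,10]
	fmt.Println("series per shard:  ", series)
	fmt.Println("points per shard:  ", series*pointsPerSeriesPerShard)
	fmt.Println("total points:      ", series*shards*pointsPerSeriesPerShard)
}
```
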
diff --git a/cmd/influx_tools/importer/importer.go b/cmd/influx_tools/importer/importer.go index c428c779dc..992657844f 100644 --- a/cmd/influx_tools/importer/importer.go +++ b/cmd/influx_tools/importer/importer.go @@ -96,7 +96,7 @@ func (i *importer) createDatabaseWithRetentionPolicy(rp *meta.RetentionPolicySpe } func (i *importer) StartShardGroup(start int64, end int64) error { - existingSg, err := i.MetaClient.ShardGroupsByTimeRange(i.db, i.rpi.Name, time.Unix(0, start), time.Unix(0, end)) + existingSg, err := i.MetaClient.NodeShardGroupsByTimeRange(i.db, i.rpi.Name, time.Unix(0, start), time.Unix(0, end)) if err != nil { return err } diff --git a/cmd/influx_tools/internal/profile/profile.go b/cmd/influx_tools/internal/profile/profile.go new file mode 100644 index 0000000000..b13a644b0e --- /dev/null +++ b/cmd/influx_tools/internal/profile/profile.go @@ -0,0 +1,64 @@ +package profile + +import ( + "log" + "os" + "runtime" + "runtime/pprof" +) + +type Config struct { + // CPU, if set, specifies the file name of the CPU profile to capture + CPU string + + // Memory, if set, specifies the file name of the memory profile to capture + Memory string +} + +func (c *Config) noProfiles() bool { + return c.CPU == "" && c.Memory == "" +} + +// Start starts a CPU and/or memory profile if configured and returns a +// function that should be called to terminate the profiles. +func (c *Config) Start() func() { + if c.noProfiles() { + return func() { return } + } + + var prof struct { + cpu *os.File + mem *os.File + } + + if c.CPU != "" { + f, err := os.Create(c.CPU) + if err != nil { + log.Fatalf("cpuprofile: %v", err) + } + prof.cpu = f + _ = pprof.StartCPUProfile(prof.cpu) + } + + if c.Memory != "" { + f, err := os.Create(c.Memory) + if err != nil { + log.Fatalf("memprofile: %v", err) + } + prof.mem = f + runtime.MemProfileRate = 4096 + } + + return func() { + if prof.cpu != nil { + pprof.StopCPUProfile() + _ = prof.cpu.Close() + prof.cpu = nil + } + if prof.mem != nil { + _ = pprof.Lookup("heap").WriteTo(prof.mem, 0) + _ = prof.mem.Close() + prof.mem = nil + } + } +} diff --git a/cmd/influx_tools/internal/shard/writer.go b/cmd/influx_tools/internal/shard/writer.go index f7136be4e2..8fafd20d59 100644 --- a/cmd/influx_tools/internal/shard/writer.go +++ b/cmd/influx_tools/internal/shard/writer.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/influxdb/cmd/influx_tools/internal/errlist" "github.com/influxdata/influxdb/tsdb/engine/tsm1" + "github.com/influxdata/platform/pkg/data/gen" ) const ( @@ -22,6 +23,7 @@ type Writer struct { files []string gen, seq int err error + buf []byte auto bool } @@ -62,6 +64,8 @@ func NewWriter(id uint64, path string, opts ...option) *Writer { opt(w) } + w.nextTSM() + return w } @@ -70,16 +74,39 @@ func (w *Writer) Write(key []byte, values tsm1.Values) { return } - if w.tw == nil { + if w.tw.Size() > maxTSMFileSize { + w.closeTSM() w.nextTSM() } + if err := w.tw.Write(key, values); err != nil { + if err == tsm1.ErrMaxBlocksExceeded { + w.closeTSM() + w.nextTSM() + } else { + w.err = err + } + } +} + +func (w *Writer) WriteV(key []byte, values gen.Values) { + if w.err != nil { + return + } + if w.tw.Size() > maxTSMFileSize { w.closeTSM() w.nextTSM() } - if err := w.tw.Write(key, values); err != nil { + minT, maxT := values.MinTime(), values.MaxTime() + var err error + if w.buf, err = values.Encode(w.buf); err != nil { + w.err = err + return + } + + if err := w.tw.WriteBlock(key, minT, maxT, w.buf); err != nil { if err == tsm1.ErrMaxBlocksExceeded { w.closeTSM() w.nextTSM()
diff --git a/cmd/influx_tools/main.go b/cmd/influx_tools/main.go index 44d3fd0d82..7d50ce80e2 100644 --- a/cmd/influx_tools/main.go +++ b/cmd/influx_tools/main.go @@ -6,10 +6,13 @@ import ( "fmt" "io" "os" + "time" "github.com/influxdata/influxdb/cmd" "github.com/influxdata/influxdb/cmd/influx_tools/compact" "github.com/influxdata/influxdb/cmd/influx_tools/export" + genexec "github.com/influxdata/influxdb/cmd/influx_tools/generate/exec" + geninit "github.com/influxdata/influxdb/cmd/influx_tools/generate/init" "github.com/influxdata/influxdb/cmd/influx_tools/help" "github.com/influxdata/influxdb/cmd/influx_tools/importer" "github.com/influxdata/influxdb/cmd/influx_tools/server" @@ -65,10 +68,21 @@ func (m *Main) Run(args ...string) error { return fmt.Errorf("export failed: %s", err) } case "import": - cmd := importer.NewCommand(&ossServer{logger: zap.NewNop()}) - if err := cmd.Run(args); err != nil { + c := importer.NewCommand(&ossServer{logger: zap.NewNop()}) + if err := c.Run(args); err != nil { return fmt.Errorf("import failed: %s", err) } + case "gen-init": + c := geninit.NewCommand(&ossServer{logger: zap.NewNop()}) + if err := c.Run(args); err != nil { + return fmt.Errorf("gen-init failed: %s", err) + } + case "gen-exec": + deps := genexec.Dependencies{Server: &ossServer{logger: zap.NewNop()}} + c := genexec.NewCommand(deps) + if err := c.Run(args); err != nil { + return fmt.Errorf("gen-exec failed: %s", err) + } default: return fmt.Errorf(`unknown command "%s"`+"\n"+`Run 'influx-tools help' for usage`+"\n\n", name) } @@ -81,6 +95,7 @@ type ossServer struct { config *run.Config noClient bool client *meta.Client + mc server.MetaClient } func (s *ossServer) Open(path string) (err error) { @@ -103,6 +118,7 @@ func (s *ossServer) Open(path string) (err error) { s.client = nil return err } + s.mc = &ossMetaClient{s.client} return nil } @@ -113,9 +129,10 @@ func (s *ossServer) Close() { } } -func (s *ossServer) MetaClient() server.MetaClient { return s.client } +func (s *ossServer) MetaClient() server.MetaClient { return s.mc } func (s *ossServer) TSDBConfig() tsdb.Config { return s.config.Data } func (s *ossServer) Logger() *zap.Logger { return s.logger } +func (s *ossServer) NodeID() uint64 { return 0 } // ParseConfig parses the config at path. // It returns a demo configuration if path is blank. 
@@ -152,3 +169,13 @@ func (s *ossServer) resolvePath(path string) string { } return "" } + +type ossMetaClient struct { + *meta.Client +} + +func (*ossMetaClient) NodeID() uint64 { return 0 } + +func (c *ossMetaClient) NodeShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) { + return c.ShardGroupsByTimeRange(database, policy, min, max) +} diff --git a/cmd/influx_tools/server/server.go b/cmd/influx_tools/server/server.go index 4c21e25415..5be44b8847 100644 --- a/cmd/influx_tools/server/server.go +++ b/cmd/influx_tools/server/server.go @@ -18,6 +18,8 @@ type Interface interface { type MetaClient interface { Database(name string) *meta.DatabaseInfo + NodeID() uint64 + DropDatabase(name string) error RetentionPolicy(database, name string) (rpi *meta.RetentionPolicyInfo, err error) ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) @@ -26,4 +28,8 @@ type MetaClient interface { CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) DeleteShardGroup(database, policy string, id uint64) error CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) + // NodeShardGroupsByTimeRange returns a list of all shard groups on a database and policy + // that may contain data for the specified time range and limits the Shards to the current node only. + // Shard groups are sorted by start time. + NodeShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) }
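
The OSS adapter above can simply delegate to ShardGroupsByTimeRange because a single node owns every shard. For context only, and not part of this change, a hypothetical multi-node implementation of the new NodeShardGroupsByTimeRange contract might keep only the shards owned by the local node; the sketch below assumes a made-up `nodeMetaClient` wrapper and relies on the existing `meta.ShardInfo.OwnedBy` helper.

```go
package example

import (
	"time"

	"github.com/influxdata/influxdb/services/meta"
)

// nodeMetaClient is a hypothetical sketch of a clustered meta client whose
// NodeShardGroupsByTimeRange narrows each group's Shards to the local node,
// matching the contract documented on server.MetaClient.
type nodeMetaClient struct {
	*meta.Client
	nodeID uint64
}

func (c *nodeMetaClient) NodeID() uint64 { return c.nodeID }

func (c *nodeMetaClient) NodeShardGroupsByTimeRange(database, policy string, min, max time.Time) ([]meta.ShardGroupInfo, error) {
	groups, err := c.ShardGroupsByTimeRange(database, policy, min, max)
	if err != nil {
		return nil, err
	}
	for i := range groups {
		// Keep only the shards this node owns; reuse the backing array.
		local := groups[i].Shards[:0]
		for _, si := range groups[i].Shards {
			if si.OwnedBy(c.nodeID) {
				local = append(local, si)
			}
		}
		groups[i].Shards = local
	}
	return groups, nil
}
```
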