From a668a9ed0a65fb2e3dd093f5e247b1ae750b7f36 Mon Sep 17 00:00:00 2001
From: Daniel Moran <danxmoran@gmail.com>
Date: Mon, 25 Jan 2021 14:28:36 -0800
Subject: [PATCH] feat(cmd/influxd): add `influxd inspect export-lp` command
 to export LP (#20467)

---
 CHANGELOG.md                          |   1 +
 cmd/influxd/inspect/export_lp.go      | 423 +++++++++++++++++++++++++
 cmd/influxd/inspect/export_lp_test.go | 438 ++++++++++++++++++++++++++
 cmd/influxd/inspect/inspect.go        |  14 +-
 cmd/influxd/main.go                   |   2 +-
 5 files changed, 866 insertions(+), 12 deletions(-)
 create mode 100644 cmd/influxd/inspect/export_lp.go
 create mode 100644 cmd/influxd/inspect/export_lp_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cd15f906a1..ae29a3deab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,7 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards
 1. [20524](https://github.com/influxdata/influxdb/pull/20524): Add `influxd print-config` command to support automated config inspection.
 1. [20561](https://github.com/influxdata/influxdb/pull/20561): Add `nats-port` config option for `influxd` server.
 1. [20564](https://github.com/influxdata/influxdb/pull/20564): Add `nats-max-payload-bytes` config option for `influxd` server.
+1. [20467](https://github.com/influxdata/influxdb/pull/20467): Add `influxd inspect export-lp` command to extract data in line-protocol format.
 
 ### Bug Fixes
 
diff --git a/cmd/influxd/inspect/export_lp.go b/cmd/influxd/inspect/export_lp.go
new file mode 100644
index 0000000000..8c6dd3b300
--- /dev/null
+++ b/cmd/influxd/inspect/export_lp.go
@@ -0,0 +1,423 @@
+package inspect
+
+import (
+	"bufio"
+	"compress/gzip"
+	"fmt"
+	"io"
+	"math"
+	"os"
+	"path/filepath"
+	"sort"
+	"strconv"
+	"sync"
+	"time"
+
+	"github.com/influxdata/influxdb/v2"
+	"github.com/influxdata/influxdb/v2/kit/cli"
+	"github.com/influxdata/influxdb/v2/models"
+	"github.com/influxdata/influxdb/v2/pkg/escape"
+	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+// exportFlags contains CLI-compatible forms of export options.
+type exportFlags struct {
+	enginePath   string
+	bucketID     influxdb.ID
+	measurements []string
+	startTime    string
+	endTime      string
+
+	outputPath string
+	compress   bool
+
+	logLevel zapcore.Level
+}
+
+// exportFilters contains storage-optimized forms of parameters used to restrict exports.
+type exportFilters struct {
+	measurements map[string]struct{}
+	start        int64
+	end          int64
+}
+
+func newFilters() *exportFilters {
+	return &exportFilters{
+		measurements: make(map[string]struct{}),
+		start:        math.MinInt64,
+		end:          math.MaxInt64,
+	}
+}
+
+// filters converts CLI-specified filters into storage-optimized forms.
+func (f *exportFlags) filters() (*exportFilters, error) {
+	filters := newFilters()
+
+	if f.startTime != "" {
+		s, err := time.Parse(time.RFC3339, f.startTime)
+		if err != nil {
+			return nil, err
+		}
+		filters.start = s.UnixNano()
+	}
+
+	if f.endTime != "" {
+		e, err := time.Parse(time.RFC3339, f.endTime)
+		if err != nil {
+			return nil, err
+		}
+		filters.end = e.UnixNano()
+	}
+
+	for _, m := range f.measurements {
+		filters.measurements[m] = struct{}{}
+	}
+
+	return filters, nil
+}
+
+func newFlags() *exportFlags {
+	return &exportFlags{
+		logLevel: zapcore.InfoLevel,
+		compress: false,
+	}
+}
+
+// NewExportLineProtocolCommand builds and registers the `export-lp` subcommand of `influxd inspect`.
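+//
+// Example invocation (a minimal sketch; the engine path, bucket ID, and output
+// path below are illustrative placeholders, not values taken from this patch):
+//
+//	influxd inspect export-lp \
+//	  --engine-path ~/.influxdbv2/engine \
+//	  --bucket-id 0123456789abcdef \
+//	  --start 2021-01-01T00:00:00Z \
+//	  --output-path ./export.lp \
+//	  --compress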
+func NewExportLineProtocolCommand(v *viper.Viper) *cobra.Command {
+	flags := newFlags()
+
+	cmd := &cobra.Command{
+		Use:   `export-lp`,
+		Short: "Export TSM data as line protocol",
+		Long: `
+This command will export all TSM data stored in a bucket
+to line protocol for inspection and re-ingestion.`,
+		Args: cobra.NoArgs,
+		RunE: func(*cobra.Command, []string) error {
+			return exportRunE(flags)
+		},
+	}
+
+	opts := []cli.Opt{
+		{
+			DestP:    &flags.enginePath,
+			Flag:     "engine-path",
+			Desc:     "path to persistent engine files",
+			Required: true,
+		},
+		{
+			DestP:    &flags.bucketID,
+			Flag:     "bucket-id",
+			Desc:     "ID of bucket containing data to export",
+			Required: true,
+		},
+		{
+			DestP: &flags.measurements,
+			Flag:  "measurement",
+			Desc:  "optional: name(s) of specific measurement to export",
+		},
+		{
+			DestP: &flags.startTime,
+			Flag:  "start",
+			Desc:  "optional: the start time to export (RFC3339 format)",
+		},
+		{
+			DestP: &flags.endTime,
+			Flag:  "end",
+			Desc:  "optional: the end time to export (RFC3339 format)",
+		},
+		{
+			DestP:    &flags.outputPath,
+			Flag:     "output-path",
+			Desc:     "path where exported line-protocol should be written",
+			Required: true,
+		},
+		{
+			DestP: &flags.compress,
+			Flag:  "compress",
+			Desc:  "if true, compress output with GZIP",
+		},
+		{
+			DestP:   &flags.logLevel,
+			Flag:    "log-level",
+			Default: flags.logLevel,
+		},
+	}
+
+	cli.BindOptions(v, cmd, opts)
+	return cmd
+}
+
+func exportRunE(flags *exportFlags) error {
+	logconf := zap.NewProductionConfig()
+	logconf.Level = zap.NewAtomicLevelAt(flags.logLevel)
+	logger, err := logconf.Build()
+	if err != nil {
+		return err
+	}
+
+	filters, err := flags.filters()
+	if err != nil {
+		return err
+	}
+
+	f, err := os.Create(flags.outputPath)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	// Because calling (*os.File).Write is relatively expensive,
+	// and we don't *need* to sync to disk on every written line of export,
+	// use a sized buffered writer so that we only flush to the file once per megabyte.
+	bw := bufio.NewWriterSize(f, 1024*1024)
+	defer bw.Flush()
+
+	var w io.Writer = bw
+	if flags.compress {
+		gzw := gzip.NewWriter(w)
+		defer gzw.Close()
+		w = gzw
+	}
+
+	if err := exportTSMs(flags.enginePath, flags.bucketID, filters, w, logger); err != nil {
+		return err
+	}
+
+	if err := exportWALs(flags.enginePath, flags.bucketID, filters, w, logger); err != nil {
+		return err
+	}
+
+	logger.Info("export complete")
+	return nil
+}
+
+// exportTSMs finds, reads, and exports all data stored in TSM files for a bucket that matches a set of filters.
+func exportTSMs(engineDir string, bucketID influxdb.ID, filters *exportFilters, out io.Writer, log *zap.Logger) error {
+	// TSM is stored under `<engine>/data/<bucket-id>/<rp>/<shard-id>/*.tsm`
+	tsmDir := filepath.Join(engineDir, "data", bucketID.String())
+	tsmPattern := filepath.Join(tsmDir, "*", "*", fmt.Sprintf("*.%s", tsm1.TSMFileExtension))
+	log.Debug("searching for TSM files", zap.String("file_pattern", tsmPattern))
+	tsmFiles, err := filepath.Glob(tsmPattern)
+	if err != nil {
+		return err
+	}
+
+	log.Info("exporting TSM files", zap.String("tsm_dir", tsmDir), zap.Int("file_count", len(tsmFiles)))
+
+	// Ensure we export in the same order that the TSM file store would process the files.
+	// See FileStore.Open() in tsm1/file_store.go
+	sort.Strings(tsmFiles)
+
+	for _, f := range tsmFiles {
+		if err := exportTSM(f, filters, out, log); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func exportTSM(tsmFile string, filters *exportFilters, out io.Writer, log *zap.Logger) error {
+	log.Debug("exporting TSM file", zap.String("file_path", tsmFile))
+	f, err := os.Open(tsmFile)
+	if err != nil {
+		// TSM files can disappear if we're exporting from the engine dir of a live DB,
+		// and compactions run between our path-lookup and export steps.
+		if os.IsNotExist(err) {
+			log.Warn("skipping missing TSM file", zap.String("file_path", tsmFile))
+			return nil
+		}
+		return err
+	}
+	defer f.Close()
+
+	reader, err := tsm1.NewTSMReader(f)
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+
+	if !reader.OverlapsTimeRange(filters.start, filters.end) {
+		return nil
+	}
+	filterMeasurement := len(filters.measurements) > 0
+
+	for i := 0; i < reader.KeyCount(); i++ {
+		key, _ := reader.KeyAt(i)
+		values, err := reader.ReadAll(key)
+		if err != nil {
+			log.Error(
+				"unable to read key, skipping point",
+				zap.ByteString("key", key),
+				zap.String("tsm_file", tsmFile),
+				zap.Error(err),
+			)
+			continue
+		}
+		key, field := tsm1.SeriesAndFieldFromCompositeKey(key)
+		if filterMeasurement {
+			measurement, _ := models.ParseKey(key)
+			if _, ok := filters.measurements[measurement]; !ok {
+				continue
+			}
+		}
+		field = escape.Bytes(field)
+
+		if err := writeValues(key, field, values, filters, out, log); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// exportWALs finds, reads, and exports all data stored in WAL files for a bucket that matches a set of filters.
+//
+// N.B. exported lines can include some duplicates from a matching call to exportTSMs on the same engine/bucket.
+// This is OK since writes are idempotent.
+func exportWALs(engineDir string, bucketID influxdb.ID, filters *exportFilters, out io.Writer, log *zap.Logger) error {
+	// WAL is stored under `<engine>/wal/<bucket-id>/<rp>/<shard-id>/*.wal`
+	walDir := filepath.Join(engineDir, "wal", bucketID.String())
+	walPattern := filepath.Join(walDir, "*", "*", fmt.Sprintf("*.%s", tsm1.WALFileExtension))
+	log.Debug("searching for WAL files", zap.String("file_pattern", walPattern))
+	walFiles, err := filepath.Glob(walPattern)
+	if err != nil {
+		return err
+	}
+
+	// N.B. WAL files might contain tombstone markers that haven't been sync'd down into TSM yet.
+	// We can't really deal with them when working at this low level, so we warn the user if we encounter one.
+	var tombstoneWarnOnce sync.Once
+	warnTombstone := func() {
+		tombstoneWarnOnce.Do(func() {
+			log.Warn("detected deletes in WAL file, some deleted data may be brought back by replaying this export")
+		})
+	}
+
+	// Ensure we export in the same order that the TSM WAL would process the files.
+	// See segmentFileNames in tsm1/wal.go
+	sort.Strings(walFiles)
+
+	log.Info("exporting WAL files", zap.String("wal_dir", walDir), zap.Int("file_count", len(walFiles)))
+	for _, f := range walFiles {
+		if err := exportWAL(f, filters, out, log, warnTombstone); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func exportWAL(walFile string, filters *exportFilters, out io.Writer, log *zap.Logger, onDelete func()) error {
+	log.Debug("exporting WAL file", zap.String("file_path", walFile))
+	f, err := os.Open(walFile)
+	if err != nil {
+		// WAL files can disappear if we're exporting from the engine dir of a live DB,
+		// and a snapshot is written between our path-lookup and export steps.
+		if os.IsNotExist(err) {
+			log.Warn("skipping missing WAL file", zap.String("file_path", walFile))
+			return nil
+		}
+		return err
+	}
+	defer f.Close()
+
+	reader := tsm1.NewWALSegmentReader(f)
+	defer reader.Close()
+
+	filterMeasurement := len(filters.measurements) > 0
+
+	for reader.Next() {
+		entry, err := reader.Read()
+		if err != nil {
+			n := reader.Count()
+			log.Error(
+				"stopping at corrupt position in WAL file",
+				zap.String("file_path", walFile),
+				zap.Int64("position", n),
+			)
+			break
+		}
+
+		switch t := entry.(type) {
+		case *tsm1.DeleteWALEntry, *tsm1.DeleteRangeWALEntry:
+			onDelete()
+			continue
+		case *tsm1.WriteWALEntry:
+			for key, values := range t.Values {
+				key, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key))
+				if filterMeasurement {
+					measurement, _ := models.ParseKey(key)
+					if _, ok := filters.measurements[measurement]; !ok {
+						continue
+					}
+				}
+				field = escape.Bytes(field)
+				if err := writeValues(key, field, values, filters, out, log); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+func writeValues(key []byte, field []byte, values []tsm1.Value, filters *exportFilters, out io.Writer, log *zap.Logger) error {
+	buf := []byte(fmt.Sprintf("%s %s=", key, field))
+	prefixLen := len(buf)
+
+	for _, value := range values {
+		ts := value.UnixNano()
+		if ts < filters.start || ts > filters.end {
+			continue
+		}
+
+		// Re-slice buf to be "<series_key> <field>=".
+		buf = buf[:prefixLen]
+
+		// Append the correct representation of the value.
+		switch v := value.Value().(type) {
+		case float64:
+			buf = strconv.AppendFloat(buf, v, 'g', -1, 64)
+		case int64:
+			buf = strconv.AppendInt(buf, v, 10)
+			buf = append(buf, 'i')
+		case uint64:
+			buf = strconv.AppendUint(buf, v, 10)
+			buf = append(buf, 'u')
+		case bool:
+			buf = strconv.AppendBool(buf, v)
+		case string:
+			buf = append(buf, '"')
+			buf = append(buf, models.EscapeStringField(v)...)
+			buf = append(buf, '"')
+		default:
+			// This shouldn't be possible.
+			log.Error(
+				"ignoring value with unsupported type",
+				zap.ByteString("key", key),
+				zap.ByteString("field", field),
+				zap.String("value", value.String()),
+			)
+			continue
+		}
+
+		// Now buf has "<series_key> <field>=<value>".
+		// Append the timestamp and a newline, then write it.
+		buf = append(buf, ' ')
+		buf = strconv.AppendInt(buf, ts, 10)
+		buf = append(buf, '\n')
+		if _, err := out.Write(buf); err != nil {
+			// Surface the underlying I/O error so the export aborts.
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/cmd/influxd/inspect/export_lp_test.go b/cmd/influxd/inspect/export_lp_test.go
new file mode 100644
index 0000000000..6be7863837
--- /dev/null
+++ b/cmd/influxd/inspect/export_lp_test.go
@@ -0,0 +1,438 @@
+package inspect
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"sort"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/golang/snappy"
+	"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zaptest"
+)
+
+type corpus map[string][]tsm1.Value
+
+var (
+	floatCorpus = corpus{
+		tsm1.SeriesFieldKey("floats,k=f", "f"): []tsm1.Value{
+			tsm1.NewValue(1, float64(1.5)),
+			tsm1.NewValue(2, float64(3)),
+		},
+	}
+
+	floatLines = []string{
+		"floats,k=f f=1.5 1",
+		"floats,k=f f=3 2",
+	}
+
+	intCorpus = corpus{
+		tsm1.SeriesFieldKey("ints,k=i", "i"): []tsm1.Value{
+			tsm1.NewValue(10, int64(15)),
+			tsm1.NewValue(20, int64(30)),
+		},
+	}
+
+	intLines = []string{
+		"ints,k=i i=15i 10",
+		"ints,k=i i=30i 20",
+	}
+
+	boolCorpus = corpus{
+		tsm1.SeriesFieldKey("bools,k=b", "b"): []tsm1.Value{
+			tsm1.NewValue(100, true),
+			tsm1.NewValue(200, false),
+		},
+	}
+
+	boolLines = []string{
+		"bools,k=b b=true 100",
+		"bools,k=b b=false 200",
+	}
+
+	stringCorpus = corpus{
+		tsm1.SeriesFieldKey("strings,k=s", "s"): []tsm1.Value{
+			tsm1.NewValue(1000, "1k"),
+			tsm1.NewValue(2000, "2k"),
+		},
+	}
+
+	stringLines = []string{
+		`strings,k=s s="1k" 1000`,
+		`strings,k=s s="2k" 2000`,
+	}
+
+	uintCorpus = corpus{
+		tsm1.SeriesFieldKey("uints,k=u", "u"): []tsm1.Value{
+			tsm1.NewValue(3000, uint64(45)),
+			tsm1.NewValue(4000, uint64(60)),
+		},
+	}
+
+	uintLines = []string{
+		`uints,k=u u=45u 3000`,
+		`uints,k=u u=60u 4000`,
+	}
+
+	escapeStringCorpus = corpus{
+		tsm1.SeriesFieldKey("t", "s"): []tsm1.Value{
+			tsm1.NewValue(1, `1. "quotes"`),
+			tsm1.NewValue(2, `2. back\slash`),
+			tsm1.NewValue(3, `3. bs\q"`),
+		},
+	}
+
+	escCorpusExpLines = []string{
+		`t s="1. \"quotes\"" 1`,
+		`t s="2. back\\slash" 2`,
+		`t s="3. bs\\q\"" 3`,
+	}
+
+	basicCorpus         = make(corpus)
+	basicCorpusExpLines []string
+
+	numsOnlyFilter   = newFilters()
+	numsOnlyExpLines []string
+
+	earlyEntriesOnlyFilter   = newFilters()
+	earlyEntriesOnlyExpLines []string
+
+	lateEntriesOnlyFilter   = newFilters()
+	lateEntriesOnlyExpLines []string
+)
+
+func init() {
+	for _, c := range []corpus{floatCorpus, intCorpus, boolCorpus, stringCorpus, uintCorpus} {
+		for k, v := range c {
+			basicCorpus[k] = v
+		}
+	}
+
+	for _, l := range [][]string{floatLines, intLines, boolLines, stringLines, uintLines} {
+		basicCorpusExpLines = append(basicCorpusExpLines, l...)
+	}
+
+	for _, m := range []string{"floats", "ints", "uints"} {
+		numsOnlyFilter.measurements[m] = struct{}{}
+	}
+	for _, l := range [][]string{floatLines, intLines, uintLines} {
+		numsOnlyExpLines = append(numsOnlyExpLines, l...)
+	}
+
+	earlyEntriesOnlyFilter.end = 150
+	earlyEntriesOnlyExpLines = append(earlyEntriesOnlyExpLines, floatLines...)
+	earlyEntriesOnlyExpLines = append(earlyEntriesOnlyExpLines, intLines...)
+	earlyEntriesOnlyExpLines = append(earlyEntriesOnlyExpLines, boolLines[0])
+
+	lateEntriesOnlyFilter.start = 150
+	lateEntriesOnlyExpLines = append(lateEntriesOnlyExpLines, boolLines[1])
+	lateEntriesOnlyExpLines = append(lateEntriesOnlyExpLines, stringLines...)
+	lateEntriesOnlyExpLines = append(lateEntriesOnlyExpLines, uintLines...)
+}
+
+func Test_exportWAL(t *testing.T) {
+	log := zaptest.NewLogger(t)
+
+	for _, c := range []struct {
+		corpus corpus
+		filter *exportFilters
+		lines  []string
+	}{
+		{corpus: basicCorpus, filter: newFilters(), lines: basicCorpusExpLines},
+		{corpus: escapeStringCorpus, filter: newFilters(), lines: escCorpusExpLines},
+		{corpus: basicCorpus, filter: numsOnlyFilter, lines: numsOnlyExpLines},
+		{corpus: basicCorpus, filter: earlyEntriesOnlyFilter, lines: earlyEntriesOnlyExpLines},
+		{corpus: basicCorpus, filter: lateEntriesOnlyFilter, lines: lateEntriesOnlyExpLines},
+	} {
+		walFile, err := writeCorpusToWALFile(c.corpus)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer os.Remove(walFile.Name())
+
+		var out bytes.Buffer
+		if err := exportWAL(walFile.Name(), c.filter, &out, log, func() {}); err != nil {
+			t.Fatal(err)
+		}
+
+		lines := strings.Split(out.String(), "\n")
+		for _, exp := range c.lines {
+			found := false
+			for _, l := range lines {
+				if exp == l {
+					found = true
+					break
+				}
+			}
+
+			if !found {
+				t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String())
+			}
+		}
+	}
+
+	// Missing .wal file should not cause a failure.
+	var out bytes.Buffer
+	if err := exportWAL("file-that-does-not-exist.wal", newFilters(), &out, log, func() {}); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func Test_exportTSM(t *testing.T) {
+	log := zaptest.NewLogger(t)
+
+	for _, c := range []struct {
+		corpus corpus
+		filter *exportFilters
+		lines  []string
+	}{
+		{corpus: basicCorpus, filter: newFilters(), lines: basicCorpusExpLines},
+		{corpus: escapeStringCorpus, filter: newFilters(), lines: escCorpusExpLines},
+		{corpus: basicCorpus, filter: numsOnlyFilter, lines: numsOnlyExpLines},
+		{corpus: basicCorpus, filter: earlyEntriesOnlyFilter, lines: earlyEntriesOnlyExpLines},
+		{corpus: basicCorpus, filter: lateEntriesOnlyFilter, lines: lateEntriesOnlyExpLines},
+	} {
+		tsmFile, err := writeCorpusToTSMFile(c.corpus)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer os.Remove(tsmFile.Name())
+
+		var out bytes.Buffer
+		if err := exportTSM(tsmFile.Name(), c.filter, &out, log); err != nil {
+			t.Fatal(err)
+		}
+
+		lines := strings.Split(out.String(), "\n")
+		for _, exp := range c.lines {
+			found := false
+			for _, l := range lines {
+				if exp == l {
+					found = true
+					break
+				}
+			}
+
+			if !found {
+				t.Fatalf("expected line %q to be in exported output:\n%s", exp, out.String())
+			}
+		}
+	}
+
+	// Missing .tsm file should not cause a failure.
+	var out bytes.Buffer
+	if err := exportTSM("file-that-does-not-exist.tsm", newFilters(), &out, log); err != nil {
+		t.Fatal(err)
+	}
+}
+
+var sink interface{}
+
+func benchmarkExportTSM(c corpus, b *testing.B) {
+	log := zap.NewNop()
+
+	// Garbage collection is relatively likely to happen during export, so track allocations.
+	b.ReportAllocs()
+
+	f, err := writeCorpusToTSMFile(c)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer os.Remove(f.Name())
+
+	var out bytes.Buffer
+	b.ResetTimer()
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		if err := exportTSM(f.Name(), newFilters(), &out, log); err != nil {
+			b.Fatal(err)
+		}
+
+		sink = out.Bytes()
+		out.Reset()
+	}
+}
+
+func BenchmarkExportTSMFloats_100s_250vps(b *testing.B) {
+	benchmarkExportTSM(makeFloatsCorpus(100, 250), b)
+}
+
+func BenchmarkExportTSMInts_100s_250vps(b *testing.B) {
+	benchmarkExportTSM(makeIntsCorpus(100, 250), b)
+}
+
+func BenchmarkExportTSMBools_100s_250vps(b *testing.B) {
+	benchmarkExportTSM(makeBoolsCorpus(100, 250), b)
+}
+
+func BenchmarkExportTSMStrings_100s_250vps(b *testing.B) {
+	benchmarkExportTSM(makeStringsCorpus(100, 250), b)
+}
+
+func benchmarkExportWAL(c corpus, b *testing.B) {
+	log := zap.NewNop()
+
+	// Garbage collection is relatively likely to happen during export, so track allocations.
+	b.ReportAllocs()
+
+	f, err := writeCorpusToWALFile(c)
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer os.Remove(f.Name())
+
+	var out bytes.Buffer
+	b.ResetTimer()
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		if err := exportWAL(f.Name(), newFilters(), &out, log, func() {}); err != nil {
+			b.Fatal(err)
+		}
+
+		sink = out.Bytes()
+		out.Reset()
+	}
+}
+
+func BenchmarkExportWALFloats_100s_250vps(b *testing.B) {
+	benchmarkExportWAL(makeFloatsCorpus(100, 250), b)
+}
+
+func BenchmarkExportWALInts_100s_250vps(b *testing.B) {
+	benchmarkExportWAL(makeIntsCorpus(100, 250), b)
+}
+
+func BenchmarkExportWALBools_100s_250vps(b *testing.B) {
+	benchmarkExportWAL(makeBoolsCorpus(100, 250), b)
+}
+
+func BenchmarkExportWALStrings_100s_250vps(b *testing.B) {
+	benchmarkExportWAL(makeStringsCorpus(100, 250), b)
+}
+
+// makeCorpus returns a new corpus filled with values generated by fn.
+// The RNG passed to fn is seeded with numSeries * numValuesPerSeries, for predictable output.
+func makeCorpus(numSeries, numValuesPerSeries int, fn func(*rand.Rand) interface{}) corpus {
+	rng := rand.New(rand.NewSource(int64(numSeries) * int64(numValuesPerSeries)))
+	var unixNano int64
+	corpus := make(corpus, numSeries)
+	for i := 0; i < numSeries; i++ {
+		vals := make([]tsm1.Value, numValuesPerSeries)
+		for j := 0; j < numValuesPerSeries; j++ {
+			vals[j] = tsm1.NewValue(unixNano, fn(rng))
+			unixNano++
+		}
+
+		k := fmt.Sprintf("m,t=%d", i)
+		corpus[tsm1.SeriesFieldKey(k, "x")] = vals
+	}
+
+	return corpus
+}
+
+func makeFloatsCorpus(numSeries, numFloatsPerSeries int) corpus {
+	return makeCorpus(numSeries, numFloatsPerSeries, func(rng *rand.Rand) interface{} {
+		return rng.Float64()
+	})
+}
+
+func makeIntsCorpus(numSeries, numIntsPerSeries int) corpus {
+	return makeCorpus(numSeries, numIntsPerSeries, func(rng *rand.Rand) interface{} {
+		// This will only return positive integers. That's probably okay.
+		return rng.Int63()
+	})
+}
+
+func makeBoolsCorpus(numSeries, numBoolsPerSeries int) corpus {
+	return makeCorpus(numSeries, numBoolsPerSeries, func(rng *rand.Rand) interface{} {
+		// Use the seeded RNG (not the global source) so output stays predictable.
+		return rng.Int63n(2) == 1
+	})
+}
+
+func makeStringsCorpus(numSeries, numStringsPerSeries int) corpus {
+	return makeCorpus(numSeries, numStringsPerSeries, func(rng *rand.Rand) interface{} {
+		// The string will randomly have 2-5 parts
+		parts := make([]string, rng.Intn(4)+2)
+
+		for i := range parts {
+			// Each part is a random base36-encoded number
+			parts[i] = strconv.FormatInt(rng.Int63(), 36)
+		}
+
+		// Join the individual parts with underscores.
+		return strings.Join(parts, "_")
+	})
+}
+
+// writeCorpusToWALFile writes the given corpus as a WAL file, and returns a handle to that file.
+// It is the caller's responsibility to remove the returned temp file.
+func writeCorpusToWALFile(c corpus) (*os.File, error) {
+	walFile, err := ioutil.TempFile("", "export_test_corpus_wal")
+	if err != nil {
+		return nil, err
+	}
+
+	e := &tsm1.WriteWALEntry{Values: c}
+	b, err := e.Encode(nil)
+	if err != nil {
+		return nil, err
+	}
+
+	w := tsm1.NewWALSegmentWriter(walFile)
+	if err := w.Write(e.Type(), snappy.Encode(nil, b)); err != nil {
+		return nil, err
+	}
+
+	if err := w.Flush(); err != nil {
+		return nil, err
+	}
+	// (*tsm1.WALSegmentWriter).sync isn't exported, but it only Syncs the file anyway.
+	if err := walFile.Sync(); err != nil {
+		return nil, err
+	}
+
+	return walFile, nil
+}
+
+// writeCorpusToTSMFile writes the given corpus as a TSM file, and returns a handle to that file.
+// It is the caller's responsibility to remove the returned temp file.
+func writeCorpusToTSMFile(c corpus) (*os.File, error) {
+	tsmFile, err := ioutil.TempFile("", "export_test_corpus_tsm")
+	if err != nil {
+		return nil, err
+	}
+
+	w, err := tsm1.NewTSMWriter(tsmFile)
+	if err != nil {
+		return nil, err
+	}
+
+	// Write the series in alphabetical order so that each test run is comparable,
+	// given an identical corpus.
+	keys := make([]string, 0, len(c))
+	for k := range c {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		if err := w.Write([]byte(k), c[k]); err != nil {
+			return nil, err
+		}
+	}
+
+	if err := w.WriteIndex(); err != nil {
+		return nil, err
+	}
+
+	if err := w.Close(); err != nil {
+		return nil, err
+	}
+
+	return tsmFile, nil
+}
diff --git a/cmd/influxd/inspect/inspect.go b/cmd/influxd/inspect/inspect.go
index 8ff3b8c63e..4111fe58f5 100644
--- a/cmd/influxd/inspect/inspect.go
+++ b/cmd/influxd/inspect/inspect.go
@@ -2,10 +2,11 @@ package inspect
 
 import (
 	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
 )
 
 // NewCommand creates the new command.
-func NewCommand() *cobra.Command {
+func NewCommand(v *viper.Viper) *cobra.Command {
 	base := &cobra.Command{
 		Use:   "inspect",
 		Short: "Commands for inspecting on-disk database data",
@@ -18,17 +19,8 @@ func NewCommand() *cobra.Command {
 	// List of available sub-commands
 	// If a new sub-command is created, it must be added here
 	subCommands := []*cobra.Command{
-		//NewBuildTSICommand(),
-		//NewCompactSeriesFileCommand(),
-		//NewExportBlocksCommand(),
+		NewExportLineProtocolCommand(v),
 		NewExportIndexCommand(),
-		//NewReportTSMCommand(),
-		//NewVerifyTSMCommand(),
-		//NewVerifyWALCommand(),
-		//NewReportTSICommand(),
-		//NewVerifySeriesFileCommand(),
-		//NewDumpWALCommand(),
-		//NewDumpTSICommand(),
 	}
 
 	base.AddCommand(subCommands...)
diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go
index f5d9dbdf9a..d74a5a49a4 100644
--- a/cmd/influxd/main.go
+++ b/cmd/influxd/main.go
@@ -36,7 +36,7 @@ func main() {
 	rootCmd := launcher.NewInfluxdCommand(ctx, v)
 	// upgrade binds options to env variables, so it must be added after rootCmd is initialized
 	rootCmd.AddCommand(upgrade.NewCommand(ctx, v))
-	rootCmd.AddCommand(inspect.NewCommand())
+	rootCmd.AddCommand(inspect.NewCommand(v))
 	rootCmd.AddCommand(versionCmd())
 	rootCmd.SilenceUsage = true