From 908259340b7c41d207f9db5b26b8c97c0d63d983 Mon Sep 17 00:00:00 2001 From: Joe LeGasse Date: Wed, 27 Jan 2016 16:13:23 -0800 Subject: [PATCH] Improvements to influx_tsm - Improve logging and status updates - Added debug http endpoint - Properly resume/skip backed up files --- cmd/influx_tsm/converter.go | 34 +++---- cmd/influx_tsm/main.go | 164 ++++++++++++++++------------------ cmd/influx_tsm/tracker.go | 171 ++++++++++++++++++++++++++++++++++++ 3 files changed, 259 insertions(+), 110 deletions(-) create mode 100644 cmd/influx_tsm/tracker.go diff --git a/cmd/influx_tsm/converter.go b/cmd/influx_tsm/converter.go index bd5b6c9607..7463d60c15 100644 --- a/cmd/influx_tsm/converter.go +++ b/cmd/influx_tsm/converter.go @@ -5,20 +5,10 @@ import ( "math" "os" "path/filepath" - "sync/atomic" "github.com/influxdb/influxdb/tsdb/engine/tsm1" ) -var ( - NanFiltered uint64 - InfFiltered uint64 - PointsWritten uint64 - PointsRead uint64 - TsmFilesCreated uint64 - TsmBytesWritten int64 -) - type KeyIterator interface { Next() bool Read() (string, []tsm1.Value, error) @@ -29,13 +19,15 @@ type Converter struct { path string maxTSMFileSize uint32 sequence int + tracker *tracker } // NewConverter returns a new instance of the Converter. 
-func NewConverter(path string, sz uint32) *Converter { +func NewConverter(path string, sz uint32, t *tracker) *Converter { return &Converter{ path: path, maxTSMFileSize: sz, + tracker: t, } } @@ -53,7 +45,7 @@ func (c *Converter) Process(iter KeyIterator) error { if err != nil { return err } - scrubbed := scrubValues(v) + scrubbed := c.scrubValues(v) if w == nil { w, err = c.nextTSMWriter() @@ -64,15 +56,17 @@ func (c *Converter) Process(iter KeyIterator) error { if err := w.Write(k, scrubbed); err != nil { return err } - atomic.AddUint64(&PointsRead, uint64(len(v))) - atomic.AddUint64(&PointsWritten, uint64(len(scrubbed))) + + c.tracker.AddPointsRead(len(v)) + c.tracker.AddPointsWritten(len(scrubbed)) // If we have a max file size configured and we're over it, start a new TSM file. if w.Size() > c.maxTSMFileSize { if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { return err } - atomic.AddInt64(&TsmBytesWritten, int64(w.Size())) + + c.tracker.AddTSMBytes(w.Size()) if err := w.Close(); err != nil { return err @@ -85,7 +79,7 @@ func (c *Converter) Process(iter KeyIterator) error { if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { return err } - atomic.AddInt64(&TsmBytesWritten, int64(w.Size())) + c.tracker.AddTSMBytes(w.Size()) if err := w.Close(); err != nil { return err @@ -111,14 +105,14 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) { return nil, err } - atomic.AddUint64(&TsmFilesCreated, 1) + c.tracker.IncrTSMFileCount() return w, nil } // scrubValues takes a slice and removes float64 NaN and Inf. If neither is // present in the slice, the original slice is returned. This is to avoid // copying slices unnecessarily. 
-func scrubValues(values []tsm1.Value) []tsm1.Value { +func (c *Converter) scrubValues(values []tsm1.Value) []tsm1.Value { var scrubbed []tsm1.Value if values == nil { @@ -130,11 +124,11 @@ func scrubValues(values []tsm1.Value) []tsm1.Value { var filter bool if math.IsNaN(f) { filter = true - atomic.AddUint64(&NanFiltered, 1) + c.tracker.IncrNaN() } if math.IsInf(f, 0) { filter = true - atomic.AddUint64(&InfFiltered, 1) + c.tracker.IncrInf() } if filter { diff --git a/cmd/influx_tsm/main.go b/cmd/influx_tsm/main.go index e0863a4a1f..b43465a195 100644 --- a/cmd/influx_tsm/main.go +++ b/cmd/influx_tsm/main.go @@ -13,10 +13,12 @@ import ( "runtime" "sort" "strings" - "sync" "text/tabwriter" "time" + "net/http" + _ "net/http/pprof" + "github.com/influxdb/influxdb/cmd/influx_tsm/b1" "github.com/influxdb/influxdb/cmd/influx_tsm/bz1" "github.com/influxdb/influxdb/cmd/influx_tsm/tsdb" @@ -48,11 +50,12 @@ restart the node.`, backupExt) type options struct { DataPath string DBs []string + DebugAddr string TSMSize uint64 Parallel bool SkipBackup bool UpdateInterval time.Duration - Quiet bool + // Quiet bool } func (o *options) Parse() error { @@ -64,7 +67,8 @@ func (o *options) Parse() error { fs.Uint64Var(&opts.TSMSize, "sz", maxTSMSz, "Maximum size of individual TSM files.") fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)") fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. 
Not recommended.") - fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.") + // fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.") + fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address") fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.") fs.Usage = func() { fmt.Fprintf(os.Stderr, "Usage: %v [options] \n", os.Args[0]) @@ -92,12 +96,19 @@ func (o *options) Parse() error { o.DBs = nil } + if o.DebugAddr != "" { + log.Printf("Starting debugging server on http://%v", o.DebugAddr) + go func() { + log.Fatal(http.ListenAndServe(o.DebugAddr, nil)) + }() + } + return nil } var opts options -const maxTSMSz = 2 * 1000 * 1000 * 1000 +const maxTSMSz = 2 * 1024 * 1024 * 1024 func init() { log.SetOutput(os.Stderr) @@ -166,72 +177,13 @@ func main() { } fmt.Println("Conversion starting....") - // GOMAXPROCS(0) just queires the current value - pg := NewParallelGroup(runtime.GOMAXPROCS(0)) - var wg sync.WaitGroup + tr := newTracker(shards, opts) - conversionStart := time.Now() - - // Backup each directory. 
- if !opts.SkipBackup { - databases := shards.Databases() - fmt.Printf("Backing up %d databases...\n", len(databases)) - wg.Add(len(databases)) - for i := range databases { - db := databases[i] - go pg.Do(func() { - defer wg.Done() - - start := time.Now() - log.Printf("Backup of databse '%v' started", db) - err := backupDatabase(filepath.Join(opts.DataPath, db)) - if err != nil { - log.Fatalf("Backup of database %v failed: %v\n", db, err) - } - log.Printf("Database %v backed up (%v)\n", db, time.Now().Sub(start)) - }) - } - wg.Wait() - } else { - fmt.Println("Database backup disabled.") + if err := tr.Run(); err != nil { + log.Fatalf("Error occurred preventing completion: %v\n", err) } - wg.Add(len(shards)) - for i := range shards { - si := shards[i] - go pg.Do(func() { - defer wg.Done() - - start := time.Now() - log.Printf("Starting conversion of shard: %v", si.FullPath(opts.DataPath)) - if err := convertShard(si); err != nil { - log.Fatalf("Failed to convert %v: %v\n", si.FullPath(opts.DataPath), err) - } - log.Printf("Conversion of %v successful (%v)\n", si.FullPath(opts.DataPath), time.Since(start)) - }) - } - wg.Wait() - - // Dump stats. 
- preSize := shards.Size() - postSize := TsmBytesWritten - totalTime := time.Since(conversionStart) - - fmt.Printf("\nSummary statistics\n========================================\n") - fmt.Printf("Databases converted: %d\n", len(shards.Databases())) - fmt.Printf("Shards converted: %d\n", len(shards)) - fmt.Printf("TSM files created: %d\n", TsmFilesCreated) - fmt.Printf("Points read: %d\n", PointsRead) - fmt.Printf("Points written: %d\n", PointsWritten) - fmt.Printf("NaN filtered: %d\n", NanFiltered) - fmt.Printf("Inf filtered: %d\n", InfFiltered) - fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered) - fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize) - fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize) - fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize) - fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(PointsWritten)) - fmt.Printf("Total conversion time: %v\n", totalTime) - fmt.Println() + tr.PrintStats() } func collectShards(dbs []os.FileInfo) tsdb.ShardInfos { @@ -273,40 +225,72 @@ func copyDir(dest, src string) error { // Strip the src from the path and replace with dest. toPath := strings.Replace(path, src, dest, 1) - // Copy it. 
if info.IsDir() { - if err := os.MkdirAll(toPath, info.Mode()); err != nil { + return os.MkdirAll(toPath, info.Mode()) + } + + in, err := os.Open(path) + if err != nil { + return err + } + defer in.Close() + + srcInfo, err := os.Stat(path) + if err != nil { + return err + } + + // TODO(jlegasse): this just appends to the current backup file, if it exists + out, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY, info.Mode()) + if err != nil { + return err + } + defer out.Close() + + dstInfo, err := os.Stat(toPath) + if err != nil { + return err + } + + if dstInfo.Size() == srcInfo.Size() { + log.Printf("Backup file already found for %v with correct size, skipping.", path) + return nil + } + + if dstInfo.Size() > srcInfo.Size() { + log.Printf("Invalid backup file found for %v, replacing with good copy.", path) + if err := out.Truncate(0); err != nil { return err } - } else { - err := func() error { - in, err := os.Open(path) - if err != nil { - return err - } - defer in.Close() - - out, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY, info.Mode()) - if err != nil { - return err - } - defer out.Close() - - _, err = io.Copy(out, in) - return err - }() - if err != nil { + if _, err := out.Seek(0, os.SEEK_SET); err != nil { return err } } - return nil + + if dstInfo.Size() > 0 { + log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size()) + } + + off, err := out.Seek(0, os.SEEK_END) + if err != nil { + return err + } + if _, err := in.Seek(off, os.SEEK_SET); err != nil { + return err + } + + log.Printf("Backing up file %v", path) + + _, err = io.Copy(out, in) + + return err } return filepath.Walk(src, copyFile) } // convertShard converts the shard in-place. 
-func convertShard(si *tsdb.ShardInfo) error { +func convertShard(si *tsdb.ShardInfo, tr *tracker) error { src := si.FullPath(opts.DataPath) dst := fmt.Sprintf("%v.%v", src, tsmExt) @@ -325,7 +309,7 @@ func convertShard(si *tsdb.ShardInfo) error { if err := reader.Open(); err != nil { return fmt.Errorf("Failed to open %v for conversion: %v", src, err) } - converter := NewConverter(dst, uint32(opts.TSMSize)) + converter := NewConverter(dst, uint32(opts.TSMSize), tr) // Perform the conversion. if err := converter.Process(reader); err != nil { diff --git a/cmd/influx_tsm/tracker.go b/cmd/influx_tsm/tracker.go new file mode 100644 index 0000000000..6928eca4f0 --- /dev/null +++ b/cmd/influx_tsm/tracker.go @@ -0,0 +1,171 @@ +package main + +import ( + "fmt" + "log" + "path/filepath" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/influxdb/influxdb/cmd/influx_tsm/b1" + "github.com/influxdb/influxdb/cmd/influx_tsm/bz1" + "github.com/influxdb/influxdb/cmd/influx_tsm/tsdb" +) + +// tracker will orchestrate and track the conversions of non-TSM shards to TSM +type tracker struct { + shards tsdb.ShardInfos + opts options + + pg ParallelGroup + wg sync.WaitGroup + + stats Stats +} + +type Stats struct { + NanFiltered uint64 + InfFiltered uint64 + PointsWritten uint64 + PointsRead uint64 + TsmFilesCreated uint64 + TsmBytesWritten uint64 + CompletedShards uint64 + TotalTime time.Duration +} + +// newTracker will setup and return a clean tracker instance +func newTracker(shards tsdb.ShardInfos, opts options) *tracker { + t := &tracker{ + shards: shards, + opts: opts, + pg: NewParallelGroup(runtime.GOMAXPROCS(0)), + } + + return t +} + +func (t *tracker) Errorf(str string, args ...interface{}) { + +} + +func (t *tracker) Run() error { + conversionStart := time.Now() + + // Backup each directory. 
+ if !opts.SkipBackup { + databases := t.shards.Databases() + fmt.Printf("Backing up %d databases...\n", len(databases)) + t.wg.Add(len(databases)) + for i := range databases { + db := databases[i] + go t.pg.Do(func() { + defer t.wg.Done() + + start := time.Now() + log.Printf("Backup of database '%v' started", db) + err := backupDatabase(filepath.Join(opts.DataPath, db)) + if err != nil { + log.Fatalf("Backup of database %v failed: %v\n", db, err) + } + log.Printf("Database %v backed up (%v)\n", db, time.Now().Sub(start)) + }) + } + t.wg.Wait() + } else { + fmt.Println("Database backup disabled.") + } + + t.wg.Add(len(t.shards)) + for i := range t.shards { + si := t.shards[i] + go t.pg.Do(func() { + defer func() { + atomic.AddUint64(&t.stats.CompletedShards, 1) + t.wg.Done() + }() + + start := time.Now() + log.Printf("Starting conversion of shard: %v", si.FullPath(opts.DataPath)) + if err := convertShard(si, t); err != nil { + log.Fatalf("Failed to convert %v: %v\n", si.FullPath(opts.DataPath), err) + } + log.Printf("Conversion of %v successful (%v)\n", si.FullPath(opts.DataPath), time.Since(start)) + }) + } + + done := make(chan struct{}) + go func() { + t.wg.Wait() + close(done) + }() + +WAIT_LOOP: + for { + select { + case <-done: + break WAIT_LOOP + case <-time.After(opts.UpdateInterval): + t.StatusUpdate() + } + } + + t.stats.TotalTime = time.Since(conversionStart) + + return nil +} + +func (t *tracker) StatusUpdate() { + shardCount := atomic.LoadUint64(&t.stats.CompletedShards) + pointCount := atomic.LoadUint64(&t.stats.PointsRead) + pointWritten := atomic.LoadUint64(&t.stats.PointsWritten) + + log.Printf("Still Working: Completed Shards: %d/%d Points read/written: %d/%d", shardCount, len(t.shards), pointCount, pointWritten) +} + +func (t *tracker) PrintStats() { + preSize := t.shards.Size() + postSize := int64(t.stats.TsmBytesWritten) + + fmt.Printf("\nSummary statistics\n========================================\n") + fmt.Printf("Databases converted: %d\n", 
len(t.shards.Databases())) + fmt.Printf("Shards converted: %d\n", len(t.shards)) + fmt.Printf("TSM files created: %d\n", t.stats.TsmFilesCreated) + fmt.Printf("Points read: %d\n", t.stats.PointsRead) + fmt.Printf("Points written: %d\n", t.stats.PointsWritten) + fmt.Printf("NaN filtered: %d\n", t.stats.NanFiltered) + fmt.Printf("Inf filtered: %d\n", t.stats.InfFiltered) + fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered) + fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize) + fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize) + fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize) + fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(t.stats.PointsWritten)) + fmt.Printf("Total conversion time: %v\n", t.stats.TotalTime) + fmt.Println() +} + +func (t *tracker) AddPointsRead(n int) { + atomic.AddUint64(&t.stats.PointsRead, uint64(n)) +} + +func (t *tracker) AddPointsWritten(n int) { + atomic.AddUint64(&t.stats.PointsWritten, uint64(n)) +} + +func (t *tracker) AddTSMBytes(n uint32) { + atomic.AddUint64(&t.stats.TsmBytesWritten, uint64(n)) +} + +func (t *tracker) IncrTSMFileCount() { + atomic.AddUint64(&t.stats.TsmFilesCreated, 1) +} + +func (t *tracker) IncrNaN() { + atomic.AddUint64(&t.stats.NanFiltered, 1) +} + +func (t *tracker) IncrInf() { + atomic.AddUint64(&t.stats.InfFiltered, 1) +}