Improvements to influx_tsm

- Improve logging and status updates
- Add a debug HTTP endpoint (a pprof sketch follows the options-parsing hunks below)
- Properly resume/skip backed-up files (a resumable-copy sketch follows the copyDir hunks below)
pull/5454/head
Joe LeGasse 2016-01-27 16:13:23 -08:00
parent 4f89c15bd3
commit 908259340b
3 changed files with 259 additions and 110 deletions

cmd/influx_tsm/converter.go

@@ -5,20 +5,10 @@ import (
"math"
"os"
"path/filepath"
"sync/atomic"
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
)
var (
NanFiltered uint64
InfFiltered uint64
PointsWritten uint64
PointsRead uint64
TsmFilesCreated uint64
TsmBytesWritten int64
)
type KeyIterator interface {
Next() bool
Read() (string, []tsm1.Value, error)
@@ -29,13 +19,15 @@ type Converter struct {
path string
maxTSMFileSize uint32
sequence int
tracker *tracker
}
// NewConverter returns a new instance of the Converter.
func NewConverter(path string, sz uint32) *Converter {
func NewConverter(path string, sz uint32, t *tracker) *Converter {
return &Converter{
path: path,
maxTSMFileSize: sz,
tracker: t,
}
}
@@ -53,7 +45,7 @@ func (c *Converter) Process(iter KeyIterator) error {
if err != nil {
return err
}
scrubbed := scrubValues(v)
scrubbed := c.scrubValues(v)
if w == nil {
w, err = c.nextTSMWriter()
@@ -64,15 +56,17 @@ func (c *Converter) Process(iter KeyIterator) error {
if err := w.Write(k, scrubbed); err != nil {
return err
}
atomic.AddUint64(&PointsRead, uint64(len(v)))
atomic.AddUint64(&PointsWritten, uint64(len(scrubbed)))
c.tracker.AddPointsRead(len(v))
c.tracker.AddPointsWritten(len(scrubbed))
// If we have a max file size configured and we're over it, start a new TSM file.
if w.Size() > c.maxTSMFileSize {
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
atomic.AddInt64(&TsmBytesWritten, int64(w.Size()))
c.tracker.AddTSMBytes(w.Size())
if err := w.Close(); err != nil {
return err
@@ -85,7 +79,7 @@ func (c *Converter) Process(iter KeyIterator) error {
if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
return err
}
atomic.AddInt64(&TsmBytesWritten, int64(w.Size()))
c.tracker.AddTSMBytes(w.Size())
if err := w.Close(); err != nil {
return err
@@ -111,14 +105,14 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) {
return nil, err
}
atomic.AddUint64(&TsmFilesCreated, 1)
c.tracker.IncrTSMFileCount()
return w, nil
}
// scrubValues takes a slice and removes float64 NaN and Inf. If neither is
// present in the slice, the original slice is returned. This is to avoid
// copying slices unnecessarily.
func scrubValues(values []tsm1.Value) []tsm1.Value {
func (c *Converter) scrubValues(values []tsm1.Value) []tsm1.Value {
var scrubbed []tsm1.Value
if values == nil {
@@ -130,11 +124,11 @@ func scrubValues(values []tsm1.Value) []tsm1.Value {
var filter bool
if math.IsNaN(f) {
filter = true
atomic.AddUint64(&NanFiltered, 1)
c.tracker.IncrNaN()
}
if math.IsInf(f, 0) {
filter = true
atomic.AddUint64(&InfFiltered, 1)
c.tracker.IncrInf()
}
if filter {

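The scrubValues doc comment above describes a copy-on-first-filter pattern: the input slice is returned untouched unless a NaN or Inf is actually found, and only then are the surviving values copied into a fresh slice. A minimal standalone sketch of that pattern, using plain float64 values rather than tsm1.Value (the names here are illustrative, not part of the commit):

```go
package main

import (
	"fmt"
	"math"
)

// scrubFloats returns the input slice unchanged when nothing needs
// filtering, and allocates a new slice only once the first NaN or
// Inf is encountered, mirroring scrubValues' copy-on-write behavior.
func scrubFloats(values []float64) []float64 {
	var scrubbed []float64 // stays nil until a value must be dropped
	for i, f := range values {
		if math.IsNaN(f) || math.IsInf(f, 0) {
			if scrubbed == nil {
				// First bad value: copy everything seen so far.
				scrubbed = make([]float64, i, len(values))
				copy(scrubbed, values[:i])
			}
			continue // drop the NaN/Inf
		}
		if scrubbed != nil {
			scrubbed = append(scrubbed, f)
		}
	}
	if scrubbed == nil {
		return values // clean input: no allocation, no copy
	}
	return scrubbed
}

func main() {
	fmt.Println(scrubFloats([]float64{1, math.NaN(), 2, math.Inf(1), 3})) // [1 2 3]
}
```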
cmd/influx_tsm/main.go

@@ -13,10 +13,12 @@ import (
"runtime"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
"net/http"
_ "net/http/pprof"
"github.com/influxdb/influxdb/cmd/influx_tsm/b1"
"github.com/influxdb/influxdb/cmd/influx_tsm/bz1"
"github.com/influxdb/influxdb/cmd/influx_tsm/tsdb"
@@ -48,11 +50,12 @@ restart the node.`, backupExt)
type options struct {
DataPath string
DBs []string
DebugAddr string
TSMSize uint64
Parallel bool
SkipBackup bool
UpdateInterval time.Duration
Quiet bool
// Quiet bool
}
func (o *options) Parse() error {
@@ -64,7 +67,8 @@ func (o *options) Parse() error {
fs.Uint64Var(&opts.TSMSize, "sz", maxTSMSz, "Maximum size of individual TSM files.")
fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)")
fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.")
fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.")
// fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.")
fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address")
fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.")
fs.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %v [options] <data-path>\n", os.Args[0])
@@ -92,12 +96,19 @@ func (o *options) Parse() error {
o.DBs = nil
}
if o.DebugAddr != "" {
log.Printf("Starting debugging server on http://%v", o.DebugAddr)
go func() {
log.Fatal(http.ListenAndServe(o.DebugAddr, nil))
}()
}
return nil
}
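For reference, the debug endpoint added here uses the standard net/http/pprof pattern: the blank import registers the profiling handlers under /debug/pprof/ on http.DefaultServeMux, and ListenAndServe with a nil handler serves that mux. A minimal sketch of the same wiring (the address is illustrative; influx_tsm takes it from the -debug flag):

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // side effect: registers /debug/pprof/* on the default mux
)

func main() {
	go func() {
		// A nil handler means http.DefaultServeMux, where pprof registered itself.
		log.Fatal(http.ListenAndServe("localhost:6060", nil))
	}()
	select {} // stand-in for the program's real work
}
```

With the endpoint up, profiles can be pulled with commands like `go tool pprof http://localhost:6060/debug/pprof/profile`, or the available profiles can be browsed directly at /debug/pprof/.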
var opts options
const maxTSMSz = 2 * 1000 * 1000 * 1000
const maxTSMSz = 2 * 1024 * 1024 * 1024
func init() {
log.SetOutput(os.Stderr)
@@ -166,72 +177,13 @@ func main() {
}
fmt.Println("Conversion starting....")
// GOMAXPROCS(0) just queries the current value
pg := NewParallelGroup(runtime.GOMAXPROCS(0))
var wg sync.WaitGroup
tr := newTracker(shards, opts)
conversionStart := time.Now()
// Backup each directory.
if !opts.SkipBackup {
databases := shards.Databases()
fmt.Printf("Backing up %d databases...\n", len(databases))
wg.Add(len(databases))
for i := range databases {
db := databases[i]
go pg.Do(func() {
defer wg.Done()
start := time.Now()
log.Printf("Backup of databse '%v' started", db)
err := backupDatabase(filepath.Join(opts.DataPath, db))
if err != nil {
log.Fatalf("Backup of database %v failed: %v\n", db, err)
}
log.Printf("Database %v backed up (%v)\n", db, time.Now().Sub(start))
})
}
wg.Wait()
} else {
fmt.Println("Database backup disabled.")
if err := tr.Run(); err != nil {
log.Fatalf("Error occurred preventing completion: %v\n", err)
}
wg.Add(len(shards))
for i := range shards {
si := shards[i]
go pg.Do(func() {
defer wg.Done()
start := time.Now()
log.Printf("Starting conversion of shard: %v", si.FullPath(opts.DataPath))
if err := convertShard(si); err != nil {
log.Fatalf("Failed to convert %v: %v\n", si.FullPath(opts.DataPath), err)
}
log.Printf("Conversion of %v successful (%v)\n", si.FullPath(opts.DataPath), time.Since(start))
})
}
wg.Wait()
// Dump stats.
preSize := shards.Size()
postSize := TsmBytesWritten
totalTime := time.Since(conversionStart)
fmt.Printf("\nSummary statistics\n========================================\n")
fmt.Printf("Databases converted: %d\n", len(shards.Databases()))
fmt.Printf("Shards converted: %d\n", len(shards))
fmt.Printf("TSM files created: %d\n", TsmFilesCreated)
fmt.Printf("Points read: %d\n", PointsRead)
fmt.Printf("Points written: %d\n", PointsWritten)
fmt.Printf("NaN filtered: %d\n", NanFiltered)
fmt.Printf("Inf filtered: %d\n", InfFiltered)
fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered)
fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize)
fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize)
fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize)
fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(PointsWritten))
fmt.Printf("Total conversion time: %v\n", totalTime)
fmt.Println()
tr.PrintStats()
}
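ParallelGroup's implementation is not part of this diff, but its usage above (go pg.Do(func(){...}), bounded to runtime.GOMAXPROCS(0) concurrent workers) matches a plain channel semaphore. A sketch under that assumption:

```go
package main

// ParallelGroup limits how many of the functions handed to Do may run
// at once. This is an assumed implementation inferred from the usage
// in main.go, not code taken from the commit.
type ParallelGroup chan struct{}

// NewParallelGroup returns a group that allows up to n concurrent calls.
func NewParallelGroup(n int) ParallelGroup {
	return make(ParallelGroup, n)
}

// Do blocks until one of the n slots is free, then runs f.
func (p ParallelGroup) Do(f func()) {
	p <- struct{}{}        // acquire a slot
	defer func() { <-p }() // release it when f returns
	f()
}
```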
func collectShards(dbs []os.FileInfo) tsdb.ShardInfos {
@@ -273,40 +225,72 @@ func copyDir(dest, src string) error {
// Strip the src from the path and replace with dest.
toPath := strings.Replace(path, src, dest, 1)
// Copy it.
if info.IsDir() {
if err := os.MkdirAll(toPath, info.Mode()); err != nil {
return os.MkdirAll(toPath, info.Mode())
}
in, err := os.Open(path)
if err != nil {
return err
}
defer in.Close()
srcInfo, err := os.Stat(path)
if err != nil {
return err
}
// TODO(jlegasse): this just appends to the current backup file, if it exists
out, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY, info.Mode())
if err != nil {
return err
}
defer out.Close()
dstInfo, err := os.Stat(toPath)
if err != nil {
return err
}
if dstInfo.Size() == srcInfo.Size() {
log.Printf("Backup file already found for %v with correct size, skipping.", path)
return nil
}
if dstInfo.Size() > srcInfo.Size() {
log.Printf("Invalid backup file found for %v, replacing with good copy.", path)
if err := out.Truncate(0); err != nil {
return err
}
} else {
err := func() error {
in, err := os.Open(path)
if err != nil {
return err
}
defer in.Close()
out, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY, info.Mode())
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, in)
return err
}()
if err != nil {
if _, err := out.Seek(0, os.SEEK_SET); err != nil {
return err
}
}
return nil
if dstInfo.Size() > 0 {
log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size())
}
off, err := out.Seek(0, os.SEEK_END)
if err != nil {
return err
}
if _, err := in.Seek(off, os.SEEK_SET); err != nil {
return err
}
log.Printf("Backing up file %v", path)
_, err = io.Copy(out, in)
return err
}
return filepath.Walk(src, copyFile)
}
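The copyFile walk above makes a three-way decision per file: skip when the backup already matches the source size, truncate and restart when the backup is larger (an invalid partial), and otherwise seek both files to the backup's end and copy only the remainder. A self-contained sketch of that resume logic (resumeCopy, the paths, and the file mode are illustrative, not names from the commit):

```go
package main

import (
	"io"
	"log"
	"os"
)

// resumeCopy copies src to dst, skipping work already done by an
// earlier, interrupted copy.
func resumeCopy(dst, src string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.OpenFile(dst, os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer out.Close()

	srcInfo, err := in.Stat()
	if err != nil {
		return err
	}
	dstInfo, err := out.Stat()
	if err != nil {
		return err
	}

	switch {
	case dstInfo.Size() == srcInfo.Size():
		return nil // already fully copied
	case dstInfo.Size() > srcInfo.Size():
		// Destination larger than source: invalid partial copy,
		// so start over from the beginning.
		if err := out.Truncate(0); err != nil {
			return err
		}
	}

	// Append to the destination and skip the already-copied prefix of src.
	off, err := out.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	if _, err := in.Seek(off, io.SeekStart); err != nil {
		return err
	}
	_, err = io.Copy(out, in)
	return err
}

func main() {
	if err := resumeCopy("shard.bak", "shard"); err != nil {
		log.Fatal(err)
	}
}
```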
// convertShard converts the shard in-place.
func convertShard(si *tsdb.ShardInfo) error {
func convertShard(si *tsdb.ShardInfo, tr *tracker) error {
src := si.FullPath(opts.DataPath)
dst := fmt.Sprintf("%v.%v", src, tsmExt)
@@ -325,7 +309,7 @@ func convertShard(si *tsdb.ShardInfo) error {
if err := reader.Open(); err != nil {
return fmt.Errorf("Failed to open %v for conversion: %v", src, err)
}
converter := NewConverter(dst, uint32(opts.TSMSize))
converter := NewConverter(dst, uint32(opts.TSMSize), tr)
// Perform the conversion.
if err := converter.Process(reader); err != nil {

cmd/influx_tsm/tracker.go (new file, 171 lines)

@@ -0,0 +1,171 @@
package main
import (
"fmt"
"log"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/influxdb/influxdb/cmd/influx_tsm/b1"
"github.com/influxdb/influxdb/cmd/influx_tsm/bz1"
"github.com/influxdb/influxdb/cmd/influx_tsm/tsdb"
)
// tracker will orchestrate and track the conversions of non-TSM shards to TSM
type tracker struct {
shards tsdb.ShardInfos
opts options
pg ParallelGroup
wg sync.WaitGroup
stats Stats
}
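// Stats holds the aggregate counters for a conversion run. All counter
// fields are updated through the sync/atomic helpers below, so they are
// safe to touch from the concurrent conversion goroutines.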
type Stats struct {
NanFiltered uint64
InfFiltered uint64
PointsWritten uint64
PointsRead uint64
TsmFilesCreated uint64
TsmBytesWritten uint64
CompletedShards uint64
TotalTime time.Duration
}
// newTracker will setup and return a clean tracker instance
func newTracker(shards tsdb.ShardInfos, opts options) *tracker {
t := &tracker{
shards: shards,
opts: opts,
pg: NewParallelGroup(runtime.GOMAXPROCS(0)),
}
return t
}
func (t *tracker) Errorf(str string, args ...interface{}) {
}
func (t *tracker) Run() error {
conversionStart := time.Now()
// Backup each directory.
if !opts.SkipBackup {
databases := t.shards.Databases()
fmt.Printf("Backing up %d databases...\n", len(databases))
t.wg.Add(len(databases))
for i := range databases {
db := databases[i]
go t.pg.Do(func() {
defer t.wg.Done()
start := time.Now()
log.Printf("Backup of databse '%v' started", db)
err := backupDatabase(filepath.Join(opts.DataPath, db))
if err != nil {
log.Fatalf("Backup of database %v failed: %v\n", db, err)
}
log.Printf("Database %v backed up (%v)\n", db, time.Now().Sub(start))
})
}
t.wg.Wait()
} else {
fmt.Println("Database backup disabled.")
}
t.wg.Add(len(t.shards))
for i := range t.shards {
si := t.shards[i]
go t.pg.Do(func() {
defer func() {
atomic.AddUint64(&t.stats.CompletedShards, 1)
t.wg.Done()
}()
start := time.Now()
log.Printf("Starting conversion of shard: %v", si.FullPath(opts.DataPath))
if err := convertShard(si, t); err != nil {
log.Fatalf("Failed to convert %v: %v\n", si.FullPath(opts.DataPath), err)
}
log.Printf("Conversion of %v successful (%v)\n", si.FullPath(opts.DataPath), time.Since(start))
})
}
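// Bridge the WaitGroup into a channel so completion can be selected on
// alongside the periodic status-update timer below.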
done := make(chan struct{})
go func() {
t.wg.Wait()
close(done)
}()
WAIT_LOOP:
for {
select {
case <-done:
break WAIT_LOOP
case <-time.After(opts.UpdateInterval):
t.StatusUpdate()
}
}
t.stats.TotalTime = time.Since(conversionStart)
return nil
}
func (t *tracker) StatusUpdate() {
shardCount := atomic.LoadUint64(&t.stats.CompletedShards)
pointCount := atomic.LoadUint64(&t.stats.PointsRead)
pointWritten := atomic.LoadUint64(&t.stats.PointsWritten)
log.Printf("Still Working: Completed Shards: %d/%d Points read/written: %d/%d", shardCount, len(t.shards), pointCount, pointWritten)
}
func (t *tracker) PrintStats() {
preSize := t.shards.Size()
postSize := int64(t.stats.TsmBytesWritten)
fmt.Printf("\nSummary statistics\n========================================\n")
fmt.Printf("Databases converted: %d\n", len(t.shards.Databases()))
fmt.Printf("Shards converted: %d\n", len(t.shards))
fmt.Printf("TSM files created: %d\n", t.stats.TsmFilesCreated)
fmt.Printf("Points read: %d\n", t.stats.PointsRead)
fmt.Printf("Points written: %d\n", t.stats.PointsWritten)
fmt.Printf("NaN filtered: %d\n", t.stats.NanFiltered)
fmt.Printf("Inf filtered: %d\n", t.stats.InfFiltered)
fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered)
fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize)
fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize)
fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize)
fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(t.stats.PointsWritten))
fmt.Printf("Total conversion time: %v\n", t.stats.TotalTime)
fmt.Println()
}
func (t *tracker) AddPointsRead(n int) {
atomic.AddUint64(&t.stats.PointsRead, uint64(n))
}
func (t *tracker) AddPointsWritten(n int) {
atomic.AddUint64(&t.stats.PointsWritten, uint64(n))
}
func (t *tracker) AddTSMBytes(n uint32) {
atomic.AddUint64(&t.stats.TsmBytesWritten, uint64(n))
}
func (t *tracker) IncrTSMFileCount() {
atomic.AddUint64(&t.stats.TsmFilesCreated, 1)
}
func (t *tracker) IncrNaN() {
atomic.AddUint64(&t.stats.NanFiltered, 1)
}
func (t *tracker) IncrInf() {
atomic.AddUint64(&t.stats.InfFiltered, 1)
}