Limit parallelism for 'influx_tsm -parallel'
parent f92114d8f2
commit cdde2959af

@@ -2,12 +2,15 @@ package main

import (
    "bufio"
    "errors"
    "flag"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "runtime"
    "sort"
    "strings"
    "sync"
@@ -36,96 +39,107 @@ Convert a database from b1 or bz1 format to tsm1 format.

This tool will backup any directory before conversion. It is up to the
end-user to delete the backup on the disk, once the end-user is happy
with the converted data. Backups are named by suffixing the database
name with '.%s'. The backups will be ignored by the system since they
name with '.%v'. The backups will be ignored by the system since they
are not registered with the cluster.

To restore a backup, delete the tsm1 version, rename the backup directory and
restart the node.`, backupExt)

var dataPath string
var ds string
var tsmSz uint64
var parallel bool
var disBack bool
type options struct {
    DataPath       string
    DBs            []string
    TSMSize        uint64
    Parallel       bool
    SkipBackup     bool
    UpdateInterval time.Duration
    Quiet          bool
}

func (o *options) Parse() error {
    fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)

    var dbs string

    fs.StringVar(&dbs, "dbs", "", "Comma-delimited list of databases to convert. Default is to convert all databases.")
    fs.Uint64Var(&opts.TSMSize, "sz", maxTSMSz, "Maximum size of individual TSM files.")
    fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)")
    fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.")
    fs.BoolVar(&opts.Quiet, "quiet", false, "Suppresses the regular status updates.")
    fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.")
    fs.Usage = func() {
        fmt.Fprintf(os.Stderr, "Usage: %v [options] <data-path> \n", os.Args[0])
        fmt.Fprintf(os.Stderr, "%v\n\n", description)
        fs.PrintDefaults()
        fmt.Fprintf(os.Stderr, "\n")
    }

    if err := fs.Parse(os.Args[1:]); err != nil {
        return err
    }

    if len(fs.Args()) < 1 {
        return errors.New("no data directory specified")
    }
    o.DataPath = fs.Args()[0]

    if o.TSMSize > maxTSMSz {
        return fmt.Errorf("bad TSM file size, maximum TSM file size is %d", maxTSMSz)
    }

    // Check if specific databases were requested.
    o.DBs = strings.Split(dbs, ",")
    if len(o.DBs) == 1 && o.DBs[0] == "" {
        o.DBs = nil
    }

    return nil
}
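As a quick usage sketch (not part of the diff), this is roughly how the new FlagSet-backed options behave. The flag names and defaults mirror the ones registered above; the demo package, the demoOptions type and the sample argument list are invented for illustration.

package main

import (
    "flag"
    "fmt"
    "time"
)

// demoOptions mirrors the fields the converter's options struct exposes.
type demoOptions struct {
    DataPath       string
    TSMSize        uint64
    Parallel       bool
    SkipBackup     bool
    UpdateInterval time.Duration
}

func main() {
    var o demoOptions
    fs := flag.NewFlagSet("influx_tsm-demo", flag.ExitOnError)
    fs.Uint64Var(&o.TSMSize, "sz", 2*1000*1000*1000, "Maximum size of individual TSM files.")
    fs.BoolVar(&o.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)")
    fs.BoolVar(&o.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.")
    fs.DurationVar(&o.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.")

    // Hypothetical argv; a real run would pass os.Args[1:].
    args := []string{"-parallel", "-sz", "1000000000", "/var/lib/influxdb/data"}
    if err := fs.Parse(args); err != nil {
        panic(err)
    }
    o.DataPath = fs.Arg(0)

    fmt.Printf("%+v\n", o)
}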

var opts options

const maxTSMSz = 2 * 1000 * 1000 * 1000

func init() {
    flag.StringVar(&ds, "dbs", "", "Comma-delimited list of databases to convert. Default is to convert all databases.")
    flag.Uint64Var(&tsmSz, "sz", maxTSMSz, "Maximum size of individual TSM files.")
    flag.BoolVar(&parallel, "parallel", false, "Perform parallel conversion.")
    flag.BoolVar(&disBack, "nobackup", false, "Disable database backups. Not recommended.")
    flag.Usage = func() {
        fmt.Fprintf(os.Stderr, "Usage: %s [options] <data-path> \n", os.Args[0])
        fmt.Fprintf(os.Stderr, "%s\n\n", description)
        flag.PrintDefaults()
        fmt.Fprintf(os.Stderr, "\n")
    }
    log.SetOutput(os.Stderr)
    log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
}

func main() {
    pg := NewParallelGroup(1)

    flag.Parse()
    if len(flag.Args()) < 1 {
        fmt.Fprintf(os.Stderr, "No data directory specified\n")
        os.Exit(1)
    }
    dataPath = flag.Args()[0]

    if tsmSz > maxTSMSz {
        fmt.Fprintf(os.Stderr, "Maximum TSM file size is %d\n", maxTSMSz)
        os.Exit(1)
    }

    // Check if specific directories were requested.
    reqDs := strings.Split(ds, ",")
    if len(reqDs) == 1 && reqDs[0] == "" {
        reqDs = nil
    if err := opts.Parse(); err != nil {
        log.Fatal(err)
    }

    // Determine the list of databases
    dbs, err := ioutil.ReadDir(dataPath)
    dbs, err := ioutil.ReadDir(opts.DataPath)
    if err != nil {
        fmt.Fprintf(os.Stderr, "failed to access data directory at %s: %s\n", dataPath, err.Error())
        os.Exit(1)
        log.Fatalf("failed to access data directory at %v: %v\n", opts.DataPath, err)
    }
    fmt.Println() // Cleanly separate output from start of program.

    if opts.Parallel {
        if !isEnvSet("GOMAXPROCS") {
            // Only modify GOMAXPROCS if it wasn't set in the environment
            // This means 'GOMAXPROCS=1 influx_tsm -parallel' will not actually
            // run in parallel
            runtime.GOMAXPROCS(runtime.NumCPU())
        }
    }
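This env guard is what actually limits parallelism: GOMAXPROCS is only raised to NumCPU when the user has not pinned it, so 'GOMAXPROCS=1 influx_tsm -parallel' still converts one shard at a time. A minimal standalone sketch of the pattern; it substitutes os.LookupEnv (Go 1.5+) for the diff's own isEnvSet helper, which is defined further down.

package main

import (
    "fmt"
    "os"
    "runtime"
)

func main() {
    // Respect an explicit GOMAXPROCS from the environment; otherwise use all CPUs.
    // 'GOMAXPROCS=1 ./tool -parallel' therefore still runs one task at a time.
    if _, set := os.LookupEnv("GOMAXPROCS"); !set {
        runtime.GOMAXPROCS(runtime.NumCPU())
    }
    fmt.Println("effective GOMAXPROCS:", runtime.GOMAXPROCS(0)) // GOMAXPROCS(0) just queries the current value
}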

    // Dump summary of what is about to happen.
    fmt.Println("b1 and bz1 shard conversion.")
    fmt.Println("-----------------------------------")
    fmt.Println("Data directory is: ", dataPath)
    fmt.Println("Databases specified: ", allDBs(reqDs))
    fmt.Println("Database backups enabled:", yesno(!disBack))
    fmt.Println("Parallel mode enabled: ", yesno(parallel))
    fmt.Println("Data directory is: ", opts.DataPath)
    fmt.Println("Databases specified: ", allDBs(opts.DBs))
    fmt.Println("Database backups enabled:", yesno(!opts.SkipBackup))
    fmt.Println("Parallel mode enabled: ", yesno(opts.Parallel), runtime.GOMAXPROCS(0))
    fmt.Println()

    // Get the list of shards for conversion.
    var shards []*tsdb.ShardInfo
    for _, db := range dbs {
        if strings.HasSuffix(db.Name(), backupExt) {
            fmt.Printf("Skipping %s as it looks like a backup.\n", db.Name())
            continue
        }

        d := tsdb.NewDatabase(filepath.Join(dataPath, db.Name()))
        shs, err := d.Shards()
        if err != nil {
            fmt.Fprintf(os.Stderr, "failed to access shards for database %s: %s\n", d.Name(), err.Error())
            os.Exit(1)
        }
        shards = append(shards, shs...)
    }
    sort.Sort(tsdb.ShardInfos(shards))
    usl := len(shards)
    shards = tsdb.ShardInfos(shards).FilterFormat(tsdb.TSM1).ExclusiveDatabases(reqDs)
    sl := len(shards)
    shards := collectShards(dbs)

    // Anything to convert?
    fmt.Printf("\n%d shard(s) detected, %d non-TSM shards detected.\n", usl, sl)
    fmt.Printf("\nFound %d shards that will be converted.\n", len(shards))
    if len(shards) == 0 {
        fmt.Printf("Nothing to do.\n")
        fmt.Println("Nothing to do.")
        os.Exit(0)
    }

@@ -135,7 +149,7 @@ func main() {
    w.Init(os.Stdout, 0, 8, 1, '\t', 0)
    fmt.Fprintln(w, "Database\tRetention\tPath\tEngine\tSize")
    for _, si := range shards {
        fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%d\n", si.Database, si.RetentionPolicy, si.FullPath(dataPath), si.FormatAsString(), si.Size)
        fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%d\n", si.Database, si.RetentionPolicy, si.FullPath(opts.DataPath), si.FormatAsString(), si.Size)
    }
    w.Flush()

@@ -144,7 +158,7 @@ func main() {
    liner := bufio.NewReader(os.Stdin)
    yn, err := liner.ReadString('\n')
    if err != nil {
        fmt.Fprintf(os.Stderr, "failed to read response: %s", err.Error())
        log.Printf("failed to read response: %v", err)
        os.Exit(1)
    }
    yn = strings.TrimRight(strings.ToLower(yn), "\n")

@@ -154,57 +168,61 @@ func main() {
    }
    fmt.Println("Conversion starting....")

    // Backup each directory.
    // GOMAXPROCS(0) just queries the current value
    pg := NewParallelGroup(runtime.GOMAXPROCS(0))
    var wg sync.WaitGroup

    conversionStart := time.Now()
    if !disBack {
        databases := tsdb.ShardInfos(shards).Databases()

    // Backup each directory.
    if !opts.SkipBackup {
        databases := shards.Databases()
        fmt.Printf("Backing up %d databases...\n", len(databases))
        if parallel {
            pg = NewParallelGroup(len(databases))
        }
        for _, db := range databases {
            pg.Request()
            go func(db string) {
                defer pg.Release()
        wg.Add(len(databases))
        for i := range databases {
            db := databases[i]
            go pg.Do(func() {
                defer wg.Done()

                start := time.Now()
                err := backupDatabase(filepath.Join(dataPath, db))
                log.Printf("Backup of database '%v' started", db)
                err := backupDatabase(filepath.Join(opts.DataPath, db))
                if err != nil {
                    fmt.Fprintf(os.Stderr, "Backup of database %s failed: %s\n", db, err.Error())
                    log.Printf("Backup of database %v failed: %v\n", db, err)
                    os.Exit(1)
                }
                fmt.Printf("Database %s backed up (%v)\n", db, time.Now().Sub(start))
            }(db)
                log.Printf("Database %v backed up (%v)\n", db, time.Now().Sub(start))
            })
        }
        pg.Wait()
        wg.Wait()
    } else {
        fmt.Println("Database backup disabled.")
    }

    // Convert each shard.
    if parallel {
        pg = NewParallelGroup(len(shards))
    }
    for _, si := range shards {
        pg.Request()
        go func(si *tsdb.ShardInfo) {
            defer pg.Release()
    wg.Add(len(shards))
    for i := range shards {
        si := shards[i]
        go pg.Do(func() {
            defer wg.Done()

            start := time.Now()
            log.Printf("Starting conversion of shard: %v", si.FullPath(opts.DataPath))
            if err := convertShard(si); err != nil {
                fmt.Fprintf(os.Stderr, "Failed to convert %s: %s\n", si.FullPath(dataPath), err.Error())
                log.Printf("Failed to convert %v: %v\n", si.FullPath(opts.DataPath), err)
                os.Exit(1)
            }
            fmt.Printf("Conversion of %s successful (%s)\n", si.FullPath(dataPath), time.Now().Sub(start))
        }(si)
            log.Printf("Conversion of %v successful (%v)\n", si.FullPath(opts.DataPath), time.Since(start))
        })
    }
    pg.Wait()
    wg.Wait()

    // Dump stats.
    preSize := tsdb.ShardInfos(shards).Size()
    preSize := shards.Size()
    postSize := TsmBytesWritten
    totalTime := time.Since(conversionStart)

    fmt.Printf("\nSummary statistics\n========================================\n")
    fmt.Printf("Databases converted: %d\n", len(tsdb.ShardInfos(shards).Databases()))
    fmt.Printf("Databases converted: %d\n", len(shards.Databases()))
    fmt.Printf("Shards converted: %d\n", len(shards))
    fmt.Printf("TSM files created: %d\n", TsmFilesCreated)
    fmt.Printf("Points read: %d\n", PointsRead)

@@ -214,18 +232,41 @@ func main() {
    fmt.Printf("Points without fields filtered: %d\n", b1.NoFieldsFiltered+bz1.NoFieldsFiltered)
    fmt.Printf("Disk usage pre-conversion (bytes): %d\n", preSize)
    fmt.Printf("Disk usage post-conversion (bytes): %d\n", postSize)
    fmt.Printf("Reduction factor: %d%%\n", (100*preSize-postSize)/preSize)
    fmt.Printf("Reduction factor: %d%%\n", 100*(preSize-postSize)/preSize)
    fmt.Printf("Bytes per TSM point: %.2f\n", float64(postSize)/float64(PointsWritten))
    fmt.Printf("Total conversion time: %v\n", time.Now().Sub(conversionStart))
    fmt.Printf("Total conversion time: %v\n", totalTime)
    fmt.Println()
}
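The reduction-factor change is a real fix, not just a formatting tweak: the old expression multiplies only preSize by 100 before subtracting. A worked example with invented sizes (1000 bytes before conversion, 400 after):

package main

import "fmt"

func main() {
    // Hypothetical byte counts, for illustration only.
    var preSize, postSize uint64 = 1000, 400

    fmt.Println((100*preSize - postSize) / preSize)   // old: (100000-400)/1000 = 99, which is wrong
    fmt.Println(100 * (preSize - postSize) / preSize) // new: 100*600/1000 = 60, the actual 60% reduction
}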

func collectShards(dbs []os.FileInfo) tsdb.ShardInfos {
    // Get the list of shards for conversion.
    var shards tsdb.ShardInfos
    for _, db := range dbs {
        if strings.HasSuffix(db.Name(), backupExt) {
            log.Printf("Skipping %v as it looks like a backup.\n", db.Name())
            continue
        }

        d := tsdb.NewDatabase(filepath.Join(opts.DataPath, db.Name()))
        shs, err := d.Shards()
        if err != nil {
            log.Fatalf("Failed to access shards for database %v: %v\n", d.Name(), err)
        }
        shards = append(shards, shs...)
    }

    sort.Sort(shards)
    shards = shards.FilterFormat(tsdb.TSM1)
    if len(dbs) > 0 {
        shards = shards.ExclusiveDatabases(opts.DBs)
    }

    return shards
}

// backupDatabase backs up the database at src.
func backupDatabase(src string) error {
    dest := filepath.Join(src + "." + backupExt)
    if _, err := os.Stat(dest); !os.IsNotExist(err) {
        return fmt.Errorf("backup of %s already exists", src)
    }
    return copyDir(dest, src)
}

@@ -270,8 +311,8 @@ func copyDir(dest, src string) error {

// convertShard converts the shard in-place.
func convertShard(si *tsdb.ShardInfo) error {
    src := si.FullPath(dataPath)
    dst := fmt.Sprintf("%s.%s", src, tsmExt)
    src := si.FullPath(opts.DataPath)
    dst := fmt.Sprintf("%v.%v", src, tsmExt)

    var reader ShardReader
    switch si.Format {

@@ -280,66 +321,50 @@ func convertShard(si *tsdb.ShardInfo) error {
    case tsdb.B1:
        reader = b1.NewReader(src)
    default:
        return fmt.Errorf("Unsupported shard format: %s", si.FormatAsString())
        return fmt.Errorf("Unsupported shard format: %v", si.FormatAsString())
    }
    defer reader.Close()

    // Open the shard, and create a converter.
    if err := reader.Open(); err != nil {
        return fmt.Errorf("Failed to open %s for conversion: %s", src, err.Error())
        return fmt.Errorf("Failed to open %v for conversion: %v", src, err)
    }
    converter := NewConverter(dst, uint32(tsmSz))
    converter := NewConverter(dst, uint32(opts.TSMSize))

    // Perform the conversion.
    if err := converter.Process(reader); err != nil {
        return fmt.Errorf("Conversion of %s failed: %s", src, err.Error())
        return fmt.Errorf("Conversion of %v failed: %v", src, err)
    }

    // Delete source shard, and rename new tsm1 shard.
    if err := reader.Close(); err != nil {
        return fmt.Errorf("Conversion of %s failed due to close: %s", src, err.Error())
        return fmt.Errorf("Conversion of %v failed due to close: %v", src, err)
    }

    if err := os.RemoveAll(si.FullPath(dataPath)); err != nil {
        return fmt.Errorf("Deletion of %s failed: %s", src, err.Error())
    if err := os.RemoveAll(si.FullPath(opts.DataPath)); err != nil {
        return fmt.Errorf("Deletion of %v failed: %v", src, err)
    }
    if err := os.Rename(dst, src); err != nil {
        return fmt.Errorf("Rename of %s to %s failed: %s", dst, src, err.Error())
        return fmt.Errorf("Rename of %v to %v failed: %v", dst, src, err)
    }

    return nil
}

// ParallelGroup allows the maximum parallelism of a set of operations to be controlled.
type ParallelGroup struct {
    c  chan struct{}
    wg sync.WaitGroup
}
type ParallelGroup chan struct{}

// NewParallelGroup returns a group which allows n operations to run in parallel. A value of 0
// means no operations will ever run.
func NewParallelGroup(n int) *ParallelGroup {
    return &ParallelGroup{
        c: make(chan struct{}, n),
    }
func NewParallelGroup(n int) ParallelGroup {
    return make(chan struct{}, n)
}

// Request requests permission to start an operation. It will block unless and until
// the parallel requirements would not be violated.
func (p *ParallelGroup) Request() {
    p.wg.Add(1)
    p.c <- struct{}{}
}
func (p ParallelGroup) Do(f func()) {
    p <- struct{}{} // acquire working slot
    defer func() { <-p }()

// Release informs the group that a previously requested operation has completed.
func (p *ParallelGroup) Release() {
    <-p.c
    p.wg.Done()
}

// Wait blocks until the ParallelGroup has no unreleased operations.
func (p *ParallelGroup) Wait() {
    p.wg.Wait()
    f()
}
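The new ParallelGroup is a counting semaphore: a buffered channel whose capacity is the concurrency limit, which main sizes from runtime.GOMAXPROCS(0). Below is a self-contained sketch of how the diff uses it; the worker body, the limit of 2 and the task count are invented for illustration, and completion is tracked with a separate sync.WaitGroup exactly as in main above.

package main

import (
    "fmt"
    "sync"
    "time"
)

// ParallelGroup limits how many of the submitted functions run at once.
type ParallelGroup chan struct{}

// NewParallelGroup returns a group which allows n operations to run in parallel.
func NewParallelGroup(n int) ParallelGroup {
    return make(chan struct{}, n)
}

// Do runs f once a working slot is free, and frees the slot when f returns.
func (p ParallelGroup) Do(f func()) {
    p <- struct{}{} // acquire working slot
    defer func() { <-p }()
    f()
}

func main() {
    pg := NewParallelGroup(2) // at most 2 tasks in flight (hypothetical limit)
    var wg sync.WaitGroup

    wg.Add(5)
    for i := 0; i < 5; i++ {
        i := i
        go pg.Do(func() {
            defer wg.Done()
            fmt.Println("converting shard", i) // stand-in for convertShard(si)
            time.Sleep(100 * time.Millisecond)
        })
    }
    wg.Wait() // Do releases the slot; the WaitGroup tracks overall completion.
}

Dropping the old Request/Release/Wait API in favour of Do keeps slot acquisition and release in one place, and sizing the channel from GOMAXPROCS(0) is what ties the -parallel flag to the environment guard earlier in the diff.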

// yesno returns "yes" for true, "no" for false.

@@ -357,3 +382,13 @@ func allDBs(dbs []string) string {
    }
    return fmt.Sprintf("%v", dbs)
}

// isEnvSet checks to see if a variable was set in the environment
func isEnvSet(name string) bool {
    for _, s := range os.Environ() {
        if strings.SplitN(s, "=", 2)[0] == name {
            return true
        }
    }
    return false
}
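isEnvSet tests for presence rather than value, which matters because os.Getenv cannot distinguish an empty variable from an unset one. A tiny hypothetical check of that behaviour (the empty GOMAXPROCS value is invented for illustration):

package main

import (
    "fmt"
    "os"
    "strings"
)

// Same shape as the helper above: present in the environment, regardless of value.
func isEnvSet(name string) bool {
    for _, s := range os.Environ() {
        if strings.SplitN(s, "=", 2)[0] == name {
            return true
        }
    }
    return false
}

func main() {
    os.Setenv("GOMAXPROCS", "")                // hypothetical: exported but empty
    fmt.Println(os.Getenv("GOMAXPROCS") != "") // false: Getenv cannot tell empty from unset
    fmt.Println(isEnvSet("GOMAXPROCS"))        // true: so the GOMAXPROCS override is still skipped
}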