185 lines
4.7 KiB
Go
185 lines
4.7 KiB
Go
// Package deletetsm bulk deletes a measurement from a raw tsm file.
|
|
package deletetsm
|
|
|
|
import (
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
errors2 "github.com/influxdata/influxdb/pkg/errors"
|
|
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
|
|
)
|
|
|
|
// Command holds the state for a single run of "influx_inspect deletetsm".
type Command struct {
	// Output streams. NewCommand points them at the process streams; tests
	// may swap them out.
	Stderr, Stdout io.Writer

	// Settings populated from the command-line flags in Run.
	measurement string // name of the measurement whose blocks are dropped
	sanitize    bool   // also drop keys containing non-printable unicode
	verbose     bool   // keep log output enabled while processing
}
|
|
|
|
// NewCommand returns a new instance of Command.
|
|
func NewCommand() *Command {
|
|
return &Command{
|
|
Stderr: os.Stderr,
|
|
Stdout: os.Stdout,
|
|
}
|
|
}
|
|
|
|
// Run executes the command.
|
|
func (cmd *Command) Run(args ...string) (err error) {
|
|
fs := flag.NewFlagSet("deletetsm", flag.ExitOnError)
|
|
fs.StringVar(&cmd.measurement, "measurement", "", "")
|
|
fs.BoolVar(&cmd.sanitize, "sanitize", false, "")
|
|
fs.BoolVar(&cmd.verbose, "v", false, "")
|
|
fs.SetOutput(cmd.Stdout)
|
|
fs.Usage = cmd.printUsage
|
|
if err := fs.Parse(args); err != nil {
|
|
return err
|
|
} else if fs.NArg() == 0 {
|
|
fmt.Printf("path required\n\n")
|
|
fs.Usage()
|
|
return nil
|
|
}
|
|
|
|
if !cmd.verbose {
|
|
log.SetOutput(io.Discard)
|
|
}
|
|
|
|
// Validate measurement or sanitize flag.
|
|
if cmd.measurement == "" && !cmd.sanitize {
|
|
return fmt.Errorf("-measurement or -sanitize flag required")
|
|
}
|
|
|
|
// Process each TSM file.
|
|
for _, path := range fs.Args() {
|
|
log.Printf("processing: %s", path)
|
|
if err := cmd.process(path); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cmd *Command) process(path string) (retErr error) {
|
|
// Remove previous temporary files.
|
|
outputPath := path + ".rewriting.tmp"
|
|
|
|
// Open TSM reader.
|
|
input, err := os.Open(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r, err := tsm1.NewTSMReader(input)
|
|
if err != nil {
|
|
// close the input file on error creating the TSMReader
|
|
_ = input.Close()
|
|
return fmt.Errorf("unable to read %s: %w", path, err)
|
|
}
|
|
// Nested function to ensure all the deferred close operations happen before final deletion or rename
|
|
size, err := func() (size uint32, fRetErr error) {
|
|
// This will close the input file
|
|
defer errors2.Capture(&retErr, r.Close)()
|
|
|
|
if err := os.RemoveAll(outputPath); err != nil {
|
|
return 0, err
|
|
} else if err := os.RemoveAll(outputPath + ".idx.tmp"); err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Create TSMWriter to temporary location.
|
|
output, err := os.Create(outputPath)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
w, err := tsm1.NewTSMWriter(output)
|
|
if err != nil {
|
|
// close the output file on error creating the TSMWriter
|
|
_ = output.Close()
|
|
return 0, fmt.Errorf("unable to write %s: %w", outputPath, err)
|
|
}
|
|
|
|
// This will close the output file
|
|
defer errors2.Capture(&fRetErr, w.Close)()
|
|
|
|
// Iterate over the input blocks.
|
|
itr := r.BlockIterator()
|
|
blockWritten := false
|
|
for itr.Next() {
|
|
// Read key & time range.
|
|
key, minTime, maxTime, _, _, block, err := itr.Read()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Skip block if this is the measurement and time range we are deleting.
|
|
series, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
|
|
measurement, tags := models.ParseKey(series)
|
|
if string(measurement) == cmd.measurement || (cmd.sanitize && !models.ValidKeyTokens(measurement, tags)) {
|
|
log.Printf("deleting block: %s (%s-%s) sz=%d",
|
|
key,
|
|
time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
|
|
time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
|
|
len(block),
|
|
)
|
|
continue
|
|
}
|
|
|
|
if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
|
|
return 0, err
|
|
}
|
|
blockWritten = true
|
|
}
|
|
|
|
// Write index & close.
|
|
// It is okay to have no index values if no block was written
|
|
if err := w.WriteIndex(); err != nil && !(blockWritten || errors.Is(err, tsm1.ErrNoValues)) {
|
|
return 0, err
|
|
} else {
|
|
return w.Size(), nil
|
|
}
|
|
}()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if size > 0 {
|
|
// Replace original file with new file.
|
|
return os.Rename(outputPath, path)
|
|
} else {
|
|
// Empty TSM file of size == 0, remove it
|
|
if err = os.RemoveAll(path); err != nil {
|
|
err = fmt.Errorf("cannot remove %s: %w", path, err)
|
|
}
|
|
if err2 := os.RemoveAll(outputPath); err2 != nil && err == nil {
|
|
return fmt.Errorf("cannot remove temporary file %s: %w", outputPath, err)
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cmd *Command) printUsage() {
|
|
fmt.Print(`Deletes a measurement from a raw tsm file.
|
|
|
|
Usage: influx_inspect deletetsm [flags] path...
|
|
|
|
-measurement NAME
|
|
The name of the measurement to remove.
|
|
-sanitize
|
|
Remove all keys with non-printable unicode characters.
|
|
-v
|
|
Enable verbose logging.`)
|
|
}
|