// Source: influxdb/cmd/influx_inspect/deletetsm/deletetsm.go (185 lines, 4.7 KiB, Go).

// Package deletetsm bulk-deletes a measurement, or (with -sanitize) every series key with invalid tokens, from one or more raw TSM files.
package deletetsm
import (
"errors"
"flag"
"fmt"
"io"
"log"
"os"
"time"
"github.com/influxdata/influxdb/models"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
// Command holds the configuration and I/O streams for the
// "influx_inspect deletetsm" subcommand.
type Command struct {
	// Stderr and Stdout are the command's output streams; tests swap them
	// out to capture what the command prints.
	Stderr, Stdout io.Writer

	measurement string // name of the measurement whose blocks are dropped
	sanitize    bool   // also drop keys that fail models.ValidKeyTokens
	verbose     bool   // emit per-file and per-block log lines
}
// NewCommand creates a Command that writes to the process's standard
// output and standard error streams. All flag-backed settings start at
// their zero values until Run parses arguments.
func NewCommand() *Command {
	cmd := new(Command)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	return cmd
}
// Run parses args and rewrites each TSM file named on the command line,
// dropping blocks whose measurement equals -measurement and, when -sanitize
// is set, blocks whose series key fails models.ValidKeyTokens.
//
// Files are processed in argument order and the first failure is returned.
// When no path is given, a hint and the usage text are printed to
// cmd.Stdout and nil is returned.
func (cmd *Command) Run(args ...string) error {
	fs := flag.NewFlagSet("deletetsm", flag.ExitOnError)
	fs.StringVar(&cmd.measurement, "measurement", "", "")
	fs.BoolVar(&cmd.sanitize, "sanitize", false, "")
	fs.BoolVar(&cmd.verbose, "v", false, "")
	fs.SetOutput(cmd.Stdout)
	fs.Usage = cmd.printUsage

	if err := fs.Parse(args); err != nil {
		return err
	}
	if fs.NArg() == 0 {
		// Use cmd.Stdout (not os.Stdout) so overridden output is honored.
		fmt.Fprint(cmd.Stdout, "path required\n\n")
		fs.Usage()
		return nil
	}

	// Verbose logs go to the command's Stderr so callers that override it
	// capture them. Note this reconfigures the process-wide standard logger.
	if cmd.verbose {
		log.SetOutput(cmd.Stderr)
	} else {
		log.SetOutput(io.Discard)
	}

	// At least one deletion criterion is required.
	if cmd.measurement == "" && !cmd.sanitize {
		return errors.New("-measurement or -sanitize flag required")
	}

	for _, path := range fs.Args() {
		log.Printf("processing: %s", path)
		if err := cmd.process(path); err != nil {
			return err
		}
	}
	return nil
}
// process rewrites the TSM file at path without the blocks selected for
// deletion: blocks whose measurement equals cmd.measurement, or, when
// cmd.sanitize is set, blocks whose series key fails models.ValidKeyTokens.
//
// The surviving blocks are written to path+".rewriting.tmp". If that file is
// non-empty it replaces path via os.Rename. If every block was deleted, both
// the original and the temporary file are removed instead.
func (cmd *Command) process(path string) error {
	outputPath := path + ".rewriting.tmp"

	input, err := os.Open(path)
	if err != nil {
		return err
	}
	r, err := tsm1.NewTSMReader(input)
	if err != nil {
		// No reader exists to close the input file, so close it here.
		_ = input.Close()
		return fmt.Errorf("unable to read %s: %w", path, err)
	}

	// The rewrite runs in a closure so that its deferred Close calls (reader
	// and writer) complete before the output is renamed or removed.
	size, err := func() (size uint32, retErr error) {
		// r.Close also closes the input file. Its error must be captured into
		// this closure's result: a capture into process's own result would be
		// overwritten by process's later return statements and silently lost.
		defer errors2.Capture(&retErr, r.Close)()

		// Clear leftovers from an earlier, interrupted rewrite.
		if err := os.RemoveAll(outputPath); err != nil {
			return 0, err
		}
		if err := os.RemoveAll(outputPath + ".idx.tmp"); err != nil {
			return 0, err
		}

		output, err := os.Create(outputPath)
		if err != nil {
			return 0, err
		}
		w, err := tsm1.NewTSMWriter(output)
		if err != nil {
			// No writer exists to close the output file, so close it here.
			_ = output.Close()
			return 0, fmt.Errorf("unable to write %s: %w", outputPath, err)
		}
		// w.Close also closes the output file.
		defer errors2.Capture(&retErr, w.Close)()

		itr := r.BlockIterator()
		blockWritten := false
		for itr.Next() {
			key, minTime, maxTime, _, _, block, err := itr.Read()
			if err != nil {
				return 0, err
			}

			// Skip (i.e. delete) blocks that match the deletion criteria.
			series, _ := tsm1.SeriesAndFieldFromCompositeKey(key)
			measurement, tags := models.ParseKey(series)
			if string(measurement) == cmd.measurement || (cmd.sanitize && !models.ValidKeyTokens(measurement, tags)) {
				log.Printf("deleting block: %s (%s-%s) sz=%d",
					key,
					time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
					time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
					len(block),
				)
				continue
			}

			if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
				return 0, err
			}
			blockWritten = true
		}

		// tsm1.ErrNoValues is expected, and ignored, only when no block was
		// written. Any other failure, or any failure once blocks were written,
		// must abort. Otherwise a file with a broken index would be renamed
		// over the original.
		if err := w.WriteIndex(); err != nil && (blockWritten || !errors.Is(err, tsm1.ErrNoValues)) {
			return 0, err
		}
		return w.Size(), nil
	}()
	if err != nil {
		return err
	}

	if size > 0 {
		// Replace the original file with the rewritten one.
		return os.Rename(outputPath, path)
	}

	// Every block was deleted: remove the original and the empty rewrite.
	// Both removals are attempted; the error for the original takes priority.
	if err = os.RemoveAll(path); err != nil {
		err = fmt.Errorf("cannot remove %s: %w", path, err)
	}
	if tmpErr := os.RemoveAll(outputPath); tmpErr != nil && err == nil {
		return fmt.Errorf("cannot remove temporary file %s: %w", outputPath, tmpErr)
	}
	return err
}
// printUsage writes the command's help text to cmd.Stdout. This is the same
// writer Run configures for the flag set, so usage and flag errors appear
// together and can be captured by overriding Stdout.
func (cmd *Command) printUsage() {
	fmt.Fprint(cmd.Stdout, `Deletes a measurement from a raw tsm file.

Usage: influx_inspect deletetsm [flags] path...

    -measurement NAME
            The name of the measurement to remove.
    -sanitize
            Remove all keys with non-printable unicode characters.
    -v
            Enable verbose logging.
`)
}