feat: port `influxd inspect dump-tsm` (#21761)
This differs from the 1.x implementation: the in-depth blocks and index printouts now happen only when explicitly requested, instead of being printed whenever `filterKey` is set.
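A typical invocation of the new subcommand, using the flags added in this change, might look like the following sketch (the TSM file path is hypothetical):

```
influxd inspect dump-tsm --file /path/to/000000001-000000001.tsm --index --blocks
```

Passing `--all` enables both `--index` and `--blocks`, and `--filter-key` limits the index and block listings to series keys containing the given substring.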
parent 8e8953f2c5
commit 7c529d929a
@@ -27,6 +27,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21621](https://github.com/influxdata/influxdb/pull/21621): Add `storage-wal-max-write-delay` config option to `influxd` to prevent deadlocks when the WAL is overloaded with concurrent writes.
1. [21615](https://github.com/influxdata/influxdb/pull/21615): Ported the `influxd inspect verify-tsm` command from 1.x.
1. [21646](https://github.com/influxdata/influxdb/pull/21646): Ported the `influxd inspect verify-tombstone` command from 1.x.
1. [21761](https://github.com/influxdata/influxdb/pull/21761): Ported the `influxd inspect dump-tsm` command from 1.x.

### Bug Fixes
@@ -0,0 +1,342 @@
package dump_tsm

import (
    "encoding/binary"
    "fmt"
    "os"
    "strconv"
    "strings"
    "text/tabwriter"
    "time"

    "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
    "github.com/spf13/cobra"
)

type args struct {
    dumpIndex  bool
    dumpBlocks bool
    dumpAll    bool
    filterKey  string
    path       string
}

func NewDumpTSMCommand() *cobra.Command {
    var arguments args
    cmd := &cobra.Command{
        Use:   "dump-tsm",
        Short: "Dumps low-level details about tsm1 files",
        Args:  cobra.NoArgs,
        RunE: func(cmd *cobra.Command, args []string) error {
            if arguments.path == "" {
                cmd.PrintErrf("TSM File not specified\n")
                return nil
            }
            arguments.dumpBlocks = arguments.dumpBlocks || arguments.dumpAll
            arguments.dumpIndex = arguments.dumpIndex || arguments.dumpAll
            return dumpTSM(cmd, arguments)
        },
    }

    cmd.Flags().StringVar(&arguments.path, "file", "",
        "Path to TSM file")
    cmd.Flags().BoolVar(&arguments.dumpIndex, "index", false,
        "Dump raw index data")
    cmd.Flags().BoolVar(&arguments.dumpBlocks, "blocks", false,
        "Dump raw block data")
    cmd.Flags().BoolVar(&arguments.dumpAll, "all", false,
        "Dump all data. Caution: This may print a lot of information")
    cmd.Flags().StringVar(&arguments.filterKey, "filter-key", "",
        "Only display data matching this key substring")

    return cmd
}

func dumpTSM(cmd *cobra.Command, args args) error {
    f, err := os.Open(args.path)
    if err != nil {
        return err
    }
    defer f.Close()

    // Get the file size
    stat, err := f.Stat()
    if err != nil {
        return err
    }

    r, err := tsm1.NewTSMReader(f)
    if err != nil {
        return fmt.Errorf("error opening TSM file: %w", err)
    }
    defer r.Close()

    minTime, maxTime := r.TimeRange()
    keyCount := r.KeyCount()

    blockStats := &blockStats{}

    cmd.Println("Summary:")
    cmd.Printf(" File: %s\n", args.path)
    cmd.Printf(" Time Range: %s - %s\n",
        time.Unix(0, minTime).UTC().Format(time.RFC3339Nano),
        time.Unix(0, maxTime).UTC().Format(time.RFC3339Nano),
    )
    cmd.Printf(" Duration: %s ", time.Unix(0, maxTime).Sub(time.Unix(0, minTime)))
    cmd.Printf(" Series: %d ", keyCount)
    cmd.Printf(" File Size: %d\n\n", stat.Size())

    tw := tabwriter.NewWriter(cmd.OutOrStdout(), 8, 8, 1, '\t', 0)

    if args.dumpIndex {
        dumpIndex(cmd, args, dumpIndexParams{
            tw:       tw,
            minTime:  minTime,
            maxTime:  maxTime,
            keyCount: keyCount,
            r:        r,
        })
    }

    if args.dumpBlocks {
        tw = tabwriter.NewWriter(cmd.OutOrStdout(), 8, 8, 1, '\t', 0)
        fmt.Fprintln(tw, " "+strings.Join([]string{"Blk", "Chk", "Ofs", "Len", "Type", "Min Time", "Points", "Enc [T/V]", "Len [T/V]"}, "\t"))
    }

    indexSize := r.IndexSize()
    blockCount, pointCount, blockSize, err := dumpBlocks(cmd, dumpBlocksParams{
        tw:         tw,
        keyCount:   keyCount,
        filterKey:  args.filterKey,
        dumpBlocks: args.dumpBlocks,
        blockStats: blockStats,
        f:          f,
        r:          r,
    })
    if err != nil {
        return fmt.Errorf("failed to decode block in tsm1 file: %w", err)
    }

    // Flush the printer to display all block details
    if args.dumpBlocks {
        cmd.Println("Blocks:")
        tw.Flush()
        cmd.Println()
    }

    // Always print summary statistics about the blocks
    var blockSizeAvg int64
    if blockCount > 0 {
        blockSizeAvg = blockSize / blockCount
    }
    cmd.Printf("Statistics\n")
    cmd.Printf(" Blocks:\n")
    cmd.Printf(" Total: %d Size: %d Min: %d Max: %d Avg: %d\n",
        blockCount, blockSize, blockStats.min, blockStats.max, blockSizeAvg)
    cmd.Printf(" Index:\n")
    cmd.Printf(" Total: %d Size: %d\n", blockCount, indexSize)
    cmd.Printf(" Points:\n")
    cmd.Printf(" Total: %d\n", pointCount)

    cmd.Println(" Encoding:")
    for i, counts := range blockStats.counts {
        if len(counts) == 0 {
            continue
        }
        cmd.Printf(" %s: ", strings.Title(fieldType[i]))
        for j, v := range counts {
            cmd.Printf("\t%s: %d (%d%%) ", encDescs[i][j], v, int(float64(v)/float64(blockCount)*100))
        }
        cmd.Println()
    }
    cmd.Printf(" Compression:\n")
    cmd.Printf(" Per block: %0.2f bytes/point\n", float64(blockSize)/float64(pointCount))
    cmd.Printf(" Total: %0.2f bytes/point\n", float64(stat.Size())/float64(pointCount))

    return nil
}

type dumpIndexParams struct {
    tw       *tabwriter.Writer
    minTime  int64
    maxTime  int64
    keyCount int
    r        *tsm1.TSMReader
}

func dumpIndex(cmd *cobra.Command, args args, info dumpIndexParams) {
    cmd.Println("Index:")
    info.tw.Flush()
    cmd.Println()

    fmt.Fprintln(info.tw, " "+strings.Join([]string{"Pos", "Min Time", "Max Time", "Ofs", "Size", "Key", "Field"}, "\t"))
    var pos int
    for i := 0; i < info.keyCount; i++ {
        key, _ := info.r.KeyAt(i)
        for _, e := range info.r.Entries(key) {
            pos++
            split := strings.Split(string(key), "#!~#")

            measurement := split[0]
            field := split[1]

            if args.filterKey != "" && !strings.Contains(string(key), args.filterKey) {
                continue
            }
            fmt.Fprintln(info.tw, " "+strings.Join([]string{
                strconv.FormatInt(int64(pos), 10),
                time.Unix(0, e.MinTime).UTC().Format(time.RFC3339Nano),
                time.Unix(0, e.MaxTime).UTC().Format(time.RFC3339Nano),
                strconv.FormatInt(e.Offset, 10),
                strconv.FormatInt(int64(e.Size), 10),
                measurement,
                field,
            }, "\t"))
            info.tw.Flush()
        }
    }
}

type dumpBlocksParams struct {
    tw         *tabwriter.Writer
    keyCount   int
    filterKey  string
    dumpBlocks bool
    blockStats *blockStats
    f          *os.File
    r          *tsm1.TSMReader
}

func dumpBlocks(cmd *cobra.Command, params dumpBlocksParams) (int64, int64, int64, error) {
    // Starting at 5 because the magic number is 4 bytes + 1 byte version
    i := int64(5)
    b := make([]byte, 8)
    var blockCount, pointCount, blockSize int64

    // Start at the beginning and read every block
    for j := 0; j < params.keyCount; j++ {
        key, _ := params.r.KeyAt(j)
        for _, e := range params.r.Entries(key) {

            params.f.Seek(e.Offset, 0)
            params.f.Read(b[:4])

            chksum := binary.BigEndian.Uint32(b[:4])

            buf := make([]byte, e.Size-4)
            params.f.Read(buf)

            blockSize += int64(e.Size)

            if params.filterKey != "" && !strings.Contains(string(key), params.filterKey) {
                i += blockSize
                blockCount++
                continue
            }

            blockType := buf[0]

            encoded := buf[1:]

            var v []tsm1.Value
            v, err := tsm1.DecodeBlock(buf, v)
            if err != nil {
                return 0, 0, 0, err
            }
            startTime := time.Unix(0, v[0].UnixNano())

            pointCount += int64(len(v))

            // Length of the timestamp block
            tsLen, j := binary.Uvarint(encoded)

            // Unpack the timestamp bytes
            ts := encoded[j : j+int(tsLen)]

            // Unpack the value bytes
            values := encoded[j+int(tsLen):]

            tsEncoding := timeEnc[int(ts[0]>>4)]
            vEncoding := encDescs[int(blockType+1)][values[0]>>4]

            typeDesc := blockTypes[blockType]

            params.blockStats.inc(0, ts[0]>>4)
            params.blockStats.inc(int(blockType+1), values[0]>>4)
            params.blockStats.size(len(buf))

            // Add a row of block details to the printer. Doesn't actually print yet.
            if params.dumpBlocks {
                fmt.Fprintln(params.tw, " "+strings.Join([]string{
                    strconv.FormatInt(blockCount, 10),
                    strconv.FormatUint(uint64(chksum), 10),
                    strconv.FormatInt(i, 10),
                    strconv.FormatInt(int64(len(buf)), 10),
                    typeDesc,
                    startTime.UTC().Format(time.RFC3339Nano),
                    strconv.FormatInt(int64(len(v)), 10),
                    fmt.Sprintf("%s/%s", tsEncoding, vEncoding),
                    fmt.Sprintf("%d/%d", len(ts), len(values)),
                }, "\t"))
            }

            i += blockSize
            blockCount++
        }
    }
    return blockCount, pointCount, blockSize, nil
}

var (
    fieldType = []string{
        "timestamp", "float", "int", "bool", "string", "unsigned",
    }
    blockTypes = []string{
        "float64", "int64", "bool", "string", "unsigned",
    }
    timeEnc = []string{
        "none", "s8b", "rle",
    }
    floatEnc = []string{
        "none", "gor",
    }
    intEnc = []string{
        "none", "s8b", "rle",
    }
    boolEnc = []string{
        "none", "bp",
    }
    stringEnc = []string{
        "none", "snpy",
    }
    unsignedEnc = []string{
        "none", "s8b", "rle",
    }
    encDescs = [][]string{
        timeEnc, floatEnc, intEnc, boolEnc, stringEnc, unsignedEnc,
    }
)

type blockStats struct {
    min, max int
    counts   [][]int
}

func (b *blockStats) inc(typ int, enc byte) {
    for len(b.counts) <= typ {
        b.counts = append(b.counts, []int{})
    }
    for len(b.counts[typ]) <= int(enc) {
        b.counts[typ] = append(b.counts[typ], 0)
    }
    b.counts[typ][enc]++
}

func (b *blockStats) size(sz int) {
    if b.min == 0 || sz < b.min {
        b.min = sz
    }
    if b.max == 0 || sz > b.max {
        b.max = sz
    }
}
@@ -1,6 +1,7 @@
package inspect

import (
    "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/dump_tsm"
    "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_index"
    "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/export_lp"
    "github.com/influxdata/influxdb/v2/cmd/influxd/inspect/verify_seriesfile"

@@ -30,6 +31,7 @@ func NewCommand(v *viper.Viper) (*cobra.Command, error) {
    base.AddCommand(verify_tsm.NewTSMVerifyCommand())
    base.AddCommand(verify_seriesfile.NewVerifySeriesfileCommand())
    base.AddCommand(verify_tombstone.NewVerifyTombstoneCommand())
    base.AddCommand(dump_tsm.NewDumpTSMCommand())

    return base, nil
}