diff --git a/cmd/influx_inspect/main.go b/cmd/influx_inspect/main.go index f299fee1c9..cec4543aeb 100644 --- a/cmd/influx_inspect/main.go +++ b/cmd/influx_inspect/main.go @@ -18,10 +18,16 @@ import ( func main() { - var path string + var path, tsm string flag.StringVar(&path, "p", os.Getenv("HOME")+"/.influxdb", "Root storage path. [$HOME/.influxdb]") + flag.StringVar(&tsm, "tsm", "", "Path to a tsm1 file") flag.Parse() + if tsm != "" { + dumpTsm1(tsm) + return + } + tstore := tsdb.NewStore(filepath.Join(path, "data")) tstore.Logger = log.New(ioutil.Discard, "", log.LstdFlags) tstore.EngineOptions.Config.Dir = filepath.Join(path, "data") @@ -70,34 +76,18 @@ func main() { // Sample a point from each measurement to determine the field types for _, shardID := range shardIDs { shard := tstore.Shard(shardID) - tx, err := shard.ReadOnlyTx() if err != nil { fmt.Printf("Failed to get transaction: %v", err) } - for _, key := range series { - fieldSummary := []string{} - cursor := tx.Cursor(key, m.FieldNames(), shard.FieldCodec(m.Name), true) + codec := shard.FieldCodec(m.Name) + for _, field := range codec.Fields() { + ft := fmt.Sprintf("%s:%s", field.Name, field.Type) + fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues, + len(fields), ft, len(series)) - // Series doesn't exist in this shard - if cursor == nil { - continue - } - - // Seek to the beginning - _, fields := cursor.SeekTo(0) - if fields, ok := fields.(map[string]interface{}); ok { - for field, value := range fields { - fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value)) - } - sort.Strings(fieldSummary) - - fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues, - len(fields), strings.Join(fieldSummary, ","), len(series)) - } - break } - tx.Rollback() + } } } diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go new file mode 100644 index 0000000000..b8d9b44f65 --- /dev/null +++ 
b/cmd/influx_inspect/tsm.go @@ -0,0 +1,379 @@ +package main + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "text/tabwriter" + "time" + + "github.com/golang/snappy" + "github.com/influxdb/influxdb/tsdb" + "github.com/influxdb/influxdb/tsdb/engine/tsm1" +) + +type tsmIndex struct { + series int + offset int64 + minTime time.Time + maxTime time.Time + blocks []*block +} + +type block struct { + id uint64 + offset int64 +} + +type blockStats struct { + min, max int + counts [][]int +} + +func (b *blockStats) inc(typ int, enc byte) { + for len(b.counts) <= typ { + b.counts = append(b.counts, []int{}) + } + for len(b.counts[typ]) <= int(enc) { + b.counts[typ] = append(b.counts[typ], 0) + } + b.counts[typ][enc] += 1 +} + +func (b *blockStats) size(sz int) { + if b.min == 0 || sz < b.min { + b.min = sz + } + if b.max == 0 || sz > b.max { + b.max = sz + } +} + +var ( + fieldType = []string{ + "timestamp", "float", "int", "bool", "string", + } + blockTypes = []string{ + "float64", "int64", "bool", "string", + } + timeEnc = []string{ + "none", "s8b", "rle", + } + floatEnc = []string{ + "none", "gor", + } + intEnc = []string{ + "none", "s8b", + } + boolEnc = []string{ + "none", "bp", + } + stringEnc = []string{ + "none", "snpy", + } + encDescs = [][]string{ + timeEnc, floatEnc, intEnc, boolEnc, stringEnc, + } +) + +func readFields(path string) (map[string]*tsdb.MeasurementFields, error) { + fields := make(map[string]*tsdb.MeasurementFields) + + f, err := os.OpenFile(filepath.Join(path, tsm1.FieldsFileExtension), os.O_RDONLY, 0666) + if os.IsNotExist(err) { + return fields, nil + } else if err != nil { + return nil, err + } + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + data, err := snappy.Decode(nil, b) + if err != nil { + return nil, err + } + + if err := json.Unmarshal(data, &fields); err != nil { + return nil, err + } + return fields, nil +} + +func readSeries(path string) 
(map[string]*tsdb.Series, error) { + series := make(map[string]*tsdb.Series) + + f, err := os.OpenFile(filepath.Join(path, tsm1.SeriesFileExtension), os.O_RDONLY, 0666) + if os.IsNotExist(err) { + return series, nil + } else if err != nil { + return nil, err + } + defer f.Close() + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + data, err := snappy.Decode(nil, b) + if err != nil { + return nil, err + } + + if err := json.Unmarshal(data, &series); err != nil { + return nil, err + } + + return series, nil +} + +func readIds(path string) (map[string]uint64, error) { + f, err := os.OpenFile(filepath.Join(path, tsm1.IDsFileExtension), os.O_RDONLY, 0666) + if os.IsNotExist(err) { + return nil, nil + } else if err != nil { + return nil, err + } + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + b, err = snappy.Decode(nil, b) + if err != nil { + return nil, err + } + + ids := make(map[string]uint64) + if b != nil { + if err := json.Unmarshal(b, &ids); err != nil { + return nil, err + } + } + return ids, err +} +func readIndex(f *os.File) *tsmIndex { + // Get the file size + stat, err := f.Stat() + if err != nil { + panic(err.Error()) + } + + // Seek to the series count + f.Seek(-4, os.SEEK_END) + b := make([]byte, 8) + _, err = f.Read(b[:4]) + if err != nil { + fmt.Printf("error: %v\n", err.Error()) + os.Exit(1) + } + + seriesCount := binary.BigEndian.Uint32(b) + + // Get the min time + f.Seek(-20, os.SEEK_END) + f.Read(b) + minTime := time.Unix(0, int64(btou64(b))) + + // Get max time + f.Seek(-12, os.SEEK_END) + f.Read(b) + maxTime := time.Unix(0, int64(btou64(b))) + + // Figure out where the index starts + indexStart := stat.Size() - int64(seriesCount*12+20) + + // Seek to the start of the index + f.Seek(indexStart, os.SEEK_SET) + count := int(seriesCount) + index := &tsmIndex{ + offset: indexStart, + minTime: minTime, + maxTime: maxTime, + series: count, + } + + // Read the index entries + for i := 0; i < count; i++ { + 
f.Read(b) + id := binary.BigEndian.Uint64(b) + f.Read(b[:4]) + pos := binary.BigEndian.Uint32(b[:4]) + index.blocks = append(index.blocks, &block{id: id, offset: int64(pos)}) + } + + return index +} + +func dumpTsm1(path string) { + f, err := os.Open(path) + if err != nil { + println(err.Error()) + os.Exit(1) + } + + // Get the file size + stat, err := f.Stat() + if err != nil { + println(err.Error()) + os.Exit(1) + } + + b := make([]byte, 8) + f.Read(b[:4]) + + // Verify magic number + if binary.BigEndian.Uint32(b[:4]) != 0x16D116D1 { + println("Not a tsm1 file.") + os.Exit(1) + } + + ids, err := readIds(filepath.Dir(path)) + if err != nil { + println("Failed to read series:", err.Error()) + os.Exit(1) + } + + invIds := map[uint64]string{} + for k, v := range ids { + invIds[v] = k + } + + index := readIndex(f) + blockStats := &blockStats{} + + println("Summary:") + fmt.Printf(" File: %s\n", path) + fmt.Printf(" Time Range: %s - %s\n", + index.minTime.UTC().Format(time.RFC3339Nano), + index.maxTime.UTC().Format(time.RFC3339Nano), + ) + fmt.Printf(" Duration: %s ", index.maxTime.Sub(index.minTime)) + fmt.Printf(" Series: %d ", index.series) + fmt.Printf(" File Size: %d\n", stat.Size()) + println() + + println("Index:") + tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintln(tw, " "+strings.Join([]string{"Pos", "ID", "Ofs", "Key", "Field"}, "\t")) + for i, block := range index.blocks { + key := invIds[block.id] + split := strings.Split(key, "#!~#") + + fmt.Fprintln(tw, " "+strings.Join([]string{ + strconv.FormatInt(int64(i), 10), + strconv.FormatUint(block.id, 10), + strconv.FormatInt(int64(block.offset), 10), + split[0], + split[1], + }, "\t")) + + } + tw.Flush() + println() + println("Blocks:") + + tw = tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintln(tw, " "+strings.Join([]string{"Blk", "Ofs", "Len", "ID", "Type", "Min Time", "Points", "Enc [T/V]", "Len [T/V]"}, "\t")) + + // Staring at 4 because the magic number is 4 bytes + i := 
int64(4) + var blockCount, pointCount, blockSize int64 + indexSize := stat.Size() - index.offset + + // Start at the beginning and read every block + for i < index.offset { + f.Seek(int64(i), 0) + + f.Read(b) + id := btou64(b) + f.Read(b[:4]) + length := binary.BigEndian.Uint32(b[:4]) + buf := make([]byte, length) + f.Read(buf) + + blockSize += int64(len(buf)) + 12 + + startTime := time.Unix(0, int64(btou64(buf[:8]))) + blockType := buf[8] + + encoded := buf[9:] + + v, err := tsm1.DecodeBlock(buf) + if err != nil { + fmt.Printf("error: %v\n", err.Error()) + os.Exit(1) + } + + pointCount += int64(len(v)) + + // Length of the timestamp block + tsLen, j := binary.Uvarint(encoded) + + // Unpack the timestamp bytes + ts := encoded[int(j) : int(j)+int(tsLen)] + + // Unpack the value bytes + values := encoded[int(j)+int(tsLen):] + + tsEncoding := timeEnc[int(ts[0]>>4)] + vEncoding := encDescs[int(blockType+1)][values[0]>>4] + + typeDesc := blockTypes[blockType] + + blockStats.inc(0, ts[0]>>4) + blockStats.inc(int(blockType+1), values[0]>>4) + blockStats.size(len(buf)) + + fmt.Fprintln(tw, " "+strings.Join([]string{ + strconv.FormatInt(blockCount, 10), + strconv.FormatInt(i, 10), + strconv.FormatInt(int64(len(buf)), 10), + strconv.FormatUint(id, 10), + typeDesc, + startTime.UTC().Format(time.RFC3339Nano), + strconv.FormatInt(int64(len(v)), 10), + fmt.Sprintf("%s/%s", tsEncoding, vEncoding), + fmt.Sprintf("%d/%d", len(ts), len(values)), + }, "\t")) + + i += (12 + int64(length)) + blockCount += 1 + } + + tw.Flush() + println() + + fmt.Printf("Statistics\n") + fmt.Printf(" Blocks:\n") + fmt.Printf(" Total: %d Size: %d Min: %d Max: %d Avg: %d\n", + blockCount, blockSize, blockStats.min, blockStats.max, blockSize/blockCount) + fmt.Printf(" Index:\n") + fmt.Printf(" Total: %d Size: %d\n", len(index.blocks), indexSize) + fmt.Printf(" Points:\n") + fmt.Printf(" Total: %d", pointCount) + println() + + println(" Encoding:") + for i, counts := range blockStats.counts { + if 
len(counts) == 0 { + continue + } + fmt.Printf(" %s: ", strings.Title(fieldType[i])) + for j, v := range counts { + fmt.Printf("\t%s: %d (%d%%) ", encDescs[i][j], v, int(float64(v)/float64(blockCount)*100)) + } + println() + } + fmt.Printf(" Compression:\n") + fmt.Printf(" Per block: %0.2f bytes/point\n", float64(blockSize)/float64(pointCount)) + fmt.Printf(" Total: %0.2f bytes/point\n", float64(stat.Size())/float64(pointCount)) + + println() +} diff --git a/monitor/service.go b/monitor/service.go index 3e76f6b1db..bd14d9622e 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -368,7 +368,7 @@ func (m *Monitor) storeStatistics() { points := make(models.Points, 0, len(stats)) for _, s := range stats { - points = append(points, models.NewPoint(s.Name, s.Tags, s.Values, time.Now())) + points = append(points, models.NewPoint(s.Name, s.Tags, s.Values, time.Now().Truncate(time.Second))) } err = m.PointsWriter.WritePoints(&cluster.WritePointsRequest{ diff --git a/tsdb/engine/tsm1/timestamp.go b/tsdb/engine/tsm1/timestamp.go index ad7ed64419..46e33fc14e 100644 --- a/tsdb/engine/tsm1/timestamp.go +++ b/tsdb/engine/tsm1/timestamp.go @@ -124,7 +124,7 @@ func (e *encoder) Bytes() ([]byte, error) { max, div, rle, dts := e.reduce() // The deltas are all the same, so we can run-length encode them - if rle && len(e.ts) > 60 { + if rle && len(e.ts) > 1 { return e.encodeRLE(e.ts[0], e.ts[1], div, len(e.ts)) } diff --git a/tsdb/engine/tsm1/timestamp_test.go b/tsdb/engine/tsm1/timestamp_test.go index 8f4410a041..0f729d8bc8 100644 --- a/tsdb/engine/tsm1/timestamp_test.go +++ b/tsdb/engine/tsm1/timestamp_test.go @@ -24,8 +24,8 @@ func Test_TimeEncoder(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != timeCompressedPackedSimple { - t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) + if got := b[0] >> 4; got != timeCompressedRLE { + t.Fatalf("Wrong encoding used: expected rle, got %v", got) } dec := NewTimeDecoder(b) @@ -89,8 
+89,8 @@ func Test_TimeEncoder_Two(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != timeCompressedPackedSimple { - t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) + if got := b[0] >> 4; got != timeCompressedRLE { + t.Fatalf("Wrong encoding used: expected rle, got %v", got) } dec := NewTimeDecoder(b) @@ -115,7 +115,7 @@ func Test_TimeEncoder_Three(t *testing.T) { enc := NewTimeEncoder() t1 := time.Unix(0, 0) t2 := time.Unix(0, 1) - t3 := time.Unix(0, 2) + t3 := time.Unix(0, 3) enc.Write(t1) enc.Write(t2) @@ -127,7 +127,7 @@ func Test_TimeEncoder_Three(t *testing.T) { } if got := b[0] >> 4; got != timeCompressedPackedSimple { - t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) + t.Fatalf("Wrong encoding used: expected rle, got %v", got) } dec := NewTimeDecoder(b) @@ -167,8 +167,8 @@ func Test_TimeEncoder_Large_Range(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - if got := b[0] >> 4; got != timeCompressedPackedSimple { - t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got) + if got := b[0] >> 4; got != timeCompressedRLE { + t.Fatalf("Wrong encoding used: expected rle, got %v", got) } dec := NewTimeDecoder(b) @@ -285,7 +285,7 @@ func Test_TimeEncoder_Reverse(t *testing.T) { ts := []time.Time{ time.Unix(0, 3), time.Unix(0, 2), - time.Unix(0, 1), + time.Unix(0, 0), } for _, v := range ts { @@ -390,6 +390,46 @@ func Test_TimeEncoder_Quick(t *testing.T) { }, nil) } +func Test_TimeEncoder_RLESeconds(t *testing.T) { + enc := NewTimeEncoder() + ts := make([]time.Time, 6) + + ts[0] = time.Unix(0, 1444448158000000000) + ts[1] = time.Unix(0, 1444448168000000000) + ts[2] = time.Unix(0, 1444448178000000000) + ts[3] = time.Unix(0, 1444448188000000000) + ts[4] = time.Unix(0, 1444448198000000000) + ts[5] = time.Unix(0, 1444448208000000000) + + for _, v := range ts { + enc.Write(v) + } + + b, err := enc.Bytes() + if got := b[0] >> 4; got != timeCompressedRLE { + t.Fatalf("Wrong 
encoding used: expected rle, got %v", got) + } + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + dec := NewTimeDecoder(b) + for i, v := range ts { + if !dec.Next() { + t.Fatalf("Next == false, expected true") + } + + if v != dec.Read() { + t.Fatalf("Item %d mismatch, got %v, exp %v", i, dec.Read(), v) + } + } + + if dec.Next() { + t.Fatalf("unexpected extra values") + } + +} func BenchmarkTimeEncoder(b *testing.B) { enc := NewTimeEncoder() x := make([]time.Time, 1024)