commit
16b3084ca9
|
@ -18,10 +18,16 @@ import (
|
|||
|
||||
func main() {
|
||||
|
||||
var path string
|
||||
var path, tsm string
|
||||
flag.StringVar(&path, "p", os.Getenv("HOME")+"/.influxdb", "Root storage path. [$HOME/.influxdb]")
|
||||
flag.StringVar(&tsm, "tsm", "", "Path to a tsm1 files")
|
||||
flag.Parse()
|
||||
|
||||
if tsm != "" {
|
||||
dumpTsm1(tsm)
|
||||
return
|
||||
}
|
||||
|
||||
tstore := tsdb.NewStore(filepath.Join(path, "data"))
|
||||
tstore.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
|
||||
tstore.EngineOptions.Config.Dir = filepath.Join(path, "data")
|
||||
|
@ -70,34 +76,18 @@ func main() {
|
|||
// Sample a point from each measurement to determine the field types
|
||||
for _, shardID := range shardIDs {
|
||||
shard := tstore.Shard(shardID)
|
||||
tx, err := shard.ReadOnlyTx()
|
||||
if err != nil {
|
||||
fmt.Printf("Failed to get transaction: %v", err)
|
||||
}
|
||||
|
||||
for _, key := range series {
|
||||
fieldSummary := []string{}
|
||||
cursor := tx.Cursor(key, m.FieldNames(), shard.FieldCodec(m.Name), true)
|
||||
codec := shard.FieldCodec(m.Name)
|
||||
for _, field := range codec.Fields() {
|
||||
ft := fmt.Sprintf("%s:%s", field.Name, field.Type)
|
||||
fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
|
||||
len(fields), ft, len(series))
|
||||
|
||||
// Series doesn't exist in this shard
|
||||
if cursor == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Seek to the beginning
|
||||
_, fields := cursor.SeekTo(0)
|
||||
if fields, ok := fields.(map[string]interface{}); ok {
|
||||
for field, value := range fields {
|
||||
fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value))
|
||||
}
|
||||
sort.Strings(fieldSummary)
|
||||
|
||||
fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
|
||||
len(fields), strings.Join(fieldSummary, ","), len(series))
|
||||
}
|
||||
break
|
||||
}
|
||||
tx.Rollback()
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,379 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
"github.com/influxdb/influxdb/tsdb/engine/tsm1"
|
||||
)
|
||||
|
||||
type tsmIndex struct {
|
||||
series int
|
||||
offset int64
|
||||
minTime time.Time
|
||||
maxTime time.Time
|
||||
blocks []*block
|
||||
}
|
||||
|
||||
type block struct {
|
||||
id uint64
|
||||
offset int64
|
||||
}
|
||||
|
||||
type blockStats struct {
|
||||
min, max int
|
||||
counts [][]int
|
||||
}
|
||||
|
||||
func (b *blockStats) inc(typ int, enc byte) {
|
||||
for len(b.counts) <= typ {
|
||||
b.counts = append(b.counts, []int{})
|
||||
}
|
||||
for len(b.counts[typ]) <= int(enc) {
|
||||
b.counts[typ] = append(b.counts[typ], 0)
|
||||
}
|
||||
b.counts[typ][enc] += 1
|
||||
}
|
||||
|
||||
func (b *blockStats) size(sz int) {
|
||||
if b.min == 0 || sz < b.min {
|
||||
b.min = sz
|
||||
}
|
||||
if b.min == 0 || sz > b.max {
|
||||
b.max = sz
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
fieldType = []string{
|
||||
"timestamp", "float", "int", "bool", "string",
|
||||
}
|
||||
blockTypes = []string{
|
||||
"float64", "int64", "bool", "string",
|
||||
}
|
||||
timeEnc = []string{
|
||||
"none", "s8b", "rle",
|
||||
}
|
||||
floatEnc = []string{
|
||||
"none", "gor",
|
||||
}
|
||||
intEnc = []string{
|
||||
"none", "s8b",
|
||||
}
|
||||
boolEnc = []string{
|
||||
"none", "bp",
|
||||
}
|
||||
stringEnc = []string{
|
||||
"none", "snpy",
|
||||
}
|
||||
encDescs = [][]string{
|
||||
timeEnc, floatEnc, intEnc, boolEnc, stringEnc,
|
||||
}
|
||||
)
|
||||
|
||||
func readFields(path string) (map[string]*tsdb.MeasurementFields, error) {
|
||||
fields := make(map[string]*tsdb.MeasurementFields)
|
||||
|
||||
f, err := os.OpenFile(filepath.Join(path, tsm1.FieldsFileExtension), os.O_RDONLY, 0666)
|
||||
if os.IsNotExist(err) {
|
||||
return fields, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b, err := ioutil.ReadAll(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := snappy.Decode(nil, b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &fields); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return fields, nil
|
||||
}
|
||||
|
||||
func readSeries(path string) (map[string]*tsdb.Series, error) {
|
||||
series := make(map[string]*tsdb.Series)
|
||||
|
||||
f, err := os.OpenFile(filepath.Join(path, tsm1.SeriesFileExtension), os.O_RDONLY, 0666)
|
||||
if os.IsNotExist(err) {
|
||||
return series, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.Close()
|
||||
b, err := ioutil.ReadAll(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := snappy.Decode(nil, b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &series); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return series, nil
|
||||
}
|
||||
|
||||
func readIds(path string) (map[string]uint64, error) {
|
||||
f, err := os.OpenFile(filepath.Join(path, tsm1.IDsFileExtension), os.O_RDONLY, 0666)
|
||||
if os.IsNotExist(err) {
|
||||
return nil, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b, err := ioutil.ReadAll(f)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
b, err = snappy.Decode(nil, b)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ids := make(map[string]uint64)
|
||||
if b != nil {
|
||||
if err := json.Unmarshal(b, &ids); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return ids, err
|
||||
}
|
||||
func readIndex(f *os.File) *tsmIndex {
|
||||
// Get the file size
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
// Seek to the series count
|
||||
f.Seek(-4, os.SEEK_END)
|
||||
b := make([]byte, 8)
|
||||
_, err = f.Read(b[:4])
|
||||
if err != nil {
|
||||
fmt.Printf("error: %v\n", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
seriesCount := binary.BigEndian.Uint32(b)
|
||||
|
||||
// Get the min time
|
||||
f.Seek(-20, os.SEEK_END)
|
||||
f.Read(b)
|
||||
minTime := time.Unix(0, int64(btou64(b)))
|
||||
|
||||
// Get max time
|
||||
f.Seek(-12, os.SEEK_END)
|
||||
f.Read(b)
|
||||
maxTime := time.Unix(0, int64(btou64(b)))
|
||||
|
||||
// Figure out where the index starts
|
||||
indexStart := stat.Size() - int64(seriesCount*12+20)
|
||||
|
||||
// Seek to the start of the index
|
||||
f.Seek(indexStart, os.SEEK_SET)
|
||||
count := int(seriesCount)
|
||||
index := &tsmIndex{
|
||||
offset: indexStart,
|
||||
minTime: minTime,
|
||||
maxTime: maxTime,
|
||||
series: count,
|
||||
}
|
||||
|
||||
// Read the index entries
|
||||
for i := 0; i < count; i++ {
|
||||
f.Read(b)
|
||||
id := binary.BigEndian.Uint64(b)
|
||||
f.Read(b[:4])
|
||||
pos := binary.BigEndian.Uint32(b[:4])
|
||||
index.blocks = append(index.blocks, &block{id: id, offset: int64(pos)})
|
||||
}
|
||||
|
||||
return index
|
||||
}
|
||||
|
||||
func dumpTsm1(path string) {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Get the file size
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
println(err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
b := make([]byte, 8)
|
||||
f.Read(b[:4])
|
||||
|
||||
// Verify magic number
|
||||
if binary.BigEndian.Uint32(b[:4]) != 0x16D116D1 {
|
||||
println("Not a tsm1 file.")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
ids, err := readIds(filepath.Dir(path))
|
||||
if err != nil {
|
||||
println("Failed to read series:", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
invIds := map[uint64]string{}
|
||||
for k, v := range ids {
|
||||
invIds[v] = k
|
||||
}
|
||||
|
||||
index := readIndex(f)
|
||||
blockStats := &blockStats{}
|
||||
|
||||
println("Summary:")
|
||||
fmt.Printf(" File: %s\n", path)
|
||||
fmt.Printf(" Time Range: %s - %s\n",
|
||||
index.minTime.UTC().Format(time.RFC3339Nano),
|
||||
index.maxTime.UTC().Format(time.RFC3339Nano),
|
||||
)
|
||||
fmt.Printf(" Duration: %s ", index.maxTime.Sub(index.minTime))
|
||||
fmt.Printf(" Series: %d ", index.series)
|
||||
fmt.Printf(" File Size: %d\n", stat.Size())
|
||||
println()
|
||||
|
||||
println("Index:")
|
||||
tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0)
|
||||
fmt.Fprintln(tw, " "+strings.Join([]string{"Pos", "ID", "Ofs", "Key", "Field"}, "\t"))
|
||||
for i, block := range index.blocks {
|
||||
key := invIds[block.id]
|
||||
split := strings.Split(key, "#!~#")
|
||||
|
||||
fmt.Fprintln(tw, " "+strings.Join([]string{
|
||||
strconv.FormatInt(int64(i), 10),
|
||||
strconv.FormatUint(block.id, 10),
|
||||
strconv.FormatInt(int64(block.offset), 10),
|
||||
split[0],
|
||||
split[1],
|
||||
}, "\t"))
|
||||
|
||||
}
|
||||
tw.Flush()
|
||||
println()
|
||||
println("Blocks:")
|
||||
|
||||
tw = tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0)
|
||||
fmt.Fprintln(tw, " "+strings.Join([]string{"Blk", "Ofs", "Len", "ID", "Type", "Min Time", "Points", "Enc [T/V]", "Len [T/V]"}, "\t"))
|
||||
|
||||
// Staring at 4 because the magic number is 4 bytes
|
||||
i := int64(4)
|
||||
var blockCount, pointCount, blockSize int64
|
||||
indexSize := stat.Size() - index.offset
|
||||
|
||||
// Start at the beginning and read every block
|
||||
for i < index.offset {
|
||||
f.Seek(int64(i), 0)
|
||||
|
||||
f.Read(b)
|
||||
id := btou64(b)
|
||||
f.Read(b[:4])
|
||||
length := binary.BigEndian.Uint32(b[:4])
|
||||
buf := make([]byte, length)
|
||||
f.Read(buf)
|
||||
|
||||
blockSize += int64(len(buf)) + 12
|
||||
|
||||
startTime := time.Unix(0, int64(btou64(buf[:8])))
|
||||
blockType := buf[8]
|
||||
|
||||
encoded := buf[9:]
|
||||
|
||||
v, err := tsm1.DecodeBlock(buf)
|
||||
if err != nil {
|
||||
fmt.Printf("error: %v\n", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
pointCount += int64(len(v))
|
||||
|
||||
// Length of the timestamp block
|
||||
tsLen, j := binary.Uvarint(encoded)
|
||||
|
||||
// Unpack the timestamp bytes
|
||||
ts := encoded[int(j) : int(j)+int(tsLen)]
|
||||
|
||||
// Unpack the value bytes
|
||||
values := encoded[int(j)+int(tsLen):]
|
||||
|
||||
tsEncoding := timeEnc[int(ts[0]>>4)]
|
||||
vEncoding := encDescs[int(blockType+1)][values[0]>>4]
|
||||
|
||||
typeDesc := blockTypes[blockType]
|
||||
|
||||
blockStats.inc(0, ts[0]>>4)
|
||||
blockStats.inc(int(blockType+1), values[0]>>4)
|
||||
blockStats.size(len(buf))
|
||||
|
||||
fmt.Fprintln(tw, " "+strings.Join([]string{
|
||||
strconv.FormatInt(blockCount, 10),
|
||||
strconv.FormatInt(i, 10),
|
||||
strconv.FormatInt(int64(len(buf)), 10),
|
||||
strconv.FormatUint(id, 10),
|
||||
typeDesc,
|
||||
startTime.UTC().Format(time.RFC3339Nano),
|
||||
strconv.FormatInt(int64(len(v)), 10),
|
||||
fmt.Sprintf("%s/%s", tsEncoding, vEncoding),
|
||||
fmt.Sprintf("%d/%d", len(ts), len(values)),
|
||||
}, "\t"))
|
||||
|
||||
i += (12 + int64(length))
|
||||
blockCount += 1
|
||||
}
|
||||
|
||||
tw.Flush()
|
||||
println()
|
||||
|
||||
fmt.Printf("Statistics\n")
|
||||
fmt.Printf(" Blocks:\n")
|
||||
fmt.Printf(" Total: %d Size: %d Min: %d Max: %d Avg: %d\n",
|
||||
blockCount, blockSize, blockStats.min, blockStats.max, blockSize/blockCount)
|
||||
fmt.Printf(" Index:\n")
|
||||
fmt.Printf(" Total: %d Size: %d\n", len(index.blocks), indexSize)
|
||||
fmt.Printf(" Points:\n")
|
||||
fmt.Printf(" Total: %d", pointCount)
|
||||
println()
|
||||
|
||||
println(" Encoding:")
|
||||
for i, counts := range blockStats.counts {
|
||||
if len(counts) == 0 {
|
||||
continue
|
||||
}
|
||||
fmt.Printf(" %s: ", strings.Title(fieldType[i]))
|
||||
for j, v := range counts {
|
||||
fmt.Printf("\t%s: %d (%d%%) ", encDescs[i][j], v, int(float64(v)/float64(blockCount)*100))
|
||||
}
|
||||
println()
|
||||
}
|
||||
fmt.Printf(" Compression:\n")
|
||||
fmt.Printf(" Per block: %0.2f bytes/point\n", float64(blockSize)/float64(pointCount))
|
||||
fmt.Printf(" Total: %0.2f bytes/point\n", float64(stat.Size())/float64(pointCount))
|
||||
|
||||
println()
|
||||
}
|
|
@ -368,7 +368,7 @@ func (m *Monitor) storeStatistics() {
|
|||
|
||||
points := make(models.Points, 0, len(stats))
|
||||
for _, s := range stats {
|
||||
points = append(points, models.NewPoint(s.Name, s.Tags, s.Values, time.Now()))
|
||||
points = append(points, models.NewPoint(s.Name, s.Tags, s.Values, time.Now().Truncate(time.Second)))
|
||||
}
|
||||
|
||||
err = m.PointsWriter.WritePoints(&cluster.WritePointsRequest{
|
||||
|
|
|
@ -124,7 +124,7 @@ func (e *encoder) Bytes() ([]byte, error) {
|
|||
max, div, rle, dts := e.reduce()
|
||||
|
||||
// The deltas are all the same, so we can run-length encode them
|
||||
if rle && len(e.ts) > 60 {
|
||||
if rle && len(e.ts) > 1 {
|
||||
return e.encodeRLE(e.ts[0], e.ts[1], div, len(e.ts))
|
||||
}
|
||||
|
||||
|
|
|
@ -24,8 +24,8 @@ func Test_TimeEncoder(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != timeCompressedPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
if got := b[0] >> 4; got != timeCompressedRLE {
|
||||
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
|
||||
}
|
||||
|
||||
dec := NewTimeDecoder(b)
|
||||
|
@ -89,8 +89,8 @@ func Test_TimeEncoder_Two(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != timeCompressedPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
if got := b[0] >> 4; got != timeCompressedRLE {
|
||||
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
|
||||
}
|
||||
|
||||
dec := NewTimeDecoder(b)
|
||||
|
@ -115,7 +115,7 @@ func Test_TimeEncoder_Three(t *testing.T) {
|
|||
enc := NewTimeEncoder()
|
||||
t1 := time.Unix(0, 0)
|
||||
t2 := time.Unix(0, 1)
|
||||
t3 := time.Unix(0, 2)
|
||||
t3 := time.Unix(0, 3)
|
||||
|
||||
enc.Write(t1)
|
||||
enc.Write(t2)
|
||||
|
@ -127,7 +127,7 @@ func Test_TimeEncoder_Three(t *testing.T) {
|
|||
}
|
||||
|
||||
if got := b[0] >> 4; got != timeCompressedPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
|
||||
}
|
||||
|
||||
dec := NewTimeDecoder(b)
|
||||
|
@ -167,8 +167,8 @@ func Test_TimeEncoder_Large_Range(t *testing.T) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if got := b[0] >> 4; got != timeCompressedPackedSimple {
|
||||
t.Fatalf("Wrong encoding used: expected uncompressed, got %v", got)
|
||||
if got := b[0] >> 4; got != timeCompressedRLE {
|
||||
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
|
||||
}
|
||||
|
||||
dec := NewTimeDecoder(b)
|
||||
|
@ -285,7 +285,7 @@ func Test_TimeEncoder_Reverse(t *testing.T) {
|
|||
ts := []time.Time{
|
||||
time.Unix(0, 3),
|
||||
time.Unix(0, 2),
|
||||
time.Unix(0, 1),
|
||||
time.Unix(0, 0),
|
||||
}
|
||||
|
||||
for _, v := range ts {
|
||||
|
@ -390,6 +390,46 @@ func Test_TimeEncoder_Quick(t *testing.T) {
|
|||
}, nil)
|
||||
}
|
||||
|
||||
func Test_TimeEncoder_RLESeconds(t *testing.T) {
|
||||
enc := NewTimeEncoder()
|
||||
ts := make([]time.Time, 6)
|
||||
|
||||
ts[0] = time.Unix(0, 1444448158000000000)
|
||||
ts[1] = time.Unix(0, 1444448168000000000)
|
||||
ts[2] = time.Unix(0, 1444448178000000000)
|
||||
ts[3] = time.Unix(0, 1444448188000000000)
|
||||
ts[4] = time.Unix(0, 1444448198000000000)
|
||||
ts[5] = time.Unix(0, 1444448208000000000)
|
||||
|
||||
for _, v := range ts {
|
||||
enc.Write(v)
|
||||
}
|
||||
|
||||
b, err := enc.Bytes()
|
||||
if got := b[0] >> 4; got != timeCompressedRLE {
|
||||
t.Fatalf("Wrong encoding used: expected rle, got %v", got)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
dec := NewTimeDecoder(b)
|
||||
for i, v := range ts {
|
||||
if !dec.Next() {
|
||||
t.Fatalf("Next == false, expected true")
|
||||
}
|
||||
|
||||
if v != dec.Read() {
|
||||
t.Fatalf("Item %d mismatch, got %v, exp %v", i, dec.Read(), v)
|
||||
}
|
||||
}
|
||||
|
||||
if dec.Next() {
|
||||
t.Fatalf("unexpected extra values")
|
||||
}
|
||||
|
||||
}
|
||||
func BenchmarkTimeEncoder(b *testing.B) {
|
||||
enc := NewTimeEncoder()
|
||||
x := make([]time.Time, 1024)
|
||||
|
|
Loading…
Reference in New Issue