Merge pull request #3841 from influxdb/jw-file-utils

Add inspect tool
Jason Wilder 2015-09-04 14:12:05 -06:00
commit 13dbc8f0ba
5 changed files with 213 additions and 0 deletions

cmd/inspect/main.go (new file)

@@ -0,0 +1,142 @@
package main

import (
    "encoding/binary"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "sort"
    "strings"
    "text/tabwriter"

    "github.com/influxdb/influxdb/tsdb"
    _ "github.com/influxdb/influxdb/tsdb/engine"
)

func main() {
    var path string
    flag.StringVar(&path, "p", os.Getenv("HOME")+"/.influxdb", "Root storage path. [$HOME/.influxdb]")
    flag.Parse()

    tstore := tsdb.NewStore(filepath.Join(path, "data"))
    tstore.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
    tstore.EngineOptions.Config.Dir = filepath.Join(path, "data")
    tstore.EngineOptions.Config.WALLoggingEnabled = false
    tstore.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
    if err := tstore.Open(); err != nil {
        fmt.Printf("Failed to open dir: %v\n", err)
        os.Exit(1)
    }

    size, err := tstore.DiskSize()
    if err != nil {
        fmt.Printf("Failed to determine disk usage: %v\n", err)
    }

    // Summary stats
    fmt.Printf("Shards: %d, Indexes: %d, Databases: %d, Disk Size: %d, Series: %d\n",
        tstore.ShardN(), tstore.DatabaseIndexN(), len(tstore.Databases()), size, countSeries(tstore))
    fmt.Println()

    tw := tabwriter.NewWriter(os.Stdout, 16, 8, 0, '\t', 0)
    fmt.Fprintln(tw, strings.Join([]string{"Shard", "DB", "Measurement", "Tags [#K/#V]", "Fields [Name:Type]", "Series"}, "\t"))

    shardIDs := tstore.ShardIDs()

    databases := tstore.Databases()
    sort.Strings(databases)

    for _, db := range databases {
        index := tstore.DatabaseIndex(db)
        measurements := index.Measurements()
        sort.Sort(measurements)
        for _, m := range measurements {
            tags := m.TagKeys()
            tagValues := 0
            for _, tag := range tags {
                tagValues += len(m.TagValues(tag))
            }
            fields := m.FieldNames()
            sort.Strings(fields)
            series := m.SeriesKeys()
            sort.Strings(series)
            sort.Sort(ShardIDs(shardIDs))

            // Sample a point from each measurement to determine the field types
            for _, shardID := range shardIDs {
                shard := tstore.Shard(shardID)
                tx, err := shard.ReadOnlyTx()
                if err != nil {
                    fmt.Printf("Failed to get transaction: %v\n", err)
                    continue
                }

                for _, key := range series {
                    fieldSummary := []string{}

                    cursor := tx.Cursor(key, tsdb.Forward)

                    // Series doesn't exist in this shard
                    if cursor == nil {
                        continue
                    }

                    // Seek to the beginning
                    _, value := cursor.Seek([]byte{})
                    codec := shard.FieldCodec(m.Name)
                    if codec != nil {
                        fields, err := codec.DecodeFieldsWithNames(value)
                        if err != nil {
                            fmt.Printf("Failed to decode values: %v\n", err)
                        }

                        for field, value := range fields {
                            fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value))
                        }
                        sort.Strings(fieldSummary)
                    }
                    fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues,
                        len(fields), strings.Join(fieldSummary, ","), len(series))
                    break
                }
                tx.Rollback()
            }
        }
    }
    tw.Flush()
}

func countSeries(tstore *tsdb.Store) int {
    var count int
    for _, shardID := range tstore.ShardIDs() {
        shard := tstore.Shard(shardID)
        cnt, err := shard.SeriesCount()
        if err != nil {
            fmt.Printf("series count failed: %v\n", err)
            continue
        }
        count += cnt
    }
    return count
}

// btou64 converts an 8-byte slice into a uint64.
func btou64(b []byte) uint64 {
    return binary.BigEndian.Uint64(b)
}

// u64tob converts a uint64 into an 8-byte slice.
func u64tob(v uint64) []byte {
    b := make([]byte, 8)
    binary.BigEndian.PutUint64(b, v)
    return b
}

// ShardIDs is a sortable slice of shard ids.
type ShardIDs []uint64

func (a ShardIDs) Len() int           { return len(a) }
func (a ShardIDs) Less(i, j int) bool { return a[i] < a[j] }
func (a ShardIDs) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
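For reference, running the tool looks roughly like this (the database name and numbers below are illustrative, not output from a real run):

    $ go run cmd/inspect/main.go -p ~/.influxdb
    Shards: 2, Indexes: 1, Databases: 1, Disk Size: 262144, Series: 4

    Shard           DB              Measurement     Tags [#K/#V]    Fields [Name:Type]      Series
    1               mydb            cpu             2/4             1 [value:float64]       4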


@@ -222,6 +222,20 @@ func (l *Log) Open() error {
    return nil
}

// DiskSize returns the combined size on disk of all WAL partition files.
func (l *Log) DiskSize() (int64, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()

    var size int64
    for _, partition := range l.partitions {
        stat, err := os.Stat(partition.path)
        if err != nil {
            return 0, err
        }
        size += stat.Size()
    }
    return size, nil
}

// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given key.
func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
    l.mu.RLock()
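Note that Store.DiskSize, added later in this diff, deliberately excludes the WAL. A tool that wants the full on-disk footprint could walk the WAL directory itself; a minimal sketch, assuming the same path root used in main.go (walSize is an illustrative variable, not part of this change):

    // Sum the size of every file under the WAL directory.
    var walSize int64
    err := filepath.Walk(filepath.Join(path, "wal"), func(_ string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.IsDir() {
            walSize += info.Size()
        }
        return nil
    })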


@@ -1196,6 +1196,17 @@ func (m *Measurement) TagKeys() []string {
    return keys
}

// TagValues returns all the values for the given tag key
func (m *Measurement) TagValues(key string) []string {
    m.mu.RLock()
    defer m.mu.RUnlock()

    values := []string{}
    for v := range m.seriesByTagKeyValue[key] {
        values = append(values, v)
    }
    return values
}

// SetFieldName adds the field name to the measurement.
func (m *Measurement) SetFieldName(name string) {
    m.mu.Lock()
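Because the method ranges over a map, the order of the returned slice is non-deterministic. The inspect tool only counts the values, so it doesn't care, but callers that need stable output should sort; a sketch with a hypothetical tag key:

    values := m.TagValues("host") // "host" is an illustrative tag key
    sort.Strings(values)          // map iteration order is random in Go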


@@ -127,6 +127,25 @@ func (s *Shard) close() error {
    return nil
}

// DiskSize returns the size on disk of this shard
func (s *Shard) DiskSize() (int64, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    stats, err := os.Stat(s.path)
    if err != nil {
        return 0, err
    }
    return stats.Size(), nil
}

// ReadOnlyTx returns a read-only transaction for the shard. The transaction must be rolled back to
// release resources.
func (s *Shard) ReadOnlyTx() (Tx, error) {
    return s.engine.Begin(false)
}

// TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored
// into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme.
func (s *Shard) FieldCodec(measurementName string) *FieldCodec {
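Since every transaction must be rolled back, pairing ReadOnlyTx with defer is the safest pattern; a minimal usage sketch, assuming shard and key are in scope as in main.go:

    tx, err := shard.ReadOnlyTx()
    if err != nil {
        return err
    }
    defer tx.Rollback() // always release engine resources, even on early return

    cursor := tx.Cursor(key, tsdb.Forward)
    if cursor == nil {
        return nil // series doesn't exist in this shard
    }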


@@ -174,6 +174,17 @@ func (s *Store) DatabaseIndex(name string) *DatabaseIndex {
    return s.databaseIndexes[name]
}

// Databases returns all the databases in the indexes
func (s *Store) Databases() []string {
    s.mu.RLock()
    defer s.mu.RUnlock()

    databases := []string{}
    for db := range s.databaseIndexes {
        databases = append(databases, db)
    }
    return databases
}

func (s *Store) Measurement(database, name string) *Measurement {
    s.mu.RLock()
    db := s.databaseIndexes[database]
@@ -184,6 +195,22 @@ func (s *Store) Measurement(database, name string) *Measurement {
    return db.Measurement(name)
}

// DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size.
func (s *Store) DiskSize() (int64, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()

    var size int64
    for _, shardID := range s.ShardIDs() {
        shard := s.Shard(shardID)
        sz, err := shard.DiskSize()
        if err != nil {
            return 0, err
        }
        size += sz
    }
    return size, nil
}

// deleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys
func (s *Store) deleteSeries(keys []string) error {
    s.mu.RLock()
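Caller-side, the method is a one-liner against an opened store; a sketch, mirroring main.go (and keeping in mind the WAL exclusion noted above):

    size, err := tstore.DiskSize()
    if err != nil {
        fmt.Printf("Failed to determine disk usage: %v\n", err)
        os.Exit(1)
    }
    fmt.Printf("Shard data on disk: %d bytes (WAL not included)\n", size)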