From 6b4926257af77dc21a9d27ccf614f11f707ed195 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 25 Aug 2015 15:44:42 -0600 Subject: [PATCH 1/3] Add inspect tool Start of a lower-level file inspection tool. This currently dumps summary statistics for the shards, index and WAL that can be used to understand the shape of the data is in the local shards. This util operates on the shards itself and not through the server and is intended more for debugging/troubleshooting. --- cmd/inspect/main.go | 129 +++++++++++++++++++++++++++++++++++++++++ tsdb/engine/wal/wal.go | 14 +++++ tsdb/meta.go | 11 ++++ tsdb/shard.go | 19 ++++++ tsdb/store.go | 27 +++++++++ 5 files changed, 200 insertions(+) create mode 100644 cmd/inspect/main.go diff --git a/cmd/inspect/main.go b/cmd/inspect/main.go new file mode 100644 index 0000000000..72d5940a92 --- /dev/null +++ b/cmd/inspect/main.go @@ -0,0 +1,129 @@ +package main + +import ( + "encoding/binary" + "flag" + "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + "sort" + "strings" + "text/tabwriter" + + "github.com/influxdb/influxdb/tsdb" + _ "github.com/influxdb/influxdb/tsdb/engine" +) + +func main() { + + var path string + flag.StringVar(&path, "p", os.Getenv("HOME")+"/.influxdb", "Root storage path. [$HOME/.influxdb]") + flag.Parse() + + tstore := tsdb.NewStore(filepath.Join(path, "data")) + tstore.Logger = log.New(ioutil.Discard, "", log.LstdFlags) + tstore.EngineOptions.Config.WALEnableLogging = false + tstore.EngineOptions.Config.WALDir = filepath.Join(path, "wal") + if err := tstore.Open(); err != nil { + fmt.Printf("Failed to open dir: %v\n", err) + os.Exit(1) + } + + size, err := tstore.DiskSize() + if err != nil { + fmt.Printf("Failed to determine disk usage: %v\n", err) + } + + // Summary stats + fmt.Printf("Shards: %d, Indexes: %d, Databases: %d, Disk Size: %d, Series: %d\n", + tstore.ShardN(), tstore.DatabaseIndexN(), len(tstore.Databases()), size, countSeries(tstore)) + fmt.Println() + + tw := tabwriter.NewWriter(os.Stdout, 16, 8, 0, '\t', 0) + + fmt.Fprintln(tw, strings.Join([]string{"DB", "Measurement", "Tags [#K/#V]", "Fields [Name:Type]", "Series"}, "\t")) + + shardIDs := tstore.ShardIDs() + + databases := tstore.Databases() + sort.Strings(databases) + + for _, db := range databases { + index := tstore.DatabaseIndex(db) + measurements := index.Measurements() + sort.Sort(measurements) + for _, m := range measurements { + tags := m.TagKeys() + tagValues := 0 + for _, tag := range tags { + tagValues += len(m.TagValues(tag)) + } + fields := m.FieldNames() + sort.Strings(fields) + series := m.SeriesKeys() + sort.Strings(series) + + // Sample a point from each measurement to determine the field types + fieldSummary := []string{} + for _, shardID := range shardIDs { + shard := tstore.Shard(shardID) + tx, err := shard.ReadOnlyTx() + if err != nil { + fmt.Printf("Failed to get transaction: %v", err) + } + + if len(series) > 0 { + cursor := tx.Cursor(series[0]) + + // Seek to the beginning + _, value := cursor.Seek([]byte{}) + codec := shard.FieldCodec(m.Name) + fields, err := codec.DecodeFieldsWithNames(value) + if err != nil { + fmt.Printf("Failed to decode values: %v", err) + } + + for field, value := range fields { + fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value)) + } + sort.Strings(fieldSummary) + } + tx.Rollback() + break + } + + fmt.Fprintf(tw, "%s\t%s\t%d/%d\t%d [%s]\t%d\n", db, m.Name, len(tags), tagValues, + len(fields), strings.Join(fieldSummary, ","), len(series)) + + } + } + tw.Flush() + +} + +func countSeries(tstore *tsdb.Store) int { + var count int + for _, shardID := range tstore.ShardIDs() { + shard := tstore.Shard(shardID) + cnt, err := shard.SeriesCount() + if err != nil { + fmt.Printf("series count failed: %v\n", err) + continue + } + count += cnt + } + return count +} + +func btou64(b []byte) uint64 { + return binary.BigEndian.Uint64(b) +} + +// u64tob converts a uint64 into an 8-byte slice. +func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go index 4c212cab68..bc7f2fcf7d 100644 --- a/tsdb/engine/wal/wal.go +++ b/tsdb/engine/wal/wal.go @@ -222,6 +222,20 @@ func (l *Log) Open() error { return nil } +func (l *Log) DiskSize() (int64, error) { + l.mu.RLock() + defer l.mu.RUnlock() + var size int64 + for _, partition := range l.partitions { + stat, err := os.Stat(partition.path) + if err != nil { + return 0, err + } + size += stat.Size() + } + return size, nil +} + // Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { l.mu.RLock() diff --git a/tsdb/meta.go b/tsdb/meta.go index 9dac618517..eb05376c55 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -1196,6 +1196,17 @@ func (m *Measurement) TagKeys() []string { return keys } +// TagValues returns all the values for the given tag key +func (m *Measurement) TagValues(key string) []string { + m.mu.RLock() + defer m.mu.RUnlock() + values := []string{} + for v := range m.seriesByTagKeyValue[key] { + values = append(values, v) + } + return values +} + // SetFieldName adds the field name to the measurement. func (m *Measurement) SetFieldName(name string) { m.mu.Lock() diff --git a/tsdb/shard.go b/tsdb/shard.go index 85f94ef322..72e14bddd2 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -127,6 +127,25 @@ func (s *Shard) close() error { return nil } +// DiskSize returns the size on disk of this shard +func (s *Shard) DiskSize() (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + stats, err := os.Stat(s.path) + var size int64 + if err != nil { + return 0, err + } + size += stats.Size() + return size, nil +} + +// ReadOnlyTx returns a read-only transaction for the shard. The transaction must be rolled back to +// release resources. +func (s *Shard) ReadOnlyTx() (Tx, error) { + return s.engine.Begin(false) +} + // TODO: this is temporarily exported to make tx.go work. When the query engine gets refactored // into the tsdb package this should be removed. No one outside tsdb should know the underlying field encoding scheme. func (s *Shard) FieldCodec(measurementName string) *FieldCodec { diff --git a/tsdb/store.go b/tsdb/store.go index 1d37f654aa..90a996628f 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -174,6 +174,17 @@ func (s *Store) DatabaseIndex(name string) *DatabaseIndex { return s.databaseIndexes[name] } +// Databases returns all the databases in the indexes +func (s *Store) Databases() []string { + s.mu.RLock() + defer s.mu.RUnlock() + databases := []string{} + for db := range s.databaseIndexes { + databases = append(databases, db) + } + return databases +} + func (s *Store) Measurement(database, name string) *Measurement { s.mu.RLock() db := s.databaseIndexes[database] @@ -184,6 +195,22 @@ func (s *Store) Measurement(database, name string) *Measurement { return db.Measurement(name) } +// DiskSize returns the size of all the shard files in bytes. This size does not include the WAL size. +func (s *Store) DiskSize() (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + var size int64 + for _, shardID := range s.ShardIDs() { + shard := s.Shard(shardID) + sz, err := shard.DiskSize() + if err != nil { + return 0, err + } + size += sz + } + return size, nil +} + // deleteSeries loops through the local shards and deletes the series data and metadata for the passed in series keys func (s *Store) deleteSeries(keys []string) error { s.mu.RLock() From 23d4ce2efa96320f5ebe8170d381bdea35e6beaa Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Thu, 27 Aug 2015 09:37:10 -0600 Subject: [PATCH 2/3] Fix panic if a series does not exists in a shard --- cmd/inspect/main.go | 47 +++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/cmd/inspect/main.go b/cmd/inspect/main.go index 72d5940a92..a7bf193183 100644 --- a/cmd/inspect/main.go +++ b/cmd/inspect/main.go @@ -24,6 +24,7 @@ func main() { tstore := tsdb.NewStore(filepath.Join(path, "data")) tstore.Logger = log.New(ioutil.Discard, "", log.LstdFlags) + tstore.EngineOptions.Config.Dir = filepath.Join(path, "data") tstore.EngineOptions.Config.WALEnableLogging = false tstore.EngineOptions.Config.WALDir = filepath.Join(path, "wal") if err := tstore.Open(); err != nil { @@ -43,7 +44,7 @@ func main() { tw := tabwriter.NewWriter(os.Stdout, 16, 8, 0, '\t', 0) - fmt.Fprintln(tw, strings.Join([]string{"DB", "Measurement", "Tags [#K/#V]", "Fields [Name:Type]", "Series"}, "\t")) + fmt.Fprintln(tw, strings.Join([]string{"Shard", "DB", "Measurement", "Tags [#K/#V]", "Fields [Name:Type]", "Series"}, "\t")) shardIDs := tstore.ShardIDs() @@ -64,9 +65,9 @@ func main() { sort.Strings(fields) series := m.SeriesKeys() sort.Strings(series) + sort.Sort(ShardIDs(shardIDs)) // Sample a point from each measurement to determine the field types - fieldSummary := []string{} for _, shardID := range shardIDs { shard := tstore.Shard(shardID) tx, err := shard.ReadOnlyTx() @@ -74,33 +75,39 @@ func main() { fmt.Printf("Failed to get transaction: %v", err) } - if len(series) > 0 { - cursor := tx.Cursor(series[0]) + for _, key := range series { + fieldSummary := []string{} + + cursor := tx.Cursor(key) + + // Series doesn't exist in this shard + if cursor == nil { + continue + } // Seek to the beginning _, value := cursor.Seek([]byte{}) codec := shard.FieldCodec(m.Name) - fields, err := codec.DecodeFieldsWithNames(value) - if err != nil { - fmt.Printf("Failed to decode values: %v", err) - } + if codec != nil { + fields, err := codec.DecodeFieldsWithNames(value) + if err != nil { + fmt.Printf("Failed to decode values: %v", err) + } - for field, value := range fields { - fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value)) + for field, value := range fields { + fieldSummary = append(fieldSummary, fmt.Sprintf("%s:%T", field, value)) + } + sort.Strings(fieldSummary) } - sort.Strings(fieldSummary) + fmt.Fprintf(tw, "%d\t%s\t%s\t%d/%d\t%d [%s]\t%d\n", shardID, db, m.Name, len(tags), tagValues, + len(fields), strings.Join(fieldSummary, ","), len(series)) + break } tx.Rollback() - break } - - fmt.Fprintf(tw, "%s\t%s\t%d/%d\t%d [%s]\t%d\n", db, m.Name, len(tags), tagValues, - len(fields), strings.Join(fieldSummary, ","), len(series)) - } } tw.Flush() - } func countSeries(tstore *tsdb.Store) int { @@ -127,3 +134,9 @@ func u64tob(v uint64) []byte { binary.BigEndian.PutUint64(b, v) return b } + +type ShardIDs []uint64 + +func (a ShardIDs) Len() int { return len(a) } +func (a ShardIDs) Less(i, j int) bool { return a[i] < a[j] } +func (a ShardIDs) Swap(i, j int) { a[i], a[j] = a[j], a[i] } From 4a0d31d28478f49c195d5b4226e4fc727d329de1 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Fri, 4 Sep 2015 10:50:11 -0600 Subject: [PATCH 3/3] Fix broken rebase --- cmd/inspect/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/inspect/main.go b/cmd/inspect/main.go index a7bf193183..43b233f54f 100644 --- a/cmd/inspect/main.go +++ b/cmd/inspect/main.go @@ -25,7 +25,7 @@ func main() { tstore := tsdb.NewStore(filepath.Join(path, "data")) tstore.Logger = log.New(ioutil.Discard, "", log.LstdFlags) tstore.EngineOptions.Config.Dir = filepath.Join(path, "data") - tstore.EngineOptions.Config.WALEnableLogging = false + tstore.EngineOptions.Config.WALLoggingEnabled = false tstore.EngineOptions.Config.WALDir = filepath.Join(path, "wal") if err := tstore.Open(); err != nil { fmt.Printf("Failed to open dir: %v\n", err) @@ -78,7 +78,7 @@ func main() { for _, key := range series { fieldSummary := []string{} - cursor := tx.Cursor(key) + cursor := tx.Cursor(key, tsdb.Forward) // Series doesn't exist in this shard if cursor == nil {