Merge pull request #2398 from influxdb/snapshot-backup-err-loggin

Track more stats and report errors for shards
pull/2404/head
Todd Persen 2015-04-22 19:08:32 -07:00
commit 6b76befa98
5 changed files with 56 additions and 7 deletions

View File

@ -1188,6 +1188,8 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err
// Generate an ID for each shard.
for _, sh := range g.Shards {
sh.ID = tx.nextShardID()
sh.stats = NewStats(fmt.Sprintf("shard %d", sh.ID))
sh.stats.Inc("create")
}
// Assign data nodes to shards via round robin.
@ -2893,10 +2895,12 @@ func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, us
// Server stats.
serverRow := &influxql.Row{Columns: []string{}}
serverRow.Name = s.stats.Name()
var values []interface{}
s.stats.Walk(func(k string, v int64) {
serverRow.Columns = append(serverRow.Columns, k)
serverRow.Values = append(serverRow.Values, []interface{}{v})
values = append(values, v)
})
serverRow.Values = append(serverRow.Values, values)
rows = append(rows, serverRow)
// Shard-level stats.
@ -2908,10 +2912,12 @@ func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, us
row := &influxql.Row{Columns: []string{}}
row.Name = sh.stats.Name()
var values []interface{}
sh.stats.Walk(func(k string, v int64) {
row.Columns = append(row.Columns, k)
row.Values = append(row.Values, []interface{}{v})
values = append(values, v)
})
row.Values = append(row.Values, values)
rows = append(rows, row)
}

View File

@ -86,18 +86,21 @@ func (s *Shard) open(path string, conn MessagingConn) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.stats == nil {
s.stats = NewStats(fmt.Sprintf("shard %d", s.ID))
}
s.stats.Inc("open")
// Return an error if the shard is already open.
if s.store != nil {
s.stats.Inc("errAlreadyOpen")
return errors.New("shard already open")
}
if s.stats == nil {
s.stats = NewStats("shard")
}
// Open store on shard.
store, err := bolt.Open(path, 0666, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
s.stats.Inc("errBoltOpenFailure")
return err
}
s.store = store
@ -117,6 +120,7 @@ func (s *Shard) open(path string, conn MessagingConn) error {
return nil
}); err != nil {
s.stats.Inc("errBoltStoreUpdateFailure")
_ = s.close()
return fmt.Errorf("init: %s", err)
}
@ -176,6 +180,9 @@ func (s *Shard) close() error {
if s.store != nil {
_ = s.store.Close()
}
if s.stats != nil {
s.stats.Inc("close")
}
return nil
}

View File

@ -600,7 +600,7 @@ func createShardSnapshotFile(sh *Shard) (*SnapshotFile, SnapshotFileWriter, erro
// Begin transaction.
tx, err := sh.store.Begin(false)
if err != nil {
return nil, nil, fmt.Errorf("begin: %s", err)
return nil, nil, fmt.Errorf("begin - stats %s, err %s", sh.stats, err)
}
// Create and return file and writer.

View File

@ -1,6 +1,8 @@
package influxdb
import (
"fmt"
"sort"
"sync"
)
@ -112,3 +114,25 @@ func (s *Stats) Snapshot() *Stats {
})
return snap
}
func (s *Stats) String() string {
var out string
stat := s.Snapshot()
var keys []string
for k, _ := range stat.m {
keys = append(keys, k)
}
sort.Strings(keys)
out += `{"` + stat.name + `":[`
var j int
for _, k := range keys {
v := stat.m[k].i
out += `{"` + k + `":` + fmt.Sprintf("%d", v) + `}`
j++
if j != len(keys) {
out += `,`
}
}
out += `]}`
return out
}

View File

@ -86,3 +86,15 @@ func TestStats_Snapshot(t *testing.T) {
t.Fatalf("stats snapshot returned unexpected result: %#v", bar)
}
}
func TestStats_String(t *testing.T) {
foo := influxdb.NewStats("server")
foo.Set("a", 100)
foo.Set("b", 600)
if exp, got := `{"server":[{"a":100},{"b":600}]}`, foo.String(); exp != got {
t.Log("exp: ", exp)
t.Log("got: ", got)
t.Fatalf("failed to get string")
}
}