Basic instrumentation for shards

pull/4006/head
Philip O'Toole 2015-09-04 15:43:57 -07:00
parent 2b0a40e262
commit 3db9cc9b76
1 changed files with 31 additions and 1 deletions

View File

@ -4,12 +4,14 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"math"
"os"
"sync"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb/internal"
@ -17,6 +19,15 @@ import (
"github.com/gogo/protobuf/proto"
)
const (
statWriteReq = "write_req"
statSeriesCreate = "series_create"
statFieldsCreate = "fields_create"
statWritePointsFail = "write_points_fail"
statWritePointsOK = "write_points_ok"
statWriteBytes = "write_bytes"
)
var (
// ErrFieldOverflow is returned when too many fields are created on a measurement.
ErrFieldOverflow = errors.New("field overflow")
@ -49,12 +60,20 @@ type Shard struct {
mu sync.RWMutex
measurementFields map[string]*MeasurementFields // measurement name to their fields
// expvar-based stats.
statMap *expvar.Map
// The writer used by the logger.
LogOutput io.Writer
}
// NewShard returns a new initialized Shard. walPath doesn't apply to the b1 type index
func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, options EngineOptions) *Shard {
// Configure statistics collection.
key := fmt.Sprintf("shard:%s:%d", path, id)
tags := map[string]string{"path": path, "id": fmt.Sprintf("%d", id), "engine": options.EngineVersion}
statMap := influxdb.NewStatistics(key, "shard", tags)
return &Shard{
index: index,
path: path,
@ -63,6 +82,7 @@ func NewShard(id uint64, index *DatabaseIndex, path string, walPath string, opti
options: options,
measurementFields: make(map[string]*MeasurementFields),
statMap: statMap,
LogOutput: os.Stderr,
}
}
@ -172,10 +192,14 @@ type SeriesCreate struct {
// WritePoints will write the raw data points and any new metadata to the index in the shard
func (s *Shard) WritePoints(points []Point) error {
s.statMap.Add(statWriteReq, 1)
seriesToCreate, fieldsToCreate, seriesToAddShardTo, err := s.validateSeriesAndFields(points)
if err != nil {
return err
}
s.statMap.Add(statSeriesCreate, int64(len(seriesToCreate)))
s.statMap.Add(statFieldsCreate, int64(len(fieldsToCreate)))
// add any new series to the in-memory index
if len(seriesToCreate) > 0 {
@ -229,8 +253,10 @@ func (s *Shard) WritePoints(points []Point) error {
// Write to the engine.
if err := s.engine.WritePoints(points, measurementFieldsToSave, seriesToCreate); err != nil {
s.statMap.Add(statWritePointsFail, 1)
return fmt.Errorf("engine: %s", err)
}
s.statMap.Add(statWritePointsOK, int64(len(points)))
return nil
}
@ -402,7 +428,11 @@ func (s *Shard) validateSeriesAndFields(points []Point) ([]*SeriesCreate, []*Fie
func (s *Shard) SeriesCount() (int, error) { return s.engine.SeriesCount() }
// WriteTo writes the shard's data to w.
func (s *Shard) WriteTo(w io.Writer) (int64, error) { return s.engine.WriteTo(w) }
func (s *Shard) WriteTo(w io.Writer) (int64, error) {
n, err := s.engine.WriteTo(w)
s.statMap.Add(statWriteBytes, int64(n))
return n, err
}
type MeasurementFields struct {
Fields map[string]*Field `json:"fields"`