Merge pull request #4007 from influxdb/instrument_bz1

Basic instrumentation for bz1 engine
pull/4006/head
Philip O'Toole 2015-09-08 19:56:06 -07:00
commit 2b0a40e262
1 changed files with 27 additions and 0 deletions

View File

@ -5,6 +5,7 @@ import (
"encoding/binary"
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"log"
@ -15,6 +16,7 @@ import (
"github.com/boltdb/bolt"
"github.com/golang/snappy"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/tsdb/engine/wal"
)
@ -29,6 +31,15 @@ const (
Format = "bz1"
)
const (
statSlowInsert = "slow_insert"
statPointsWrite = "points_write"
statPointsWriteDedupe = "points_write_dedupe"
statBlocksWrite = "blks_write"
statBlocksWriteBytes = "blks_write_bytes"
statBlocksWriteBytesCompress = "blks_write_bytes_c"
)
func init() {
tsdb.RegisterEngine(Format, NewEngine)
}
@ -47,6 +58,9 @@ type Engine struct {
path string
db *bolt.DB
// expvar-based statistics collection.
statMap *expvar.Map
// Write-ahead log storage.
WAL WAL
@ -67,6 +81,11 @@ type WAL interface {
// NewEngine returns a new instance of Engine.
func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
// Configure statistics collection.
key := fmt.Sprintf("engine:%s:%s", opt.EngineVersion, path)
tags := map[string]string{"path": path, "version": opt.EngineVersion}
statMap := influxdb.NewStatistics(key, "engine", tags)
// create the writer with a directory of the same name as the shard, but with the wal extension
w := wal.NewLog(walPath)
@ -81,6 +100,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
e := &Engine{
path: path,
statMap: statMap,
BlockSize: DefaultBlockSize,
WAL: w,
}
@ -337,6 +357,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
if len(a) == 0 {
return nil
}
e.statMap.Add(statPointsWrite, int64(len(a)))
// Create or retrieve series bucket.
bkt, err := tx.Bucket([]byte("points")).CreateBucketIfNotExists([]byte(key))
@ -347,6 +368,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
// Ensure the slice is sorted before retrieving the time range.
a = tsdb.DedupeEntries(a)
e.statMap.Add(statPointsWriteDedupe, int64(len(a)))
// Convert the raw time and byte slices to entries with lengths
for i, p := range a {
@ -385,6 +407,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
}
// Otherwise fallthrough to slower insert mode.
e.statMap.Add(statSlowInsert, 1)
}
// Generate map of inserted keys.
@ -458,6 +481,9 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
// If the block is larger than the target block size or this is the
// last point then flush the block to the bucket.
if len(block) >= e.BlockSize || i == len(a)-1 {
e.statMap.Add(statBlocksWrite, 1)
e.statMap.Add(statBlocksWriteBytes, int64(len(block)))
// Encode block in the following format:
// tmax int64
// data []byte (snappy compressed)
@ -467,6 +493,7 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
if err := bkt.Put(u64tob(uint64(tmin)), value); err != nil {
return fmt.Errorf("put: ts=%d-%d, err=%s", tmin, tmax, err)
}
e.statMap.Add(statBlocksWriteBytesCompress, int64(len(value)))
// Reset the block & time range.
block = nil