feat #9212: add ability to generate shard digests

pull/9213/head
David Norton 2017-11-13 20:35:53 -05:00
parent cfc742885d
commit 4e13248d85
8 changed files with 529 additions and 0 deletions

View File

@ -45,6 +45,7 @@ type Engine interface {
Export(w io.Writer, basePath string, start time.Time, end time.Time) error
Restore(r io.Reader, basePath string) error
Import(r io.Reader, basePath string) error
Digest() (io.ReadCloser, error)
CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
CreateCursor(ctx context.Context, r *CursorRequest) (Cursor, error)

136
tsdb/engine/tsm1/digest.go Normal file
View File

@ -0,0 +1,136 @@
package tsm1
import (
"bytes"
"fmt"
"io"
"math"
"os"
"path/filepath"
"sort"
)
// DigestOptions configures the filters applied when generating a digest:
// only keys within [MinKey, MaxKey] (each bound ignored when empty) and
// blocks whose time range overlaps [MinTime, MaxTime] are included.
type DigestOptions struct {
	// MinTime/MaxTime bound the block time ranges included in the digest.
	MinTime, MaxTime int64
	// MinKey/MaxKey bound the series keys included; a zero-length bound
	// disables that side of the filter.
	MinKey, MaxKey []byte
}
// DigestWithOptions writes a digest of dir to w using options to filter by
// time and key range.
//
// A digest records, for each series key found in the directory's TSM files,
// the time ranges of its blocks with their point counts and checksums. Keys
// outside [opts.MinKey, opts.MaxKey] are skipped; blocks entirely outside
// [opts.MinTime, opts.MaxTime] are skipped, but blocks that overlap the
// boundary are included whole.
//
// NOTE(review): w is wrapped by a DigestWriter whose Close does not close w
// itself — the caller retains ownership of w.
func DigestWithOptions(dir string, opts DigestOptions, w io.WriteCloser) error {
	if dir == "" {
		return fmt.Errorf("dir is required")
	}

	files, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("*.%s", TSMFileExtension)))
	if err != nil {
		return err
	}

	// Readers used to look up block entries per key in the merge loop below.
	// A second, independent set of readers (opened next) streams the keys, so
	// the two uses never share a reader. All readers in this set are closed
	// on return; the original code leaked them.
	readers := make([]*TSMReader, 0, len(files))
	defer func() {
		for _, r := range readers {
			r.Close()
		}
	}()

	for _, fi := range files {
		f, err := os.Open(fi)
		if err != nil {
			return err
		}

		r, err := NewTSMReader(f)
		if err != nil {
			// The reader never took ownership of f; close it here.
			f.Close()
			return err
		}
		readers = append(readers, r)
	}

	// One goroutine per file streams that file's keys (filtered by key range)
	// into its channel; merge() combines the per-file streams.
	ch := make([]chan seriesKey, 0, len(files))
	for _, fi := range files {
		f, err := os.Open(fi)
		if err != nil {
			return err
		}

		r, err := NewTSMReader(f)
		if err != nil {
			f.Close()
			return err
		}
		defer r.Close()

		s := make(chan seriesKey)
		ch = append(ch, s)

		go func() {
			for i := 0; i < r.KeyCount(); i++ {
				key, typ := r.KeyAt(i)
				if len(opts.MinKey) > 0 && bytes.Compare(key, opts.MinKey) < 0 {
					continue
				}
				if len(opts.MaxKey) > 0 && bytes.Compare(key, opts.MaxKey) > 0 {
					continue
				}
				s <- seriesKey{key: key, typ: typ}
			}
			close(s)
		}()
	}

	dw, err := NewDigestWriter(w)
	if err != nil {
		return err
	}
	// The deferred Close only matters on error returns; on the success path
	// the explicit Close below runs first and the second Close is a no-op.
	defer dw.Close()

	for key := range merge(ch...) {
		ts := &DigestTimeSpan{}
		kstr := string(key.key)

		for _, r := range readers {
			for _, entry := range r.Entries(key.key) {
				// Filter blocks that are outside the time filter. If they
				// overlap, we still include them. Filtering before ReadBytes
				// avoids reading blocks that would be discarded.
				if entry.MaxTime < opts.MinTime || entry.MinTime > opts.MaxTime {
					continue
				}

				crc, b, err := r.ReadBytes(&entry, nil)
				if err != nil {
					return err
				}

				ts.Add(entry.MinTime, entry.MaxTime, BlockCount(b), crc)
			}
		}

		sort.Sort(ts)
		if err := dw.WriteTimeSpan(kstr, ts); err != nil {
			return err
		}
	}

	return dw.Close()
}
// Digest writes a digest of dir to w of a full shard dir.
func Digest(dir string, w io.WriteCloser) error {
return DigestWithOptions(dir, DigestOptions{
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
}, w)
}
// rwPair couples a TSMReader with the TSMWriter and backing output file used
// to rewrite its contents.
// NOTE(review): not referenced anywhere in this change — presumably used by
// other code in this package; confirm before removing.
type rwPair struct {
	r    *TSMReader
	w    TSMWriter
	outf *os.File
}

// close closes all three resources of the pair. Errors from the individual
// Close calls are discarded.
func (rw *rwPair) close() {
	rw.r.Close()
	rw.w.Close()
	rw.outf.Close()
}

View File

@ -0,0 +1,66 @@
package tsm1
import (
"bufio"
"compress/gzip"
"encoding/binary"
"io"
)
// DigestReader reads a shard digest produced by DigestWriter, decompressing
// the gzip stream and decoding one time span per series key.
type DigestReader struct {
	// ReadCloser is the gzip stream over the digest; Read is served from it.
	io.ReadCloser

	// orig is the source passed to NewDigestReader. gzip.Reader.Close does
	// not close its underlying reader, so Close must close orig explicitly
	// or the source (e.g. an *os.File) is leaked.
	orig io.ReadCloser
}

// NewDigestReader returns a DigestReader that reads a digest from r.
// Closing the returned reader also closes r.
func NewDigestReader(r io.ReadCloser) (*DigestReader, error) {
	gr, err := gzip.NewReader(bufio.NewReader(r))
	if err != nil {
		return nil, err
	}
	return &DigestReader{ReadCloser: gr, orig: r}, nil
}

// ReadTimeSpan decodes and returns the next series key and its time span.
// It returns io.EOF once the digest stream is exhausted.
func (r *DigestReader) ReadTimeSpan() (string, *DigestTimeSpan, error) {
	// Key length (uint16), then the key bytes.
	var n uint16
	if err := binary.Read(r.ReadCloser, binary.BigEndian, &n); err != nil {
		return "", nil, err
	}

	b := make([]byte, n)
	if _, err := io.ReadFull(r.ReadCloser, b); err != nil {
		return "", nil, err
	}

	// Number of time ranges recorded for this key.
	var cnt uint32
	if err := binary.Read(r.ReadCloser, binary.BigEndian, &cnt); err != nil {
		return "", nil, err
	}

	ts := &DigestTimeSpan{}
	for i := 0; i < int(cnt); i++ {
		var min, max int64
		var crc uint32
		if err := binary.Read(r.ReadCloser, binary.BigEndian, &min); err != nil {
			return "", nil, err
		}
		if err := binary.Read(r.ReadCloser, binary.BigEndian, &max); err != nil {
			return "", nil, err
		}
		if err := binary.Read(r.ReadCloser, binary.BigEndian, &crc); err != nil {
			return "", nil, err
		}
		// The point count is encoded as uint16 (see DigestWriter); n is
		// reused as scratch here.
		if err := binary.Read(r.ReadCloser, binary.BigEndian, &n); err != nil {
			return "", nil, err
		}
		ts.Add(min, max, int(n), crc)
	}

	return string(b), ts, nil
}

// Close closes the gzip stream and then the underlying source. The first
// error encountered is returned.
func (r *DigestReader) Close() error {
	err := r.ReadCloser.Close()
	if r.orig != nil {
		if cerr := r.orig.Close(); err == nil {
			err = cerr
		}
	}
	return err
}

View File

@ -0,0 +1,95 @@
package tsm1
import (
"compress/gzip"
"encoding/binary"
"io"
)
type writeFlushCloser interface {
Close() error
Write(b []byte) (int, error)
Flush() error
}
// DigestWriter allows for writing a digest of a shard. A digest is a condensed
// representation of the contents of a shard. It can be scoped to one or more series
// keys, ranges of times or sets of files.
type DigestWriter struct {
F writeFlushCloser
}
func NewDigestWriter(w io.WriteCloser) (*DigestWriter, error) {
gw := gzip.NewWriter(w)
return &DigestWriter{F: gw}, nil
}
func (w *DigestWriter) WriteTimeSpan(key string, t *DigestTimeSpan) error {
if err := binary.Write(w.F, binary.BigEndian, uint16(len(key))); err != nil {
return err
}
if _, err := w.F.Write([]byte(key)); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, uint32(t.Len())); err != nil {
return err
}
for _, tr := range t.Ranges {
if err := binary.Write(w.F, binary.BigEndian, tr.Min); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, tr.Max); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, tr.CRC); err != nil {
return err
}
if err := binary.Write(w.F, binary.BigEndian, uint16(tr.N)); err != nil {
return err
}
}
return nil
}
func (w *DigestWriter) Flush() error {
return w.F.Flush()
}
func (w *DigestWriter) Close() error {
if err := w.Flush(); err != nil {
return err
}
return w.F.Close()
}
type DigestTimeSpan struct {
Ranges []DigestTimeRange
}
func (a DigestTimeSpan) Len() int { return len(a.Ranges) }
func (a DigestTimeSpan) Swap(i, j int) { a.Ranges[i], a.Ranges[j] = a.Ranges[j], a.Ranges[i] }
func (a DigestTimeSpan) Less(i, j int) bool {
return a.Ranges[i].Min < a.Ranges[j].Min
}
func (t *DigestTimeSpan) Add(min, max int64, n int, crc uint32) {
for _, v := range t.Ranges {
if v.Min == min && v.Max == max && v.N == n && v.CRC == crc {
return
}
}
t.Ranges = append(t.Ranges, DigestTimeRange{Min: min, Max: max, N: n, CRC: crc})
}
type DigestTimeRange struct {
Min, Max int64
N int
CRC uint32
}

View File

@ -226,6 +226,48 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string,
return e
}
// Digest returns a reader for the shard's digest, regenerating the digest
// file on disk first if no fresh one exists. Freshness is judged by
// comparing the engine's last-modified time against the digest file's mtime.
func (e *Engine) Digest() (io.ReadCloser, error) {
	digestPath := filepath.Join(e.path, "digest.tsd")

	// See if there's an existing digest file on disk.
	f, err := os.Open(digestPath)
	if err == nil {
		// There is an existing digest file. Now see if it is still fresh.
		fi, err := f.Stat()
		if err != nil {
			f.Close()
			return nil, err
		}

		if !e.LastModified().After(fi.ModTime()) {
			// Existing digest is still fresh so return a reader for it.
			return f, nil
		}

		// The digest is stale; close it before regenerating so the file
		// descriptor isn't leaked.
		if err := f.Close(); err != nil {
			return nil, err
		}
	}

	// Either no digest existed or the existing one was stale
	// so generate a new digest.

	// Create a tmp file to write the digest to.
	tf, err := os.Create(digestPath + ".tmp")
	if err != nil {
		return nil, err
	}

	// Write the new digest to the tmp file. Digest closes its gzip stream
	// but not tf itself, so tf must be closed here.
	if err := Digest(e.path, tf); err != nil {
		tf.Close()
		os.Remove(tf.Name())
		return nil, err
	}

	// Close the tmp file before renaming so all data reaches the OS and the
	// rename works on platforms that disallow renaming open files.
	if err := tf.Close(); err != nil {
		os.Remove(tf.Name())
		return nil, err
	}

	// Rename the temporary digest file to the actual digest file.
	if err := renameFile(tf.Name(), digestPath); err != nil {
		os.Remove(tf.Name())
		return nil, err
	}

	// Create and return a reader for the new digest file.
	return os.Open(digestPath)
}
// SetEnabled sets whether the engine is enabled.
func (e *Engine) SetEnabled(enabled bool) {
e.enableCompactionsOnOpen = enabled

View File

@ -148,6 +148,165 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
}
}
// Ensure that the engine can write & read shard digest files.
func TestEngine_Digest(t *testing.T) {
	// Create a tmp directory for test files.
	tmpDir, err := ioutil.TempDir("", "TestEngine_Digest")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)

	walPath := filepath.Join(tmpDir, "wal")
	os.MkdirAll(walPath, 0777)

	idxPath := filepath.Join(tmpDir, "index")

	// Create an engine to write a tsm file.
	dbName := "db0"
	opt := tsdb.NewEngineOptions()
	opt.InmemIndex = inmem.NewIndex(dbName)
	idx := tsdb.MustOpenIndex(1, dbName, idxPath, opt)
	defer idx.Close()

	e := tsm1.NewEngine(1, idx, dbName, tmpDir, walPath, opt).(*tsm1.Engine)
	if err := e.Open(); err != nil {
		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
	}

	// Create a few points.
	points := []models.Point{
		MustParsePointString("cpu,host=A value=1.1 1000000000"),
		MustParsePointString("cpu,host=B value=1.2 2000000000"),
	}

	if err := e.WritePoints(points); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// Force a compaction (digests are built from compacted TSM files).
	e.ScheduleFullCompaction()

	// digest reads the engine's current digest into a slice of spans.
	digest := func() ([]span, error) {
		// Get a reader for the shard's digest.
		r, err := e.Digest()
		if err != nil {
			return nil, err
		}

		// Make sure the digest can be read.
		dr, err := tsm1.NewDigestReader(r)
		if err != nil {
			return nil, err
		}

		got := []span{}

		// Drain all (key, time span) pairs until EOF.
		for {
			k, s, err := dr.ReadTimeSpan()
			if err == io.EOF {
				break
			} else if err != nil {
				return nil, err
			}

			got = append(got, span{
				key:   k,
				tspan: s,
			})
		}

		if err := dr.Close(); err != nil {
			return nil, err
		}

		return got, nil
	}

	// Expected digest contents for the two points written above. The CRC
	// values are the checksums of the corresponding compacted TSM blocks.
	exp := []span{
		span{
			key: "cpu,host=A#!~#value",
			tspan: &tsm1.DigestTimeSpan{
				Ranges: []tsm1.DigestTimeRange{
					tsm1.DigestTimeRange{
						Min: 1000000000,
						Max: 1000000000,
						N:   1,
						CRC: 1048747083,
					},
				},
			},
		},
		span{
			key: "cpu,host=B#!~#value",
			tspan: &tsm1.DigestTimeSpan{
				Ranges: []tsm1.DigestTimeRange{
					tsm1.DigestTimeRange{
						Min: 2000000000,
						Max: 2000000000,
						N:   1,
						CRC: 734984746,
					},
				},
			},
		},
	}

	// Read the digest twice: the second read should serve the digest file
	// cached on disk and return identical results.
	for n := 0; n < 2; n++ {
		got, err := digest()
		if err != nil {
			t.Fatalf("n = %d: %s", n, err)
		}

		// Make sure the data in the digest was valid.
		if !reflect.DeepEqual(exp, got) {
			t.Fatalf("n = %d\nexp = %v\ngot = %v\n", n, exp, got)
		}
	}

	// Test that writing more points causes the digest to be updated.
	points = []models.Point{
		MustParsePointString("cpu,host=C value=1.1 3000000000"),
	}

	if err := e.WritePoints(points); err != nil {
		t.Fatalf("failed to write points: %s", err.Error())
	}

	// Force a compaction.
	e.ScheduleFullCompaction()

	// Get new digest.
	got, err := digest()
	if err != nil {
		t.Fatal(err)
	}

	// The new point should appear as a third span in the regenerated digest.
	exp = append(exp, span{
		key: "cpu,host=C#!~#value",
		tspan: &tsm1.DigestTimeSpan{
			Ranges: []tsm1.DigestTimeRange{
				tsm1.DigestTimeRange{
					Min: 3000000000,
					Max: 3000000000,
					N:   1,
					CRC: 2553233514,
				},
			},
		},
	})

	if !reflect.DeepEqual(exp, got) {
		t.Fatalf("\nexp = %v\ngot = %v\n", exp, got)
	}
}
// span pairs a series key with its decoded digest time span, for comparing
// digest contents in tests.
type span struct {
	key   string
	tspan *tsm1.DigestTimeSpan
}
// Ensure that the engine will backup any TSM files created since the passed in time
func TestEngine_Backup(t *testing.T) {
// Generate temporary file.

View File

@ -71,6 +71,10 @@ var (
// the file's magic number.
ErrUnknownFieldsFormat = errors.New("unknown field index format")
// ErrShardNotIdle is returned when an operation requiring the shard to be idle/cold is
// attempted on a hot shard.
ErrShardNotIdle = errors.New("shard not idle")
// fieldsIndexMagicNumber is the file magic number for the fields index file.
fieldsIndexMagicNumber = []byte{0, 6, 1, 3}
)
@ -1172,6 +1176,22 @@ func (s *Shard) TagKeyCardinality(name, key []byte) int {
return engine.TagKeyCardinality(name, key)
}
// Digest returns a digest of the shard. It fails with ErrShardNotIdle when
// the shard is hot, since a digest of rapidly changing data is not useful.
func (s *Shard) Digest() (io.ReadCloser, error) {
	engine, err := s.engine()
	if err != nil {
		return nil, err
	}

	// Only digest idle/cold shards.
	if engine.IsIdle() {
		return engine.Digest()
	}
	return nil, ErrShardNotIdle
}
// engine safely (under an RLock) returns a reference to the shard's Engine, or
// an error if the Engine is closed, or the shard is currently disabled.
//

View File

@ -379,6 +379,16 @@ func (s *Store) ShardN() int {
return len(s.shards)
}
// ShardDigest returns a digest of the shard with the specified ID.
// It fails with ErrShardNotFound when no such shard exists in the store.
func (s *Store) ShardDigest(id uint64) (io.ReadCloser, error) {
	if sh := s.Shard(id); sh != nil {
		return sh.Digest()
	}
	return nil, ErrShardNotFound
}
// CreateShard creates a shard with the given id and retention policy on a database.
func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error {
s.mu.Lock()