add bz1 testing/quick coverage
parent
4077148245
commit
f7111e037b
|
@ -255,6 +255,9 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
|||
}
|
||||
c := bkt.Cursor()
|
||||
|
||||
// Ensure the slice is sorted before retrieving the time range.
|
||||
sort.Sort(byteSlices(a))
|
||||
|
||||
// Determine time range of new data.
|
||||
tmin, tmax := int64(btou64(a[0][0:8])), int64(btou64(a[len(a)-1][0:8]))
|
||||
|
||||
|
@ -347,7 +350,8 @@ func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
|
|||
return fmt.Errorf("put: ts=%d-%d, err=%s", tmin, tmax, err)
|
||||
}
|
||||
|
||||
// Reset the time range.
|
||||
// Reset the block & time range.
|
||||
block = nil
|
||||
tmin, tmax = int64(math.MaxInt64), int64(math.MinInt64)
|
||||
}
|
||||
}
|
||||
|
@ -509,7 +513,7 @@ func (c *Cursor) setBuf(block []byte) {
|
|||
|
||||
// Otherwise decode block into buffer.
|
||||
// Skip over the first 8 bytes since they are the max timestamp.
|
||||
buf, err := snappy.Decode(c.buf, block[8:])
|
||||
buf, err := snappy.Decode(nil, block[8:])
|
||||
if err != nil {
|
||||
c.buf = c.buf[0:0]
|
||||
log.Printf("block decode error: %s", err)
|
||||
|
|
|
@ -3,10 +3,15 @@ package bz1_test
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
|
@ -117,6 +122,20 @@ func TestEngine_WritePoints_PointsWriter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the engine can return errors from the points writer.
|
||||
func TestEngine_WritePoints_ErrPointsWriter(t *testing.T) {
|
||||
e := OpenDefaultEngine()
|
||||
defer e.Close()
|
||||
|
||||
// Ensure points writer returns an error.
|
||||
e.PointsWriter.WritePointsFn = func(a []tsdb.Point) error { return errors.New("marker") }
|
||||
|
||||
// Write to engine.
|
||||
if err := e.WritePoints(nil, nil, nil); err == nil || err.Error() != `write points: marker` {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the engine can write points to the index.
|
||||
func TestEngine_WriteIndex_Append(t *testing.T) {
|
||||
e := OpenDefaultEngine()
|
||||
|
@ -216,6 +235,73 @@ func TestEngine_WriteIndex_Insert(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// Ensure the engine ignores writes without keys.
|
||||
func TestEngine_WriteIndex_NoKeys(t *testing.T) {
|
||||
e := OpenDefaultEngine()
|
||||
defer e.Close()
|
||||
if err := e.WriteIndex(nil); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the engine ignores writes without points in a key.
|
||||
func TestEngine_WriteIndex_NoPoints(t *testing.T) {
|
||||
e := OpenDefaultEngine()
|
||||
defer e.Close()
|
||||
if err := e.WriteIndex(map[string][][]byte{"cpu": nil}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the engine ignores writes without points in a key.
|
||||
func TestEngine_WriteIndex_Quick(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("short mode")
|
||||
}
|
||||
|
||||
quick.Check(func(points Points, blockSize int) bool {
|
||||
e := OpenDefaultEngine()
|
||||
e.BlockSize = blockSize % 1024 // 1KB max block size
|
||||
defer e.Close()
|
||||
|
||||
// Write points to index.
|
||||
if err := e.WriteIndex(map[string][][]byte(points)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Retrieve a sorted list of keys so results are deterministic.
|
||||
keys := points.Keys()
|
||||
|
||||
// Start transaction to read index.
|
||||
tx := e.MustBegin(false)
|
||||
defer tx.Rollback()
|
||||
|
||||
// Iterate over results to ensure they are correct.
|
||||
for _, key := range keys {
|
||||
c := tx.Cursor(key)
|
||||
|
||||
// Read list of key/values.
|
||||
var got [][]byte
|
||||
for k, v := c.Seek(u64tob(0)); k != nil; k, v = c.Next() {
|
||||
got = append(got, append(copyBytes(k), v...))
|
||||
}
|
||||
|
||||
// Generate expected values.
|
||||
// We need to remove the data length from the slice.
|
||||
var exp [][]byte
|
||||
for _, b := range points[key] {
|
||||
exp = append(exp, append(copyBytes(b[0:8]), b[12:]...)) // remove data len
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("points: block size=%d, key=%s:\n\ngot=%x\n\nexp=%x\n\n", e.BlockSize, key, got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// Engine represents a test wrapper for bz1.Engine.
|
||||
type Engine struct {
|
||||
*bz1.Engine
|
||||
|
@ -274,6 +360,39 @@ func (w *EnginePointsWriter) WritePoints(points []tsdb.Point) error {
|
|||
return w.WritePointsFn(points)
|
||||
}
|
||||
|
||||
// Points represents a set of encoded points by key. Implements quick.Generator.
|
||||
type Points map[string][][]byte
|
||||
|
||||
// Keys returns a sorted list of keys.
|
||||
func (m Points) Keys() []string {
|
||||
var keys []string
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
return keys
|
||||
}
|
||||
|
||||
func (Points) Generate(rand *rand.Rand, size int) reflect.Value {
|
||||
// Generate series with a random number of points in each.
|
||||
m := make(map[string][][]byte)
|
||||
for i, seriesN := 0, rand.Intn(size); i < seriesN; i++ {
|
||||
key := strconv.Itoa(rand.Intn(1000))
|
||||
|
||||
// Generate points for the series.
|
||||
for j, pointN := 0, rand.Intn(size); j < pointN; j++ {
|
||||
timestamp := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(rand.Int63n(int64(365 * 24 * time.Hour))))
|
||||
data, ok := quick.Value(reflect.TypeOf([]byte(nil)), rand)
|
||||
if !ok {
|
||||
panic("cannot generate data")
|
||||
}
|
||||
m[key] = append(m[key], bz1.MarshalEntry(timestamp.UnixNano(), data.Interface().([]byte)))
|
||||
}
|
||||
}
|
||||
|
||||
return reflect.ValueOf(Points(m))
|
||||
}
|
||||
|
||||
// copyBytes returns a copy of a byte slice.
|
||||
func copyBytes(b []byte) []byte {
|
||||
if b == nil {
|
||||
|
|
Loading…
Reference in New Issue