commit
bd512762db
|
@ -256,6 +256,7 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
|||
c := bkt.Cursor()
|
||||
|
||||
// Ensure the slice is sorted before retrieving the time range.
|
||||
a = DedupeEntries(a)
|
||||
sort.Sort(byteSlices(a))
|
||||
|
||||
// Determine time range of new data.
|
||||
|
@ -278,36 +279,38 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
|||
}
|
||||
|
||||
// If time range overlaps existing blocks then unpack full range and reinsert.
|
||||
var existing [][]byte
|
||||
for k, v := c.First(); k != nil; k, v = c.Next() {
|
||||
// Determine block range.
|
||||
bmin, bmax := int64(btou64(k)), int64(btou64(v[0:8]))
|
||||
|
||||
// Exit loop if the lowest block time is greater than the max insert time.
|
||||
if bmin > tmax {
|
||||
// Skip over all blocks before the time range.
|
||||
// Exit once we reach a block that is beyond our time range.
|
||||
if bmax < tmin {
|
||||
continue
|
||||
} else if bmin > tmax {
|
||||
break
|
||||
}
|
||||
|
||||
// If range overlaps with inserted range then read out data and delete block.
|
||||
if (bmin >= tmin && bmin <= tmax) || (bmax >= tmin && bmax <= tmax) {
|
||||
// Decode block.
|
||||
buf, err := snappy.Decode(nil, v[8:])
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode block: %s", err)
|
||||
}
|
||||
|
||||
// Copy out any entries that aren't being overwritten.
|
||||
for _, entry := range SplitEntries(buf) {
|
||||
if _, ok := m[int64(btou64(entry[0:8]))]; !ok {
|
||||
a = append(a, entry)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete block in database.
|
||||
c.Delete()
|
||||
// Decode block.
|
||||
buf, err := snappy.Decode(nil, v[8:])
|
||||
if err != nil {
|
||||
return fmt.Errorf("decode block: %s", err)
|
||||
}
|
||||
|
||||
// Copy out any entries that aren't being overwritten.
|
||||
for _, entry := range SplitEntries(buf) {
|
||||
if _, ok := m[int64(btou64(entry[0:8]))]; !ok {
|
||||
existing = append(existing, entry)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete block in database.
|
||||
c.Delete()
|
||||
}
|
||||
|
||||
// Sort entries before rewriting.
|
||||
// Merge entries before rewriting.
|
||||
a = append(existing, a...)
|
||||
sort.Sort(byteSlices(a))
|
||||
|
||||
// Rewrite points to new blocks.
|
||||
|
@ -322,6 +325,9 @@ func (e *Engine) writeIndex(tx *bolt.Tx, key string, a [][]byte) error {
|
|||
func (e *Engine) writeBlocks(bkt *bolt.Bucket, a [][]byte) error {
|
||||
var block []byte
|
||||
|
||||
// Dedupe points by key.
|
||||
a = DedupeEntries(a)
|
||||
|
||||
// Group points into blocks by size.
|
||||
tmin, tmax := int64(math.MaxInt64), int64(math.MinInt64)
|
||||
for i, p := range a {
|
||||
|
@ -578,6 +584,26 @@ func SplitEntries(b []byte) [][]byte {
|
|||
}
|
||||
}
|
||||
|
||||
// DedupeEntries returns slices with unique keys (the first 8 bytes).
|
||||
func DedupeEntries(a [][]byte) [][]byte {
|
||||
// Convert to a map where the last slice is used.
|
||||
m := make(map[string][]byte)
|
||||
for _, b := range a {
|
||||
m[string(b[0:8])] = b
|
||||
}
|
||||
|
||||
// Convert map back to a slice of byte slices.
|
||||
other := make([][]byte, 0, len(m))
|
||||
for _, v := range m {
|
||||
other = append(other, v)
|
||||
}
|
||||
|
||||
// Sort entries.
|
||||
sort.Sort(byteSlices(other))
|
||||
|
||||
return other
|
||||
}
|
||||
|
||||
// entryHeaderSize is the number of bytes required for the header.
|
||||
const entryHeaderSize = 8 + 4
|
||||
|
||||
|
|
|
@ -259,16 +259,21 @@ func TestEngine_WriteIndex_Quick(t *testing.T) {
|
|||
t.Skip("short mode")
|
||||
}
|
||||
|
||||
quick.Check(func(points Points, blockSize int) bool {
|
||||
quick.Check(func(sets []Points, blockSize int) bool {
|
||||
e := OpenDefaultEngine()
|
||||
e.BlockSize = blockSize % 1024 // 1KB max block size
|
||||
defer e.Close()
|
||||
|
||||
// Write points to index.
|
||||
if err := e.WriteIndex(map[string][][]byte(points)); err != nil {
|
||||
t.Fatal(err)
|
||||
// Write points to index in multiple sets.
|
||||
for _, set := range sets {
|
||||
if err := e.WriteIndex(map[string][][]byte(set)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Merge all points together.
|
||||
points := MergePoints(sets)
|
||||
|
||||
// Retrieve a sorted list of keys so results are deterministic.
|
||||
keys := points.Keys()
|
||||
|
||||
|
@ -377,11 +382,11 @@ func (Points) Generate(rand *rand.Rand, size int) reflect.Value {
|
|||
// Generate series with a random number of points in each.
|
||||
m := make(map[string][][]byte)
|
||||
for i, seriesN := 0, rand.Intn(size); i < seriesN; i++ {
|
||||
key := strconv.Itoa(rand.Intn(1000))
|
||||
key := strconv.Itoa(rand.Intn(20))
|
||||
|
||||
// Generate points for the series.
|
||||
for j, pointN := 0, rand.Intn(size); j < pointN; j++ {
|
||||
timestamp := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC).Add(time.Duration(rand.Int63n(int64(365 * 24 * time.Hour))))
|
||||
timestamp := time.Unix(0, 0).Add(time.Duration(rand.Intn(100)))
|
||||
data, ok := quick.Value(reflect.TypeOf([]byte(nil)), rand)
|
||||
if !ok {
|
||||
panic("cannot generate data")
|
||||
|
@ -393,6 +398,25 @@ func (Points) Generate(rand *rand.Rand, size int) reflect.Value {
|
|||
return reflect.ValueOf(Points(m))
|
||||
}
|
||||
|
||||
// MergePoints returns a map of all points merged together by key.
|
||||
// Later points will overwrite earlier ones.
|
||||
func MergePoints(a []Points) Points {
|
||||
// Combine all points into one set.
|
||||
m := make(Points)
|
||||
for _, set := range a {
|
||||
for key, values := range set {
|
||||
m[key] = append(m[key], values...)
|
||||
}
|
||||
}
|
||||
|
||||
// Dedupe points.
|
||||
for key, values := range m {
|
||||
m[key] = bz1.DedupeEntries(values)
|
||||
}
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// copyBytes returns a copy of a byte slice.
|
||||
func copyBytes(b []byte) []byte {
|
||||
if b == nil {
|
||||
|
|
Loading…
Reference in New Issue