Merge pull request #5833 from jonseymour/jss-5832-snapshot-may-not-be-sorted
tsm: cache: need to check that snapshot has been sortedpull/5841/head
commit
55a503671d
tsdb/engine/tsm1
|
@ -46,6 +46,7 @@
|
||||||
- [#5753](https://github.com/influxdata/influxdb/pull/5753): Ensures that drop-type commands work correctly in a cluster
|
- [#5753](https://github.com/influxdata/influxdb/pull/5753): Ensures that drop-type commands work correctly in a cluster
|
||||||
- [#5814](https://github.com/influxdata/influxdb/issues/5814): Run CQs with the same name from different databases
|
- [#5814](https://github.com/influxdata/influxdb/issues/5814): Run CQs with the same name from different databases
|
||||||
- [#5699](https://github.com/influxdata/influxdb/issues/5699): Fix potential thread safety issue in cache @jonseymour
|
- [#5699](https://github.com/influxdata/influxdb/issues/5699): Fix potential thread safety issue in cache @jonseymour
|
||||||
|
- [#5832](https://github.com/influxdata/influxdb/issues/5832): tsm: cache: need to check that snapshot has been sorted @jonseymour
|
||||||
|
|
||||||
## v0.10.1 [2016-02-18]
|
## v0.10.1 [2016-02-18]
|
||||||
|
|
||||||
|
|
|
@ -316,6 +316,7 @@ func (c *Cache) merged(key string) Values {
|
||||||
if c.snapshot != nil {
|
if c.snapshot != nil {
|
||||||
snapshotEntries := c.snapshot.store[key]
|
snapshotEntries := c.snapshot.store[key]
|
||||||
if snapshotEntries != nil {
|
if snapshotEntries != nil {
|
||||||
|
snapshotEntries.deduplicate() // guarantee we are deduplicated
|
||||||
entries = append(entries, snapshotEntries)
|
entries = append(entries, snapshotEntries)
|
||||||
sz += snapshotEntries.count()
|
sz += snapshotEntries.count()
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,8 @@ func TestCache_CacheSnapshot(t *testing.T) {
|
||||||
v3 := NewValue(time.Unix(5, 0).UTC(), 4.0)
|
v3 := NewValue(time.Unix(5, 0).UTC(), 4.0)
|
||||||
v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
|
v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
|
||||||
v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)
|
v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)
|
||||||
|
v6 := NewValue(time.Unix(7, 0).UTC(), 5.0)
|
||||||
|
v7 := NewValue(time.Unix(2, 0).UTC(), 5.0)
|
||||||
|
|
||||||
c := NewCache(512, "")
|
c := NewCache(512, "")
|
||||||
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
|
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
|
||||||
|
@ -178,10 +180,31 @@ func TestCache_CacheSnapshot(t *testing.T) {
|
||||||
|
|
||||||
// Clear snapshot, ensuring non-snapshot data untouched.
|
// Clear snapshot, ensuring non-snapshot data untouched.
|
||||||
c.ClearSnapshot(true)
|
c.ClearSnapshot(true)
|
||||||
|
|
||||||
expValues = Values{v5, v4}
|
expValues = Values{v5, v4}
|
||||||
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
||||||
t.Fatalf("post-clear values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
t.Fatalf("post-clear values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create another snapshot
|
||||||
|
snapshot = c.Snapshot()
|
||||||
|
|
||||||
|
if err := c.Write("foo", Values{v4, v5}); err != nil {
|
||||||
|
t.Fatalf("failed to write post-snap value, key foo to cache: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
c.ClearSnapshot(true)
|
||||||
|
|
||||||
|
snapshot = c.Snapshot()
|
||||||
|
|
||||||
|
if err := c.Write("foo", Values{v6, v7}); err != nil {
|
||||||
|
t.Fatalf("failed to write post-snap value, key foo to cache: %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
expValues = Values{v5, v7, v4, v6}
|
||||||
|
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
|
||||||
|
t.Fatalf("post-snapshot out-of-order write values for foo incorrect, exp: %v, got %v", expValues, deduped)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCache_CacheEmptySnapshot(t *testing.T) {
|
func TestCache_CacheEmptySnapshot(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue