Merge pull request #6587 from influxdata/jw-validate-fields

Fix for merge values
pull/6598/head
Jason Wilder 2016-05-10 11:56:07 -06:00
commit d8490f1170
3 changed files with 255 additions and 63 deletions

View File

@ -694,11 +694,19 @@ func (c *Client) DropShard(id uint64) error {
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
// Check under a read-lock
c.mu.RLock()
if sg, _ := c.cacheData.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
c.mu.RUnlock()
return sg, nil
}
c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
// Check again under the write lock
data := c.cacheData.Clone()
if sg, _ := data.ShardGroupByTimestamp(database, policy, timestamp); sg != nil {
return sg, nil
}

View File

@ -111,26 +111,45 @@ func (a Values) Merge(b Values) Values {
return a
}
var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}
sort.Sort(b[j:])
if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}
return a
}
@ -442,24 +461,45 @@ func (a FloatValues) Merge(b FloatValues) FloatValues {
return a
}
var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}
if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}
return a
}
@ -655,24 +695,45 @@ func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
return a
}
var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}
if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}
return a
}
@ -828,24 +889,45 @@ func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
return a
}
var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}
if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}
return a
}
@ -1003,24 +1085,45 @@ func (a StringValues) Merge(b StringValues) StringValues {
return a
}
var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
for i := 0; i < len(a) && len(b) > 0; i++ {
av, bv := a[i].UnixNano(), b[0].UnixNano()
// Value in a is greater than B, we need to merge
if av > bv {
a[i], b[j] = b[j], a[i]
// Save value in a
temp := a[i]
// Overwrite a with b
a[i] = b[0]
// Slide all values of b down 1
copy(b, b[1:])
b = b[:len(b)-1]
// See where value we save from a should be inserted in b to keep b sorted
k := sort.Search(len(b), func(i int) bool { return b[i].UnixNano() >= temp.UnixNano() })
if k == len(b) {
// Last position?
b = append(b, temp)
} else if b[k].UnixNano() != temp.UnixNano() {
// Save the last element, since it will get overwritten
last := b[len(b)-1]
// Somewhere in the middle of b, insert it only if it's not a duplicate
copy(b[k+1:], b[k:])
// Add the last vale to the end
b = append(b, last)
b[k] = temp
}
} else if av == bv {
a[i] = b[j]
j++
// Value in a an b are the same, use b
a[i] = b[0]
b = b[1:]
}
}
if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
if len(b) > 0 {
return append(a, b...)
}
return a
}

View File

@ -386,6 +386,70 @@ func TestValues_MergeFloat(t *testing.T) {
tsm1.NewValue(1462498658288956853, 1.1),
},
},
{
a: []tsm1.Value{
tsm1.NewValue(4, 4.0),
tsm1.NewValue(5, 5.0),
tsm1.NewValue(6, 6.0),
},
b: []tsm1.Value{
tsm1.NewValue(1, 1.0),
tsm1.NewValue(2, 2.0),
tsm1.NewValue(3, 3.0),
},
exp: []tsm1.Value{
tsm1.NewValue(1, 1.0),
tsm1.NewValue(2, 2.0),
tsm1.NewValue(3, 3.0),
tsm1.NewValue(4, 4.0),
tsm1.NewValue(5, 5.0),
tsm1.NewValue(6, 6.0),
},
},
{
a: []tsm1.Value{
tsm1.NewValue(5, 5.0),
tsm1.NewValue(6, 6.0),
},
b: []tsm1.Value{
tsm1.NewValue(1, 1.0),
tsm1.NewValue(2, 2.0),
tsm1.NewValue(3, 3.0),
tsm1.NewValue(4, 4.0),
tsm1.NewValue(7, 7.0),
tsm1.NewValue(8, 8.0),
},
exp: []tsm1.Value{
tsm1.NewValue(1, 1.0),
tsm1.NewValue(2, 2.0),
tsm1.NewValue(3, 3.0),
tsm1.NewValue(4, 4.0),
tsm1.NewValue(5, 5.0),
tsm1.NewValue(6, 6.0),
tsm1.NewValue(7, 7.0),
tsm1.NewValue(8, 8.0),
},
},
{
a: []tsm1.Value{
tsm1.NewValue(1, 1.0),
tsm1.NewValue(2, 2.0),
tsm1.NewValue(3, 3.0),
},
b: []tsm1.Value{
tsm1.NewValue(4, 4.0),
tsm1.NewValue(5, 5.0),
tsm1.NewValue(6, 6.0),
},
exp: []tsm1.Value{
tsm1.NewValue(1, 1.0),
tsm1.NewValue(2, 2.0),
tsm1.NewValue(3, 3.0),
tsm1.NewValue(4, 4.0),
tsm1.NewValue(5, 5.0),
tsm1.NewValue(6, 6.0),
},
},
}
for i, test := range tests {
@ -1226,3 +1290,20 @@ func BenchmarkValues_Deduplicate(b *testing.B) {
tsm1.Values(values).Deduplicate()
}
}
func BenchmarkValues_Merge(b *testing.B) {
valueCount := 1000
times := getTimes(valueCount, 60, time.Second)
a := make([]tsm1.Value, len(times))
c := make([]tsm1.Value, len(times))
for i, t := range times {
a[i] = tsm1.NewValue(t, float64(i))
c[i] = tsm1.NewValue(t+1, float64(i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
tsm1.Values(a).Merge(c)
}
}