Merge branch 'master-1.x' into new-http-headers

pull/18667/head
Ayan George 2020-06-30 11:03:04 -04:00 committed by GitHub
commit 04536858d7
17 changed files with 966 additions and 56 deletions


@@ -6,6 +6,8 @@ v1.8.1 [unreleased]
### Bugfixes
- [#17638](https://github.com/influxdata/influxdb/pull/17638): Verify precision in write requests.
- [#18410](https://github.com/influxdata/influxdb/pull/18410): Enable CORS in InfluxDB 2.0 compatibility APIs.
- [#18429](https://github.com/influxdata/influxdb/pull/18429): Add option to authenticate Prometheus remote read.
v1.8.0 [unreleased]
-------------------


@@ -301,6 +301,10 @@
# endpoints. This setting has no effect if auth-enabled is set to false.
# ping-auth-enabled = false
# Enables authentication on the Prometheus remote read API. This setting has no
# effect if auth-enabled is set to false.
# prom-read-auth-enabled = false
# Determines whether HTTPS is enabled.
# https-enabled = false
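
For example, to require credentials on the Prometheus remote read endpoint, both settings must be enabled together. A minimal sketch of the relevant settings (assuming they live in the standard [http] section of influxdb.conf):

[http]
  auth-enabled = true
  prom-read-auth-enabled = true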


@@ -11,6 +11,7 @@ import (
"context"
"io"
"sort"
"strings"
"sync"
"time"
@@ -374,6 +375,7 @@ func (itr *floatSortedMergeIterator) pop() (*FloatPoint, error) {
}
itr.heap.items = append(itr.heap.items, item)
}
itr.heap.detectFast()
heap.Init(itr.heap)
itr.init = true
}
@@ -411,11 +413,57 @@ func (itr *floatSortedMergeIterator) pop() (*FloatPoint, error) {
type floatSortedMergeHeap struct {
opt IteratorOptions
items []*floatSortedMergeHeapItem
// If each input comes from a unique single time series, we can take a shortcut.
// Detecting the shortcut introduces some overhead, but it yields a significant
// performance improvement in cases like SELECT * FROM m GROUP BY *.
fast bool
}
func (h *floatSortedMergeHeap) detectFast() {
for _, item := range h.items {
if item.itr.Stats().SeriesN != 1 {
return
}
}
hasDup := false
s := make([]*floatSortedMergeHeapItem, len(h.items))
copy(s, h.items)
less := func(i, j int) bool {
x, y := s[i].point, s[j].point
ret := strings.Compare(x.Name, y.Name)
if ret == 0 {
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
}
if ret != 0 {
// Truth table for (ret == -1) == h.opt.Ascending:
// ret | ret == -1 | h.opt.Ascending | result
//   1 |   false   |      false      |  true
//  -1 |   true    |      false      |  false
//   1 |   false   |      true       |  false
//  -1 |   true    |      true       |  true
return ret == -1 == h.opt.Ascending
}
hasDup = true
return false
}
sort.Slice(s, less)
if !hasDup {
h.fast = true
for i, item := range s {
item.fastIdx = i
}
}
}
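
As a sanity check of the comparison above, a small standalone program (not part of the change) can enumerate the truth table for ret == -1 == ascending:

package main

import "fmt"

func main() {
	// Enumerate both comparison outcomes and both sort directions; the output
	// matches the truth table documented in the less closure above.
	for _, ret := range []int{1, -1} {
		for _, ascending := range []bool{false, true} {
			fmt.Printf("ret=%2d ascending=%-5v result=%v\n", ret, ascending, ret == -1 == ascending)
		}
	}
}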
func (h *floatSortedMergeHeap) Len() int { return len(h.items) }
func (h *floatSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *floatSortedMergeHeap) Less(i, j int) bool {
if h.fast {
return h.items[i].fastIdx < h.items[j].fastIdx
}
x, y := h.items[i].point, h.items[j].point
if h.opt.Ascending {
@@ -489,6 +537,8 @@ type floatSortedMergeHeapItem struct {
point *FloatPoint
err error
itr FloatIterator
// precomputed sort position used by the fast-path comparison
fastIdx int
}
// floatIteratorScanner scans the results of a FloatIterator into a map.
@@ -3038,6 +3088,7 @@ func (itr *integerSortedMergeIterator) pop() (*IntegerPoint, error) {
}
itr.heap.items = append(itr.heap.items, item)
}
itr.heap.detectFast()
heap.Init(itr.heap)
itr.init = true
}
@@ -3075,11 +3126,57 @@ func (itr *integerSortedMergeIterator) pop() (*IntegerPoint, error) {
type integerSortedMergeHeap struct {
opt IteratorOptions
items []*integerSortedMergeHeapItem
// If each input comes from a unique single time series, we can take a shortcut.
// Detecting the shortcut introduces some overhead, but it yields a significant
// performance improvement in cases like SELECT * FROM m GROUP BY *.
fast bool
}
func (h *integerSortedMergeHeap) detectFast() {
for _, item := range h.items {
if item.itr.Stats().SeriesN != 1 {
return
}
}
hasDup := false
s := make([]*integerSortedMergeHeapItem, len(h.items))
copy(s, h.items)
less := func(i, j int) bool {
x, y := s[i].point, s[j].point
ret := strings.Compare(x.Name, y.Name)
if ret == 0 {
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
}
if ret != 0 {
// Truth table for (ret == -1) == h.opt.Ascending:
// ret | ret == -1 | h.opt.Ascending | result
//   1 |   false   |      false      |  true
//  -1 |   true    |      false      |  false
//   1 |   false   |      true       |  false
//  -1 |   true    |      true       |  true
return ret == -1 == h.opt.Ascending
}
hasDup = true
return false
}
sort.Slice(s, less)
if !hasDup {
h.fast = true
for i, item := range s {
item.fastIdx = i
}
}
}
func (h *integerSortedMergeHeap) Len() int { return len(h.items) }
func (h *integerSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *integerSortedMergeHeap) Less(i, j int) bool {
if h.fast {
return h.items[i].fastIdx < h.items[j].fastIdx
}
x, y := h.items[i].point, h.items[j].point
if h.opt.Ascending {
@@ -3153,6 +3250,8 @@ type integerSortedMergeHeapItem struct {
point *IntegerPoint
err error
itr IntegerIterator
// precomputed sort position used by the fast-path comparison
fastIdx int
}
// integerIteratorScanner scans the results of a IntegerIterator into a map.
@@ -5702,6 +5801,7 @@ func (itr *unsignedSortedMergeIterator) pop() (*UnsignedPoint, error) {
}
itr.heap.items = append(itr.heap.items, item)
}
itr.heap.detectFast()
heap.Init(itr.heap)
itr.init = true
}
@@ -5739,11 +5839,57 @@ func (itr *unsignedSortedMergeIterator) pop() (*UnsignedPoint, error) {
type unsignedSortedMergeHeap struct {
opt IteratorOptions
items []*unsignedSortedMergeHeapItem
// If each input comes from a unique single time series, we can take a shortcut.
// Detecting the shortcut introduces some overhead, but it yields a significant
// performance improvement in cases like SELECT * FROM m GROUP BY *.
fast bool
}
func (h *unsignedSortedMergeHeap) detectFast() {
for _, item := range h.items {
if item.itr.Stats().SeriesN != 1 {
return
}
}
hasDup := false
s := make([]*unsignedSortedMergeHeapItem, len(h.items))
copy(s, h.items)
less := func(i, j int) bool {
x, y := s[i].point, s[j].point
ret := strings.Compare(x.Name, y.Name)
if ret == 0 {
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
}
if ret != 0 {
// Truth table for (ret == -1) == h.opt.Ascending:
// ret | ret == -1 | h.opt.Ascending | result
//   1 |   false   |      false      |  true
//  -1 |   true    |      false      |  false
//   1 |   false   |      true       |  false
//  -1 |   true    |      true       |  true
return ret == -1 == h.opt.Ascending
}
hasDup = true
return false
}
sort.Slice(s, less)
if !hasDup {
h.fast = true
for i, item := range s {
item.fastIdx = i
}
}
}
func (h *unsignedSortedMergeHeap) Len() int { return len(h.items) }
func (h *unsignedSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *unsignedSortedMergeHeap) Less(i, j int) bool {
if h.fast {
return h.items[i].fastIdx < h.items[j].fastIdx
}
x, y := h.items[i].point, h.items[j].point
if h.opt.Ascending {
@@ -5817,6 +5963,8 @@ type unsignedSortedMergeHeapItem struct {
point *UnsignedPoint
err error
itr UnsignedIterator
// precomputed sort position used by the fast-path comparison
fastIdx int
}
// unsignedIteratorScanner scans the results of a UnsignedIterator into a map.
@@ -8366,6 +8514,7 @@ func (itr *stringSortedMergeIterator) pop() (*StringPoint, error) {
}
itr.heap.items = append(itr.heap.items, item)
}
itr.heap.detectFast()
heap.Init(itr.heap)
itr.init = true
}
@@ -8403,11 +8552,57 @@ func (itr *stringSortedMergeIterator) pop() (*StringPoint, error) {
type stringSortedMergeHeap struct {
opt IteratorOptions
items []*stringSortedMergeHeapItem
// If each input comes from a unique single time series, we can take a shortcut.
// Detecting the shortcut introduces some overhead, but it yields a significant
// performance improvement in cases like SELECT * FROM m GROUP BY *.
fast bool
}
func (h *stringSortedMergeHeap) detectFast() {
for _, item := range h.items {
if item.itr.Stats().SeriesN != 1 {
return
}
}
hasDup := false
s := make([]*stringSortedMergeHeapItem, len(h.items))
copy(s, h.items)
less := func(i, j int) bool {
x, y := s[i].point, s[j].point
ret := strings.Compare(x.Name, y.Name)
if ret == 0 {
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
}
if ret != 0 {
// Truth table for (ret == -1) == h.opt.Ascending:
// ret | ret == -1 | h.opt.Ascending | result
//   1 |   false   |      false      |  true
//  -1 |   true    |      false      |  false
//   1 |   false   |      true       |  false
//  -1 |   true    |      true       |  true
return ret == -1 == h.opt.Ascending
}
hasDup = true
return false
}
sort.Slice(s, less)
if !hasDup {
h.fast = true
for i, item := range s {
item.fastIdx = i
}
}
}
func (h *stringSortedMergeHeap) Len() int { return len(h.items) }
func (h *stringSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *stringSortedMergeHeap) Less(i, j int) bool {
if h.fast {
return h.items[i].fastIdx < h.items[j].fastIdx
}
x, y := h.items[i].point, h.items[j].point
if h.opt.Ascending {
@@ -8481,6 +8676,8 @@ type stringSortedMergeHeapItem struct {
point *StringPoint
err error
itr StringIterator
// precomputed sort position used by the fast-path comparison
fastIdx int
}
// stringIteratorScanner scans the results of a StringIterator into a map.
@@ -11016,6 +11213,7 @@ func (itr *booleanSortedMergeIterator) pop() (*BooleanPoint, error) {
}
itr.heap.items = append(itr.heap.items, item)
}
itr.heap.detectFast()
heap.Init(itr.heap)
itr.init = true
}
@@ -11053,11 +11251,57 @@ func (itr *booleanSortedMergeIterator) pop() (*BooleanPoint, error) {
type booleanSortedMergeHeap struct {
opt IteratorOptions
items []*booleanSortedMergeHeapItem
// If each input comes from a unique single time series, we can take a shortcut.
// Detecting the shortcut introduces some overhead, but it yields a significant
// performance improvement in cases like SELECT * FROM m GROUP BY *.
fast bool
}
func (h *booleanSortedMergeHeap) detectFast() {
for _, item := range h.items {
if item.itr.Stats().SeriesN != 1 {
return
}
}
hasDup := false
s := make([]*booleanSortedMergeHeapItem, len(h.items))
copy(s, h.items)
less := func(i, j int) bool {
x, y := s[i].point, s[j].point
ret := strings.Compare(x.Name, y.Name)
if ret == 0 {
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
}
if ret != 0 {
// Truth table for (ret == -1) == h.opt.Ascending:
// ret | ret == -1 | h.opt.Ascending | result
//   1 |   false   |      false      |  true
//  -1 |   true    |      false      |  false
//   1 |   false   |      true       |  false
//  -1 |   true    |      true       |  true
return ret == -1 == h.opt.Ascending
}
hasDup = true
return false
}
sort.Slice(s, less)
if !hasDup {
h.fast = true
for i, item := range s {
item.fastIdx = i
}
}
}
func (h *booleanSortedMergeHeap) Len() int { return len(h.items) }
func (h *booleanSortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *booleanSortedMergeHeap) Less(i, j int) bool {
if h.fast {
return h.items[i].fastIdx < h.items[j].fastIdx
}
x, y := h.items[i].point, h.items[j].point
if h.opt.Ascending {
@@ -11131,6 +11375,8 @@ type booleanSortedMergeHeapItem struct {
point *BooleanPoint
err error
itr BooleanIterator
// precomputed sort position used by the fast-path comparison
fastIdx int
}
// booleanIteratorScanner scans the results of a BooleanIterator into a map.


@@ -5,9 +5,9 @@ import (
"container/heap"
"io"
"sort"
"strings"
"sync"
"time"
"sync"
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxql"
@@ -373,6 +373,7 @@ func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
}
itr.heap.items = append(itr.heap.items, item)
}
itr.heap.detectFast()
heap.Init(itr.heap)
itr.init = true
}
@@ -410,11 +411,57 @@ func (itr *{{$k.name}}SortedMergeIterator) pop() (*{{$k.Name}}Point, error) {
type {{$k.name}}SortedMergeHeap struct {
opt IteratorOptions
items []*{{$k.name}}SortedMergeHeapItem
// If each input comes from a unique single time series, we can take a shortcut.
// Detecting the shortcut introduces some overhead, but it yields a significant
// performance improvement in cases like SELECT * FROM m GROUP BY *.
fast bool
}
func (h *{{$k.name}}SortedMergeHeap) detectFast() {
for _, item := range h.items {
if item.itr.Stats().SeriesN != 1 {
return
}
}
hasDup := false
s := make([]*{{$k.name}}SortedMergeHeapItem, len(h.items))
copy(s, h.items)
less := func(i, j int) bool {
x, y := s[i].point, s[j].point
ret := strings.Compare(x.Name, y.Name)
if ret == 0 {
ret = strings.Compare(x.Tags.ID(), y.Tags.ID())
}
if ret != 0 {
// Truth table for (ret == -1) == h.opt.Ascending:
// ret | ret == -1 | h.opt.Ascending | result
//   1 |   false   |      false      |  true
//  -1 |   true    |      false      |  false
//   1 |   false   |      true       |  false
//  -1 |   true    |      true       |  true
return ret == -1 == h.opt.Ascending
}
hasDup = true
return false
}
sort.Slice(s, less)
if !hasDup {
h.fast = true
for i, item := range s {
item.fastIdx = i
}
}
}
func (h *{{$k.name}}SortedMergeHeap) Len() int { return len(h.items) }
func (h *{{$k.name}}SortedMergeHeap) Swap(i, j int) { h.items[i], h.items[j] = h.items[j], h.items[i] }
func (h *{{$k.name}}SortedMergeHeap) Less(i, j int) bool {
if h.fast {
return h.items[i].fastIdx < h.items[j].fastIdx
}
x, y := h.items[i].point, h.items[j].point
if h.opt.Ascending {
@@ -488,6 +535,8 @@ type {{$k.name}}SortedMergeHeapItem struct {
point *{{$k.Name}}Point
err error
itr {{$k.Name}}Iterator
// precomputed sort position used by the fast-path comparison
fastIdx int
}
// {{$k.name}}IteratorScanner scans the results of a {{$k.Name}}Iterator into a map.

query/iterator.gen_test.go (new file)

@@ -0,0 +1,306 @@
package query
import (
"testing"
"github.com/influxdata/influxql"
)
// floatIterator is a simple FloatIterator for testing.
type floatIterator struct {
points []FloatPoint
closed bool
stats IteratorStats
}
func (itr *floatIterator) Stats() IteratorStats { return itr.stats }
func (itr *floatIterator) Close() error { itr.closed = true; return nil }
// Next returns the next value and shifts it off the beginning of the points slice.
func (itr *floatIterator) Next() (*FloatPoint, error) {
if len(itr.points) == 0 || itr.closed {
return nil, nil
}
v := &itr.points[0]
itr.points = itr.points[1:]
return v, nil
}
func TestSortedMergeHeap_DetectFast(t *testing.T) {
suite := []*struct {
inputs []FloatIterator
ascending bool
fast bool // expected status
}{
// case 0
{
inputs: []FloatIterator{
&floatIterator{
points: []FloatPoint{
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "B"}), Time: 40, Value: 2},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 50, Value: 8},
},
stats: IteratorStats{SeriesN: 3},
},
&floatIterator{
points: []FloatPoint{
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "B"}), Time: 40, Value: 2},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 50, Value: 8},
},
stats: IteratorStats{SeriesN: 3},
},
},
ascending: true,
fast: false,
},
// case 1
{
inputs: []FloatIterator{
&floatIterator{
points: []FloatPoint{
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 61, Value: 8},
},
stats: IteratorStats{SeriesN: 1},
},
&floatIterator{
points: []FloatPoint{
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 0, Value: 1},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 12, Value: 3},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 30, Value: 4},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 4, Value: 2},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 61, Value: 8},
},
stats: IteratorStats{SeriesN: 1},
},
},
ascending: false,
fast: true,
},
// case 2
{
inputs: []FloatIterator{
&floatIterator{
points: []FloatPoint{
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 0, Value: 1},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 12, Value: 3},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 30, Value: 4},
{Name: "cpu", Tags: NewTags(map[string]string{"host": "A"}), Time: 51, Value: 8},
},
stats: IteratorStats{SeriesN: 1},
},
&floatIterator{
points: []FloatPoint{
{Name: "cpu", Tags: NewTags(map[string]string{"host": "B"}), Time: 1, Value: 8},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 10, Value: 1},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 12, Value: 3},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 30, Value: 4},
{Name: "mem", Tags: NewTags(map[string]string{"host": "B"}), Time: 40, Value: 2},
},
stats: IteratorStats{SeriesN: 2},
},
},
ascending: true,
fast: false,
},
}
for i, c := range suite {
h := createFloatSortedMergeHeap(
c.inputs,
IteratorOptions{
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: c.ascending,
})
h.detectFast()
if h.fast != c.fast {
t.Fatalf("unexpected shortcut status for sorted merge heap, case %d", i)
}
}
}
func createFloatSortedMergeHeap(inputs []FloatIterator, opt IteratorOptions) *floatSortedMergeHeap {
h := &floatSortedMergeHeap{
items: make([]*floatSortedMergeHeapItem, 0, len(inputs)),
opt: opt,
}
items2 := make([]*floatSortedMergeHeapItem, 0, len(inputs))
for _, input := range inputs {
items2 = append(items2, &floatSortedMergeHeapItem{itr: input})
}
for _, item := range items2 {
var err error
if item.point, err = item.itr.Next(); err != nil {
panic(err)
} else if item.point == nil {
continue
}
h.items = append(h.items, item)
}
return h
}
// simpleFloatIterator is a simple iterator that emits points from a single series.
type simpleFloatIterator struct {
point FloatPoint
size int
populated int
stats IteratorStats
}
func (itr *simpleFloatIterator) Stats() IteratorStats {
return itr.stats
}
func (itr *simpleFloatIterator) Close() error { itr.populated = itr.size; return nil }
func (itr *simpleFloatIterator) Next() (*FloatPoint, error) {
if itr.populated >= itr.size {
return nil, nil
}
p := itr.point.Clone()
p.Time += int64(itr.populated * 1000)
itr.populated++
return p, nil
}
func BenchmarkSortedMergeIterator_Fast(b *testing.B) {
for i := 0; i < b.N; i++ {
sortedMergeIterFast()
}
}
func BenchmarkSortedMergeIterator_NotFast(b *testing.B) {
for i := 0; i < b.N; i++ {
sortedMergeIterNotFast()
}
}
func sortedMergeIterFast() {
inputs := []Iterator{}
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "one"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "two"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "three"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 1},
})
itr := NewSortedMergeIterator(inputs, IteratorOptions{}).(*floatSortedMergeIterator)
p, _ := itr.Next()
for p != nil {
p, _ = itr.Next()
}
}
func sortedMergeIterNotFast() {
inputs := []Iterator{}
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "four"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 2},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "five"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 2},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "fix"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 2},
})
opt := IteratorOptions{
Dimensions: []string{"taga", "tagb", "tagc"},
}
itr := NewSortedMergeIterator(inputs, opt).(*floatSortedMergeIterator)
p, _ := itr.Next()
for p != nil {
p, _ = itr.Next()
}
}
func BenchmarkSortedMergeIterator_FastCheckOverhead(b *testing.B) {
inputs := []FloatIterator{}
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "one"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "two"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "three"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "four"}), Time: 10, Value: 2},
size: 1000000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "five"}), Time: 10, Value: 2},
size: 1000000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "fix"}), Time: 10, Value: 2},
size: 1000000,
stats: IteratorStats{SeriesN: 1},
})
inputs = append(inputs,
&simpleFloatIterator{
point: FloatPoint{Name: "cpu", Tags: NewTags(map[string]string{"taga": "aaaaaaaaaa", "tagb": "bbbbbbbbbb", "tagc": "cccccccccc", "tagd": "dddddddddd", "tage": "eeeeeeeeee", "tagf": "one"}), Time: 10, Value: 2},
size: 10000,
stats: IteratorStats{SeriesN: 1},
})
h := createFloatSortedMergeHeap(
inputs,
IteratorOptions{
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
})
b.ResetTimer()
for i := 0; i < b.N; i++ {
h.detectFast()
if h.fast {
panic("unexpected shortcut")
}
}
}
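
Assuming the standard repository layout, the new tests and benchmarks can be run with the usual Go tooling, e.g. go test ./query -run TestSortedMergeHeap_DetectFast and go test ./query -run '^$' -bench SortedMergeIterator -benchmem.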


@@ -78,6 +78,7 @@ func NewConfig() Config {
PprofAuthEnabled: false,
DebugPprofEnabled: false,
PingAuthEnabled: false,
PromReadAuthEnabled: false,
HTTPSEnabled: false,
HTTPSCertificate: "/etc/ssl/influxdb.pem",
MaxRowLimit: 0,


@@ -194,6 +194,10 @@ func NewHandler(c Config) *Handler {
"write", // Data-ingest route.
"POST", "/api/v2/write", true, writeLogEnabled, h.serveWriteV2,
},
Route{ // Enable CORS
"write-options",
"OPTIONS", "/api/v2/write", false, true, h.serveOptions,
},
Route{
"prometheus-write", // Prometheus remote write
"POST", "/api/v1/prom/write", false, true, h.servePromWrite,
@@ -218,6 +222,14 @@ func NewHandler(c Config) *Handler {
"status-head",
"HEAD", "/status", false, true, authWrapper(h.serveStatus),
},
Route{ // Health
"health",
"GET", "/health", false, true, authWrapper(h.serveHealth),
},
Route{ // Enable CORS
"health-options",
"OPTIONS", "/health", false, true, h.serveOptions,
},
Route{
"prometheus-metrics",
"GET", "/metrics", false, true, authWrapper(promhttp.Handler().ServeHTTP),
@@ -269,6 +281,10 @@ func NewHandler(c Config) *Handler {
"flux-read",
"POST", "/api/v2/query", true, true, nil,
}
fluxRouteCors := Route{
"flux-read-options",
"OPTIONS", "/api/v2/query", false, true, h.serveOptions,
}
if !c.FluxEnabled {
fluxRoute.HandlerFunc = func(w http.ResponseWriter, r *http.Request) {
@@ -277,7 +293,7 @@ func NewHandler(c Config) *Handler {
} else {
fluxRoute.HandlerFunc = h.serveFluxQuery
}
h.AddRoutes(fluxRoute, fluxRouteCors)
return h
}
@@ -1002,6 +1018,24 @@ func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
}
}
// serveHealth maps v2 health endpoint to ping endpoint
func (h *Handler) serveHealth(w http.ResponseWriter, r *http.Request) {
resp := map[string]interface{}{
"name": "influxdb",
"message": "ready for queries and writes",
"status": "pass",
"checks": []string{},
"version": h.Version,
}
b, _ := json.Marshal(resp)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
h.writeHeader(w, http.StatusOK)
if _, err := w.Write(b); err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
}
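
For illustration, a GET /health request served by this handler returns a body of the following shape (json.Marshal emits map keys in sorted order; the version value comes from h.Version, e.g. "0.0.0" in the handler tests below):

{"checks":[],"message":"ready for queries and writes","name":"influxdb","status":"pass","version":"0.0.0"}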
// serveStatus has been deprecated.
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
h.Logger.Info("WARNING: /status has been deprecated. Use /ping instead.")
@@ -1171,6 +1205,8 @@ func (h *Handler) servePromWrite(w http.ResponseWriter, r *http.Request, user me
// servePromRead will convert a Prometheus remote read request into a storage
// query and returns data in Prometheus remote read protobuf format.
func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user meta.User) {
atomic.AddInt64(&h.stats.PromReadRequests, 1)
h.requestTracker.Add(r, user)
compressed, err := ioutil.ReadAll(r.Body)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
@@ -1193,6 +1229,17 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
db := r.FormValue("db")
rp := r.FormValue("rp")
if h.Config.AuthEnabled && h.Config.PromReadAuthEnabled {
if user == nil {
h.httpError(w, fmt.Sprintf("user is required to read from database %q", db), http.StatusForbidden)
return
}
if !user.AuthorizeDatabase(influxql.ReadPrivilege, db) {
h.httpError(w, fmt.Sprintf("user %q is not authorized to read from database %q", user.ID(), db), http.StatusForbidden)
return
}
}
readRequest, err := prometheus.ReadRequestToInfluxStorageRequest(&req, db, rp)
if err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
@@ -1825,6 +1872,7 @@ func cors(inner http.Handler) http.Handler {
`Authorization`,
`Content-Length`,
`Content-Type`,
`User-Agent`,
`X-CSRF-Token`,
`X-HTTP-Method-Override`,
}, ", "))


@@ -35,6 +35,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/pkg/testing/assert"
"github.com/influxdata/influxdb/prometheus/remote"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/httpd"
@@ -1476,6 +1477,29 @@ func TestHandler_Ping(t *testing.T) {
}
}
// Ensure the handler handles health requests correctly.
func TestHandler_Health(t *testing.T) {
h := NewHandler(false)
w := httptest.NewRecorder()
h.ServeHTTP(w, MustNewRequest("GET", "/health", nil))
if w.Code != http.StatusOK {
t.Fatalf("unexpected status: %d", w.Code)
}
var got map[string]interface{}
if err := json.Unmarshal(w.Body.Bytes(), &got); err != nil {
t.Fatal(err)
}
assert.Equal(t, got["name"], "influxdb", "invalid name")
assert.Equal(t, got["message"], "ready for queries and writes", "invalid message")
assert.Equal(t, got["status"], "pass", "invalid status")
assert.Equal(t, got["version"], "0.0.0", "invalid version")
if _, present := got["checks"]; !present {
t.Fatal("missing checks")
}
}
// Ensure the handler returns the version correctly from the different endpoints.
func TestHandler_Version(t *testing.T) {
h := NewHandler(false)


@@ -930,7 +930,11 @@ func (e *Engine) Backup(w io.Writer, basePath string, since time.Time) error {
e.logger.Warn("Snapshotter busy: Backup proceeding without snapshot contents.")
}
// Remove the temporary snapshot dir
defer func() {
if err := os.RemoveAll(path); err != nil {
e.logger.Warn("backup could not remove temporary snapshot directory", zap.String("path", path), zap.Error(err))
}
}()
return intar.Stream(w, path, basePath, intar.SinceFilterTarFile(since))
}
@@ -996,7 +1000,11 @@ func (e *Engine) Export(w io.Writer, basePath string, start time.Time, end time.
return err
}
// Remove the temporary snapshot dir
defer func() {
if err := os.RemoveAll(path); err != nil {
e.logger.Warn("export could not remove temporary snapshot directory", zap.String("path", path), zap.Error(err))
}
}()
return intar.Stream(w, path, basePath, e.timeStampFilterTarFile(start, end))
}
@@ -1673,6 +1681,9 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
ids := tsdb.NewSeriesIDSet()
measurements := make(map[string]struct{}, 1)
deleteIDList := make([]uint64, 0, 10000)
deleteKeyList := make([][]byte, 0, 10000)
for _, k := range seriesKeys {
if len(k) == 0 {
continue // This key was wiped because it shouldn't be removed from index.
@@ -1702,15 +1713,18 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
continue
}
// Insert deleting series info into queue
measurements[string(name)] = struct{}{}
deleteIDList = append(deleteIDList, sid)
deleteKeyList = append(deleteKeyList, k)
// Add the id to the set of delete ids.
ids.Add(sid)
}
// Remove the series from the local index.
if err := e.index.DropSeriesList(deleteIDList, deleteKeyList, false); err != nil {
return err
}
fielsetChanged := false
for k := range measurements {


@@ -1050,6 +1050,24 @@ func (f *FileStore) locations(key []byte, t int64, ascending bool) []*location {
return locations
}
// MakeSnapshotLinks creates hardlinks from the supplied TSMFiles to
// corresponding files under a supplied directory.
func (f *FileStore) MakeSnapshotLinks(destPath string, files []TSMFile) error {
for _, tsmf := range files {
newpath := filepath.Join(destPath, filepath.Base(tsmf.Path()))
if err := os.Link(tsmf.Path(), newpath); err != nil {
return fmt.Errorf("error creating tsm hard link: %q", err)
}
for _, tf := range tsmf.TombstoneFiles() {
newpath := filepath.Join(destPath, filepath.Base(tf.Path))
if err := os.Link(tf.Path, newpath); err != nil {
return fmt.Errorf("error creating tombstone hard link: %q", err)
}
}
}
return nil
}
// CreateSnapshot creates hardlinks for all tsm and tombstone files
// in the path provided.
func (f *FileStore) CreateSnapshot() (string, error) {
@@ -1069,29 +1087,24 @@ func (f *FileStore) CreateSnapshot() (string, error) {
// increment and keep track of the current temp dir for when we drop the lock.
// this ensures we are the only writer to the directory.
f.currentTempDirID += 1
tmpPath := filepath.Join(f.dir, fmt.Sprintf("%d.%s", f.currentTempDirID, TmpTSMFileExtension))
f.mu.Unlock()
// create the tmp directory and add the hard links. there is no longer any
// shared mutable state.
if err := os.Mkdir(tmpPath, 0777); err != nil {
return "", err
}
if err := f.MakeSnapshotLinks(tmpPath, files); err != nil {
// remove temporary directory since we couldn't create our hard links.
if err := os.RemoveAll(tmpPath); err != nil {
// report if, for some reason, we couldn't remove our temporary
// directory.
return "", fmt.Errorf("CreateSnapshot() failed to create links and failed to remove temporary direcotry %v: %w", tmpPath, err)
}
return "", err
}
return tmpPath, nil
}


@@ -43,6 +43,7 @@ type Index interface {
CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
DropSeries(seriesID uint64, key []byte, cascade bool) error
DropSeriesList(seriesID []uint64, key [][]byte, cascade bool) error
DropMeasurementIfSeriesNotExist(name []byte) (bool, error)
// Used to clean up series in inmem index that were dropped with a shard.


@@ -1135,6 +1135,30 @@ func (idx *ShardIndex) DropSeries(seriesID uint64, key []byte, _ bool) error {
return nil
}
// DropSeriesList removes the provided series ids from the local bitset that tracks
// series in this shard only.
func (idx *ShardIndex) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
// All slices must be of equal length.
if len(seriesIDs) != len(keys) {
return errors.New("seriesIDs/keys length mismatch in index")
}
idx.seriesIDSet.Lock()
for i, seriesID := range seriesIDs {
if idx.seriesIDSet.ContainsNoLock(seriesID) {
idx.seriesIDSet.RemoveNoLock(seriesID)
name := models.ParseName(keys[i])
if curr := idx.measurements[string(name)]; curr <= 1 {
delete(idx.measurements, string(name))
} else {
idx.measurements[string(name)] = curr - 1
}
}
}
idx.seriesIDSet.Unlock()
return nil
}
// DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
// series for the measurement.
func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {


@@ -824,6 +824,64 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
return nil
}
// DropSeriesList drops the provided list of series from the index. The cascade
// flag is currently unused by this implementation.
func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
// All slices must be of equal length.
if len(seriesIDs) != len(keys) {
return errors.New("seriesIDs/keys length mismatch in index")
}
// We need to move different series into collections for each partition
// to process.
pSeriesIDs := make([][]uint64, i.PartitionN)
pKeys := make([][][]byte, i.PartitionN)
for idx, key := range keys {
pidx := i.partitionIdx(key)
pSeriesIDs[pidx] = append(pSeriesIDs[pidx], seriesIDs[idx])
pKeys[pidx] = append(pKeys[pidx], key)
}
// Process each subset of series on each partition.
n := i.availableThreads()
// Store errors.
errC := make(chan error, i.PartitionN)
var pidx uint32 // Index of maximum Partition being worked on.
for k := 0; k < n; k++ {
go func() {
for {
idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
if idx >= len(i.partitions) {
return // No more work.
}
// Drop from partition.
err := i.partitions[idx].DropSeriesList(pSeriesIDs[idx])
errC <- err
}
}()
}
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
// Add sketch tombstone.
i.mu.Lock()
for _, key := range keys {
i.sTSketch.Add(key)
}
i.mu.Unlock()
return nil
}
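
The per-partition fan-out above claims work via a shared atomic counter. A self-contained sketch of the same pattern (simplified, with placeholder partition work):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	partitions := []string{"p0", "p1", "p2", "p3"}
	const workers = 2
	errC := make(chan error, len(partitions))

	var pidx uint32 // index of the next partition to claim
	for k := 0; k < workers; k++ {
		go func() {
			for {
				idx := int(atomic.AddUint32(&pidx, 1) - 1) // claim the next partition
				if idx >= len(partitions) {
					return // no more work
				}
				fmt.Println("processing", partitions[idx]) // placeholder for DropSeriesList
				errC <- nil                                // report the per-partition result
			}
		}()
	}

	// Collect one result per partition; a real caller would propagate errors.
	for i := 0; i < cap(errC); i++ {
		if err := <-errC; err != nil {
			fmt.Println("error:", err)
		}
	}
}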
// DropSeriesGlobal is a no-op on the tsi1 index.
func (i *Index) DropSeriesGlobal(key []byte) error { return nil }


@@ -466,6 +466,69 @@ func TestIndex_TagValueSeriesIDIterator(t *testing.T) {
})
}
func TestIndex_DropSeriesList(t *testing.T) {
idx := MustOpenDefaultIndex() // Uses the single series creation method CreateSeriesIfNotExists
defer idx.Close()
// Add some series.
data := []struct {
Key string
Name string
Tags map[string]string
}{
{"cpu,region=west,server=a", "cpu", map[string]string{"region": "west", "server": "a"}},
{"cpu,region=west,server=b", "cpu", map[string]string{"region": "west", "server": "b"}},
{"cpu,region=west,server=c", "cpu", map[string]string{"region": "west", "server": "c"}},
{"cpu,region=east,server=a", "cpu", map[string]string{"region": "east", "server": "a"}},
{"cpu,region=east,server=c", "cpu", map[string]string{"region": "east", "server": "c"}},
{"cpu,region=east,server=d", "cpu", map[string]string{"region": "east", "server": "d"}},
{"cpu,region=north,server=b", "cpu", map[string]string{"region": "north", "server": "b"}},
{"cpu,region=north,server=c", "cpu", map[string]string{"region": "north", "server": "c"}},
{"cpu,region=north,server=d", "cpu", map[string]string{"region": "north", "server": "d"}},
{"cpu,region=south,server=a", "cpu", map[string]string{"region": "south", "server": "a"}},
{"cpu,region=south,server=d", "cpu", map[string]string{"region": "south", "server": "d"}},
}
keys := make([][]byte, 0, 15)
seriesIDs := make([]uint64, 0, 15)
for _, pt := range data {
if err := idx.CreateSeriesIfNotExists([]byte(pt.Key), []byte(pt.Name), models.NewTags(pt.Tags)); err != nil {
t.Fatal(err)
}
keys = append(keys, []byte(pt.Key))
seriesIDs = append(seriesIDs, idx.Index.SeriesFile().SeriesID([]byte(pt.Name), models.NewTags(pt.Tags), nil))
}
// Drop series list
if err := idx.DropSeriesList(seriesIDs[0:len(seriesIDs)-2], keys[0:len(keys)-2], false); err != nil {
t.Fatal(err)
}
// Verify series still exists.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementHasSeries([]byte("cpu")); err != nil {
t.Fatal(err)
} else if !v {
t.Fatal("expected series to still exist")
}
})
// Drop the remaining series in the list.
if err := idx.DropSeriesList(seriesIDs[len(seriesIDs)-2:], keys[len(keys)-2:], false); err != nil {
t.Fatal(err)
}
// Verify series is now deleted.
idx.Run(t, func(t *testing.T) {
if v, err := idx.MeasurementHasSeries([]byte("cpu")); err != nil {
t.Fatal(err)
} else if v {
t.Fatal("expected series to be deleted")
}
})
}
// Index is a test wrapper for tsi1.Index.
type Index struct {
*tsi1.Index


@@ -578,6 +578,23 @@ func (f *LogFile) DeleteSeriesID(id uint64) error {
return f.FlushAndSync()
}
// DeleteSeriesIDList adds a tombstone for each series ID in the list.
func (f *LogFile) DeleteSeriesIDList(ids []uint64) error {
f.mu.Lock()
defer f.mu.Unlock()
for _, id := range ids {
e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
if err := f.appendEntry(&e); err != nil {
return err
}
f.execEntry(&e)
}
// Flush buffer and sync to disk.
return f.FlushAndSync()
}
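
Compared with calling DeleteSeriesID once per series, this batches all tombstone entries under a single lock acquisition and a single FlushAndSync. A hypothetical caller sketch (logFile and the IDs are assumed):

ids := []uint64{12, 34, 56}
if err := logFile.DeleteSeriesIDList(ids); err != nil {
	return err
}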
// SeriesN returns the total number of series in the file.
func (f *LogFile) SeriesN() (n uint64) {
f.mu.RLock()
@@ -1056,6 +1073,21 @@ func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error)
return sketch, tSketch, nil
}
func (f *LogFile) Writes(entries []LogEntry) error {
f.mu.RLock()
defer f.mu.RUnlock()
for i := range entries {
entry := &entries[i]
if err := f.appendEntry(entry); err != nil {
return err
}
f.execEntry(entry)
}
// Flush buffer and sync to disk.
return f.FlushAndSync()
}
// LogEntry represents a single log entry in the write-ahead log.
type LogEntry struct {
Flag byte // flag


@@ -585,31 +585,20 @@ func (p *Partition) DropMeasurement(name []byte) error {
}
defer fs.Release()
entries := make([]LogEntry, 0, 100)
// Delete all keys and values.
if kitr := fs.TagKeyIterator(name); kitr != nil {
for k := kitr.Next(); k != nil; k = kitr.Next() {
// Delete key if not already deleted.
if !k.Deleted() {
entries = append(entries, LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: k.Key()})
}
// Delete each value in key.
if vitr := k.TagValueIterator(); vitr != nil {
for v := vitr.Next(); v != nil; v = vitr.Next() {
if !v.Deleted() {
entries = append(entries, LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: k.Key(), Value: v.Value()})
}
}
}
@@ -626,13 +615,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
} else if elem.SeriesID == 0 {
break
}
entries = append(entries, LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: elem.SeriesID})
}
if err = itr.Close(); err != nil {
return err
@@ -640,13 +623,14 @@ func (p *Partition) DropMeasurement(name []byte) error {
}
// Mark measurement as deleted.
entries = append(entries, LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name})
p.mu.RLock()
if err := p.activeLogFile.Writes(entries); err != nil {
p.mu.RUnlock()
return err
}
p.mu.RUnlock()
// Check if the log file needs to be swapped.
if err := p.CheckLogFile(); err != nil {
@@ -705,6 +689,28 @@ func (p *Partition) DropSeries(seriesID uint64) error {
return p.CheckLogFile()
}
func (p *Partition) DropSeriesList(seriesIDs []uint64) error {
if len(seriesIDs) == 0 {
return nil
}
// Delete series from index.
if err := func() error {
p.mu.RLock()
defer p.mu.RUnlock()
return p.activeLogFile.DeleteSeriesIDList(seriesIDs)
}(); err != nil {
return err
}
for _, seriesID := range seriesIDs {
p.seriesIDSet.Remove(seriesID)
}
// Swap log file, if necessary.
return p.CheckLogFile()
}
// MeasurementsSketches returns the two sketches for the partition by merging all
// instances of the type sketch types in all the index files.
func (p *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {


@@ -714,19 +714,20 @@ func (s *Store) DeleteShard(shardID uint64) error {
return nil
}
delete(s.shards, shardID)
s.pendingShardDeletes[shardID] = struct{}{}
db := sh.Database()
// Determine if the shard contained any series that are not present in any
// other shards in the database.
shards := s.filterShards(byDatabase(db))
epoch := s.epochs[shardID]
s.mu.Unlock()
// Ensure the pending deletion flag is cleared on exit.
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.epochs, shardID)
delete(s.pendingShardDeletes, shardID)
s.databases[db].removeIndexType(sh.IndexType())
}()
@@ -787,6 +788,15 @@ func (s *Store) DeleteShard(shardID uint64) error {
}
// enter the epoch tracker
guards, gen := epoch.StartWrite()
defer epoch.EndWrite(gen)
// wait for any guards before closing the shard
for _, guard := range guards {
guard.Wait()
}
// Close the shard.
if err := sh.Close(); err != nil {
return err
@@ -808,9 +818,8 @@ func (s *Store) DeleteDatabase(name string) error {
// no files locally, so nothing to do
return nil
}
shards := s.filterShards(byDatabase(name))
epochs := s.epochsForShards(shards)
s.mu.RUnlock()
if err := s.walkShards(shards, func(sh *Shard) error {
@@ -818,6 +827,16 @@ func (s *Store) DeleteDatabase(name string) error {
return nil
}
epoch := epochs[sh.id]
// enter the epoch tracker
guards, gen := epoch.StartWrite()
defer epoch.EndWrite(gen)
// wait for any guards before closing the shard
for _, guard := range guards {
guard.Wait()
}
return sh.Close()
}); err != nil {
return err