Removing dead code from every package except influxql
The tsdb package had a substantial amount of dead code related to the old query engine still in there. It is no longer used, so it was removed since it was left unmaintained. There is likely still more code that is the same, but wasn't found as part of this code cleanup. influxql has dead code show up because of the code generation so it is not included in this pruning.pull/6879/head
parent
79b09747f0
commit
497db2a6d3
|
@ -17,17 +17,14 @@ import (
|
|||
|
||||
// The statistics generated by the "write" mdoule
|
||||
const (
|
||||
statWriteReq = "req"
|
||||
statPointWriteReq = "pointReq"
|
||||
statPointWriteReqLocal = "pointReqLocal"
|
||||
statPointWriteReqRemote = "pointReqRemote"
|
||||
statWriteOK = "writeOk"
|
||||
statWritePartial = "writePartial"
|
||||
statWriteTimeout = "writeTimeout"
|
||||
statWriteErr = "writeError"
|
||||
statWritePointReqHH = "pointReqHH"
|
||||
statSubWriteOK = "subWriteOk"
|
||||
statSubWriteDrop = "subWriteDrop"
|
||||
statWriteReq = "req"
|
||||
statPointWriteReq = "pointReq"
|
||||
statPointWriteReqLocal = "pointReqLocal"
|
||||
statWriteOK = "writeOk"
|
||||
statWriteTimeout = "writeTimeout"
|
||||
statWriteErr = "writeError"
|
||||
statSubWriteOK = "subWriteOk"
|
||||
statSubWriteDrop = "subWriteDrop"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
|
@ -1242,12 +1242,6 @@ func (s stringSet) intersect(o stringSet) stringSet {
|
|||
return ns
|
||||
}
|
||||
|
||||
type uint64Slice []uint64
|
||||
|
||||
func (a uint64Slice) Len() int { return len(a) }
|
||||
func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] }
|
||||
|
||||
type keyValue struct {
|
||||
key, value string
|
||||
}
|
||||
|
|
|
@ -299,7 +299,6 @@ func (h *floatMergeHeap) Pop() interface{} {
|
|||
|
||||
type floatMergeHeapItem struct {
|
||||
itr *bufFloatIterator
|
||||
err error
|
||||
}
|
||||
|
||||
// floatSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
|
||||
|
@ -308,7 +307,6 @@ type floatSortedMergeIterator struct {
|
|||
opt IteratorOptions
|
||||
heap floatSortedMergeHeap
|
||||
init bool
|
||||
point FloatPoint
|
||||
}
|
||||
|
||||
// newFloatSortedMergeIterator returns an instance of floatSortedMergeIterator.
|
||||
|
@ -2357,7 +2355,6 @@ func (h *integerMergeHeap) Pop() interface{} {
|
|||
|
||||
type integerMergeHeapItem struct {
|
||||
itr *bufIntegerIterator
|
||||
err error
|
||||
}
|
||||
|
||||
// integerSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
|
||||
|
@ -2366,7 +2363,6 @@ type integerSortedMergeIterator struct {
|
|||
opt IteratorOptions
|
||||
heap integerSortedMergeHeap
|
||||
init bool
|
||||
point IntegerPoint
|
||||
}
|
||||
|
||||
// newIntegerSortedMergeIterator returns an instance of integerSortedMergeIterator.
|
||||
|
@ -4412,7 +4408,6 @@ func (h *stringMergeHeap) Pop() interface{} {
|
|||
|
||||
type stringMergeHeapItem struct {
|
||||
itr *bufStringIterator
|
||||
err error
|
||||
}
|
||||
|
||||
// stringSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
|
||||
|
@ -4421,7 +4416,6 @@ type stringSortedMergeIterator struct {
|
|||
opt IteratorOptions
|
||||
heap stringSortedMergeHeap
|
||||
init bool
|
||||
point StringPoint
|
||||
}
|
||||
|
||||
// newStringSortedMergeIterator returns an instance of stringSortedMergeIterator.
|
||||
|
@ -6467,7 +6461,6 @@ func (h *booleanMergeHeap) Pop() interface{} {
|
|||
|
||||
type booleanMergeHeapItem struct {
|
||||
itr *bufBooleanIterator
|
||||
err error
|
||||
}
|
||||
|
||||
// booleanSortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
|
||||
|
@ -6476,7 +6469,6 @@ type booleanSortedMergeIterator struct {
|
|||
opt IteratorOptions
|
||||
heap booleanSortedMergeHeap
|
||||
init bool
|
||||
point BooleanPoint
|
||||
}
|
||||
|
||||
// newBooleanSortedMergeIterator returns an instance of booleanSortedMergeIterator.
|
||||
|
|
|
@ -297,7 +297,6 @@ func (h *{{$k.name}}MergeHeap) Pop() interface{} {
|
|||
|
||||
type {{$k.name}}MergeHeapItem struct {
|
||||
itr *buf{{$k.Name}}Iterator
|
||||
err error
|
||||
}
|
||||
|
||||
// {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one.
|
||||
|
@ -306,7 +305,6 @@ type {{$k.name}}SortedMergeIterator struct {
|
|||
opt IteratorOptions
|
||||
heap {{$k.name}}SortedMergeHeap
|
||||
init bool
|
||||
point {{$k.Name}}Point
|
||||
}
|
||||
|
||||
// new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator.
|
||||
|
|
|
@ -548,15 +548,6 @@ func less(buf []byte, indices []int, i, j int) bool {
|
|||
return bytes.Compare(a, b) < 0
|
||||
}
|
||||
|
||||
func isFieldEscapeChar(b byte) bool {
|
||||
for c := range escape.Codes {
|
||||
if c == b {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// scanFields scans buf, starting at i for the fields section of a point. It returns
|
||||
// the ending position and the byte slice of the fields within buf
|
||||
func scanFields(buf []byte, i int) (int, []byte, error) {
|
||||
|
@ -1589,22 +1580,3 @@ func (p Fields) MarshalBinary() []byte {
|
|||
}
|
||||
return b
|
||||
}
|
||||
|
||||
type indexedSlice struct {
|
||||
indices []int
|
||||
b []byte
|
||||
}
|
||||
|
||||
func (s *indexedSlice) Less(i, j int) bool {
|
||||
_, a := scanTo(s.b, s.indices[i], '=')
|
||||
_, b := scanTo(s.b, s.indices[j], '=')
|
||||
return bytes.Compare(a, b) < 0
|
||||
}
|
||||
|
||||
func (s *indexedSlice) Swap(i, j int) {
|
||||
s.indices[i], s.indices[j] = s.indices[j], s.indices[i]
|
||||
}
|
||||
|
||||
func (s *indexedSlice) Len() int {
|
||||
return len(s.indices)
|
||||
}
|
||||
|
|
|
@ -17,8 +17,6 @@ import (
|
|||
"github.com/influxdata/influxdb/services/meta"
|
||||
)
|
||||
|
||||
const leaderWaitTimeout = 30 * time.Second
|
||||
|
||||
// Policy constants.
|
||||
const (
|
||||
MonitorRetentionPolicy = "monitor"
|
||||
|
@ -405,15 +403,6 @@ type Statistic struct {
|
|||
Values map[string]interface{} `json:"values"`
|
||||
}
|
||||
|
||||
// newStatistic returns a new statistic object.
|
||||
func newStatistic(name string, tags map[string]string, values map[string]interface{}) *Statistic {
|
||||
return &Statistic{
|
||||
Name: name,
|
||||
Tags: tags,
|
||||
Values: values,
|
||||
}
|
||||
}
|
||||
|
||||
// valueNames returns a sorted list of the value names, if any.
|
||||
func (s *Statistic) ValueNames() []string {
|
||||
a := make([]string, 0, len(s.Values))
|
||||
|
|
|
@ -18,8 +18,6 @@ import (
|
|||
"github.com/kimor79/gollectd"
|
||||
)
|
||||
|
||||
const leaderWaitTimeout = 30 * time.Second
|
||||
|
||||
// statistics gathered by the collectd service.
|
||||
const (
|
||||
statPointsReceived = "pointsRx"
|
||||
|
@ -306,10 +304,3 @@ func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point {
|
|||
}
|
||||
return points
|
||||
}
|
||||
|
||||
// assert will panic with a given formatted message if the given condition is false.
|
||||
func assert(condition bool, msg string, v ...interface{}) {
|
||||
if !condition {
|
||||
panic(fmt.Sprintf("assert failed: "+msg, v...))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,9 +25,8 @@ const (
|
|||
|
||||
// Statistics for the CQ service.
|
||||
const (
|
||||
statQueryOK = "queryOk"
|
||||
statQueryFail = "queryFail"
|
||||
statPointsWritten = "pointsWritten"
|
||||
statQueryOK = "queryOk"
|
||||
statQueryFail = "queryFail"
|
||||
)
|
||||
|
||||
// ContinuousQuerier represents a service that executes continuous queries.
|
||||
|
|
|
@ -20,10 +20,7 @@ import (
|
|||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
udpBufferSize = 65536
|
||||
leaderWaitTimeout = 30 * time.Second
|
||||
)
|
||||
const udpBufferSize = 65536
|
||||
|
||||
// statistics gathered by the graphite package.
|
||||
const (
|
||||
|
@ -31,7 +28,6 @@ const (
|
|||
statBytesReceived = "bytesRx"
|
||||
statPointsParseFail = "pointsParseFail"
|
||||
statPointsNaNFail = "pointsNaNFail"
|
||||
statPointsUnsupported = "pointsUnsupportedFail"
|
||||
statBatchesTransmitted = "batchesTx"
|
||||
statPointsTransmitted = "pointsTx"
|
||||
statBatchesTransmitFail = "batchesTxFail"
|
||||
|
|
|
@ -10,12 +10,6 @@ import (
|
|||
"github.com/influxdata/influxdb/influxql"
|
||||
)
|
||||
|
||||
type loggingResponseWriter interface {
|
||||
http.ResponseWriter
|
||||
Status() int
|
||||
Size() int
|
||||
}
|
||||
|
||||
// responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP status
|
||||
// code and body size
|
||||
type responseLogger struct {
|
||||
|
|
|
@ -24,14 +24,6 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// errSleep is the time to sleep after we've failed on every metaserver
|
||||
// before making another pass
|
||||
errSleep = time.Second
|
||||
|
||||
// maxRetries is the maximum number of attemps to make before returning
|
||||
// a failure to the caller
|
||||
maxRetries = 10
|
||||
|
||||
// SaltBytes is the number of bytes used for salts
|
||||
SaltBytes = 32
|
||||
|
||||
|
@ -1047,14 +1039,6 @@ func (c *Client) Load() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type errCommand struct {
|
||||
msg string
|
||||
}
|
||||
|
||||
func (e errCommand) Error() string {
|
||||
return e.msg
|
||||
}
|
||||
|
||||
type uint64Slice []uint64
|
||||
|
||||
func (a uint64Slice) Len() int { return len(a) }
|
||||
|
|
|
@ -1,161 +0,0 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type loggingResponseWriter interface {
|
||||
http.ResponseWriter
|
||||
Status() int
|
||||
Size() int
|
||||
}
|
||||
|
||||
// responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP status
|
||||
// code and body size
|
||||
type responseLogger struct {
|
||||
w http.ResponseWriter
|
||||
status int
|
||||
size int
|
||||
}
|
||||
|
||||
func (l *responseLogger) CloseNotify() <-chan bool {
|
||||
if notifier, ok := l.w.(http.CloseNotifier); ok {
|
||||
return notifier.CloseNotify()
|
||||
}
|
||||
// needed for response recorder for testing
|
||||
return make(<-chan bool)
|
||||
}
|
||||
|
||||
func (l *responseLogger) Header() http.Header {
|
||||
return l.w.Header()
|
||||
}
|
||||
|
||||
func (l *responseLogger) Flush() {
|
||||
l.w.(http.Flusher).Flush()
|
||||
}
|
||||
|
||||
func (l *responseLogger) Write(b []byte) (int, error) {
|
||||
if l.status == 0 {
|
||||
// Set status if WriteHeader has not been called
|
||||
l.status = http.StatusOK
|
||||
}
|
||||
size, err := l.w.Write(b)
|
||||
l.size += size
|
||||
return size, err
|
||||
}
|
||||
|
||||
func (l *responseLogger) WriteHeader(s int) {
|
||||
l.w.WriteHeader(s)
|
||||
l.status = s
|
||||
}
|
||||
|
||||
func (l *responseLogger) Status() int {
|
||||
if l.status == 0 {
|
||||
// This can happen if we never actually write data, but only set response headers.
|
||||
l.status = http.StatusOK
|
||||
}
|
||||
return l.status
|
||||
}
|
||||
|
||||
func (l *responseLogger) Size() int {
|
||||
return l.size
|
||||
}
|
||||
|
||||
// redact any occurrence of a password parameter, 'p'
|
||||
func redactPassword(r *http.Request) {
|
||||
q := r.URL.Query()
|
||||
if p := q.Get("p"); p != "" {
|
||||
q.Set("p", "[REDACTED]")
|
||||
r.URL.RawQuery = q.Encode()
|
||||
}
|
||||
}
|
||||
|
||||
// Common Log Format: http://en.wikipedia.org/wiki/Common_Log_Format
|
||||
|
||||
// buildLogLine creates a common log format
|
||||
// in addition to the common fields, we also append referrer, user agent and request ID
|
||||
func buildLogLine(l *responseLogger, r *http.Request, start time.Time) string {
|
||||
|
||||
redactPassword(r)
|
||||
|
||||
username := parseUsername(r)
|
||||
|
||||
host, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||
|
||||
if err != nil {
|
||||
host = r.RemoteAddr
|
||||
}
|
||||
|
||||
uri := r.URL.RequestURI()
|
||||
|
||||
referer := r.Referer()
|
||||
|
||||
userAgent := r.UserAgent()
|
||||
|
||||
fields := []string{
|
||||
host,
|
||||
"-",
|
||||
detect(username, "-"),
|
||||
fmt.Sprintf("[%s]", start.Format("02/Jan/2006:15:04:05 -0700")),
|
||||
r.Method,
|
||||
uri,
|
||||
r.Proto,
|
||||
detect(strconv.Itoa(l.Status()), "-"),
|
||||
strconv.Itoa(l.Size()),
|
||||
detect(referer, "-"),
|
||||
detect(userAgent, "-"),
|
||||
r.Header.Get("Request-Id"),
|
||||
fmt.Sprintf("%s", time.Since(start)),
|
||||
}
|
||||
|
||||
return strings.Join(fields, " ")
|
||||
}
|
||||
|
||||
// detect detects the first presense of a non blank string and returns it
|
||||
func detect(values ...string) string {
|
||||
for _, v := range values {
|
||||
if v != "" {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// parses the username either from the url or auth header
|
||||
func parseUsername(r *http.Request) string {
|
||||
var (
|
||||
username = ""
|
||||
url = r.URL
|
||||
)
|
||||
|
||||
// get username from the url if passed there
|
||||
if url.User != nil {
|
||||
if name := url.User.Username(); name != "" {
|
||||
username = name
|
||||
}
|
||||
}
|
||||
|
||||
// Try to get the username from the query param 'u'
|
||||
q := url.Query()
|
||||
if u := q.Get("u"); u != "" {
|
||||
username = u
|
||||
}
|
||||
|
||||
// Try to get it from the authorization header if set there
|
||||
if username == "" {
|
||||
if u, _, ok := r.BasicAuth(); ok {
|
||||
username = u
|
||||
}
|
||||
}
|
||||
return username
|
||||
}
|
||||
|
||||
// Sanitize passwords from query string for logging.
|
||||
func sanitize(r *http.Request, s string) {
|
||||
r.URL.RawQuery = strings.Replace(r.URL.RawQuery, s, "[REDACTED]", -1)
|
||||
}
|
|
@ -22,8 +22,6 @@ import (
|
|||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const leaderWaitTimeout = 30 * time.Second
|
||||
|
||||
// statistics gathered by the openTSDB package.
|
||||
const (
|
||||
statHTTPConnectionsHandled = "httpConnsHandled"
|
||||
|
|
308
tsdb/cursor.go
308
tsdb/cursor.go
|
@ -1,12 +1,5 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"math"
|
||||
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
)
|
||||
|
||||
// EOF represents a "not found" key returned by a Cursor.
|
||||
const EOF = int64(-1)
|
||||
|
||||
|
@ -16,304 +9,3 @@ type Cursor interface {
|
|||
Next() (key int64, value interface{})
|
||||
Ascending() bool
|
||||
}
|
||||
|
||||
// MultiCursor returns a single cursor that combines the results of all cursors in order.
|
||||
//
|
||||
// If the same key is returned from multiple cursors then the first cursor
|
||||
// specified will take precendence. A key will only be returned once from the
|
||||
// returned cursor.
|
||||
func MultiCursor(cursors ...Cursor) Cursor {
|
||||
return &multiCursor{
|
||||
cursors: cursors,
|
||||
}
|
||||
}
|
||||
|
||||
// multiCursor represents a cursor that combines multiple cursors into one.
|
||||
type multiCursor struct {
|
||||
cursors []Cursor
|
||||
heap cursorHeap
|
||||
prev int64 // previously read key
|
||||
}
|
||||
|
||||
// Seek moves the cursor to a given key.
|
||||
func (mc *multiCursor) SeekTo(seek int64) (int64, interface{}) {
|
||||
// Initialize heap.
|
||||
h := make(cursorHeap, 0, len(mc.cursors))
|
||||
for i, c := range mc.cursors {
|
||||
// Move cursor to position. Skip if it's empty.
|
||||
k, v := c.SeekTo(seek)
|
||||
if k == EOF {
|
||||
continue
|
||||
}
|
||||
|
||||
// Append cursor to heap.
|
||||
h = append(h, &cursorHeapItem{
|
||||
key: k,
|
||||
value: v,
|
||||
cursor: c,
|
||||
priority: len(mc.cursors) - i,
|
||||
})
|
||||
}
|
||||
|
||||
heap.Init(&h)
|
||||
mc.heap = h
|
||||
mc.prev = EOF
|
||||
|
||||
return mc.pop()
|
||||
}
|
||||
|
||||
// Ascending returns the direction of the first cursor.
|
||||
func (mc *multiCursor) Ascending() bool {
|
||||
if len(mc.cursors) == 0 {
|
||||
return true
|
||||
}
|
||||
return mc.cursors[0].Ascending()
|
||||
}
|
||||
|
||||
// Next returns the next key/value from the cursor.
|
||||
func (mc *multiCursor) Next() (int64, interface{}) { return mc.pop() }
|
||||
|
||||
// pop returns the next item from the heap.
|
||||
// Reads the next key/value from item's cursor and puts it back on the heap.
|
||||
func (mc *multiCursor) pop() (key int64, value interface{}) {
|
||||
// Read items until we have a key that doesn't match the previously read one.
|
||||
// This is to perform deduplication when there's multiple items with the same key.
|
||||
// The highest priority cursor will be read first and then remaining keys will be dropped.
|
||||
for {
|
||||
// Return EOF marker if there are no more items left.
|
||||
if len(mc.heap) == 0 {
|
||||
return EOF, nil
|
||||
}
|
||||
|
||||
// Read the next item from the heap.
|
||||
item := heap.Pop(&mc.heap).(*cursorHeapItem)
|
||||
|
||||
// Save the key/value for return.
|
||||
key, value = item.key, item.value
|
||||
|
||||
// Read the next item from the cursor. Push back to heap if one exists.
|
||||
if item.key, item.value = item.cursor.Next(); item.key != EOF {
|
||||
heap.Push(&mc.heap, item)
|
||||
}
|
||||
|
||||
// Skip if this key matches the previously returned one.
|
||||
if key == mc.prev {
|
||||
continue
|
||||
}
|
||||
|
||||
mc.prev = key
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// cursorHeap represents a heap of cursorHeapItems.
|
||||
type cursorHeap []*cursorHeapItem
|
||||
|
||||
func (h cursorHeap) Len() int { return len(h) }
|
||||
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
||||
func (h cursorHeap) Less(i, j int) bool {
|
||||
// Use priority if the keys are the same.
|
||||
if h[i].key == h[j].key {
|
||||
return h[i].priority > h[j].priority
|
||||
}
|
||||
|
||||
// Otherwise compare based on cursor direction.
|
||||
if h[i].cursor.Ascending() {
|
||||
return h[i].key < h[j].key
|
||||
}
|
||||
return h[i].key > h[j].key
|
||||
}
|
||||
|
||||
func (h *cursorHeap) Push(x interface{}) {
|
||||
*h = append(*h, x.(*cursorHeapItem))
|
||||
}
|
||||
|
||||
func (h *cursorHeap) Pop() interface{} {
|
||||
old := *h
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
*h = old[0 : n-1]
|
||||
return item
|
||||
}
|
||||
|
||||
// cursorHeapItem is something we manage in a priority queue.
|
||||
type cursorHeapItem struct {
|
||||
key int64
|
||||
value interface{}
|
||||
cursor Cursor
|
||||
priority int
|
||||
}
|
||||
|
||||
// bufCursor represents a buffered cursor that is initialized at a time.
|
||||
// This cursor does not allow seeking after initial seek.
|
||||
type bufCursor struct {
|
||||
cur Cursor
|
||||
buf *struct {
|
||||
key int64
|
||||
value interface{}
|
||||
}
|
||||
}
|
||||
|
||||
// newBufCursor returns a new instance of bufCursor that wraps cur.
|
||||
func newBufCursor(cur Cursor, seek int64) *bufCursor {
|
||||
c := &bufCursor{cur: cur}
|
||||
|
||||
// Limit min seek to zero.
|
||||
if seek < 0 {
|
||||
seek = 0
|
||||
}
|
||||
|
||||
// Fill buffer, if seekable.
|
||||
k, v := cur.SeekTo(seek)
|
||||
if k != EOF {
|
||||
c.buf = &struct {
|
||||
key int64
|
||||
value interface{}
|
||||
}{k, v}
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// SeekTo panics if called. Cursor can only be seeked on initialization.
|
||||
func (c *bufCursor) SeekTo(seek int64) (key int64, value interface{}) { panic("unseekable") }
|
||||
|
||||
// Next returns the next key & value from the underlying cursor.
|
||||
func (c *bufCursor) Next() (key int64, value interface{}) {
|
||||
if c.buf != nil {
|
||||
key, value = c.buf.key, c.buf.value
|
||||
c.buf = nil
|
||||
return
|
||||
}
|
||||
return c.cur.Next()
|
||||
}
|
||||
|
||||
// Ascending returns true if the cursor traverses in ascending order.
|
||||
func (c *bufCursor) Ascending() bool { return c.cur.Ascending() }
|
||||
|
||||
// FloatCursorIterator represents a wrapper for Cursor to produce an influxql.FloatIterator.
|
||||
type FloatCursorIterator struct {
|
||||
cursor *bufCursor
|
||||
opt influxql.IteratorOptions
|
||||
ref *influxql.VarRef
|
||||
tags influxql.Tags
|
||||
point influxql.FloatPoint // reuseable point to emit
|
||||
}
|
||||
|
||||
// NewFloatCursorIterator returns a new instance of FloatCursorIterator.
|
||||
func NewFloatCursorIterator(name string, tagMap map[string]string, cur Cursor, opt influxql.IteratorOptions) *FloatCursorIterator {
|
||||
// Extract variable reference if available.
|
||||
var ref *influxql.VarRef
|
||||
if opt.Expr != nil {
|
||||
ref = opt.Expr.(*influxql.VarRef)
|
||||
}
|
||||
|
||||
// Only allocate aux values if we have any requested.
|
||||
var aux []interface{}
|
||||
if len(opt.Aux) > 0 {
|
||||
aux = make([]interface{}, len(opt.Aux))
|
||||
}
|
||||
|
||||
// Convert to influxql tags.
|
||||
tags := influxql.NewTags(tagMap)
|
||||
|
||||
// Determine initial seek position based on sort direction.
|
||||
seek := opt.StartTime
|
||||
if !opt.Ascending {
|
||||
seek = opt.EndTime
|
||||
}
|
||||
|
||||
return &FloatCursorIterator{
|
||||
point: influxql.FloatPoint{
|
||||
Name: name,
|
||||
Tags: tags.Subset(opt.Dimensions),
|
||||
Aux: aux,
|
||||
},
|
||||
opt: opt,
|
||||
ref: ref,
|
||||
tags: tags,
|
||||
cursor: newBufCursor(cur, seek),
|
||||
}
|
||||
}
|
||||
|
||||
// Close closes the iterator.
|
||||
func (itr *FloatCursorIterator) Close() error { return nil }
|
||||
|
||||
// Next returns the next point from the cursor.
|
||||
func (itr *FloatCursorIterator) Next() *influxql.FloatPoint {
|
||||
for {
|
||||
// Read next key/value and emit nil if at the end.
|
||||
timestamp, value := itr.cursor.Next()
|
||||
if timestamp == EOF {
|
||||
return nil
|
||||
} else if itr.opt.Ascending && timestamp > itr.opt.EndTime {
|
||||
return nil
|
||||
} else if !itr.opt.Ascending && timestamp < itr.opt.StartTime {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set timestamp on point.
|
||||
itr.point.Time = timestamp
|
||||
|
||||
// Retrieve tags key/value map.
|
||||
tags := itr.tags.KeyValues()
|
||||
|
||||
// If value is a map then extract all the fields.
|
||||
if m, ok := value.(map[string]interface{}); ok {
|
||||
// If filter fails then skip to the next value.
|
||||
if itr.opt.Condition != nil && !influxql.EvalBool(itr.opt.Condition, m) {
|
||||
continue
|
||||
}
|
||||
|
||||
if itr.ref != nil {
|
||||
fv, ok := m[itr.ref.Val].(float64)
|
||||
if !ok {
|
||||
continue // read next point
|
||||
}
|
||||
itr.point.Value = fv
|
||||
} else {
|
||||
itr.point.Value = math.NaN()
|
||||
}
|
||||
|
||||
// Read all auxilary fields.
|
||||
for i, ref := range itr.opt.Aux {
|
||||
if v, ok := m[ref.Val]; ok {
|
||||
itr.point.Aux[i] = v
|
||||
} else if s, ok := tags[ref.Val]; ok {
|
||||
itr.point.Aux[i] = s
|
||||
} else {
|
||||
itr.point.Aux[i] = nil
|
||||
}
|
||||
}
|
||||
|
||||
return &itr.point
|
||||
}
|
||||
|
||||
// Otherwise expect value to be of an appropriate type.
|
||||
if itr.ref != nil {
|
||||
// If filter fails then skip to the next value.
|
||||
if itr.opt.Condition != nil && !influxql.EvalBool(itr.opt.Condition, map[string]interface{}{itr.ref.Val: value}) {
|
||||
continue
|
||||
}
|
||||
|
||||
fv, ok := value.(float64)
|
||||
if !ok {
|
||||
continue // read next point
|
||||
}
|
||||
itr.point.Value = fv
|
||||
} else {
|
||||
itr.point.Value = math.NaN()
|
||||
}
|
||||
|
||||
// Read all auxilary fields.
|
||||
for i, ref := range itr.opt.Aux {
|
||||
if tagValue, ok := tags[ref.Val]; ok {
|
||||
itr.point.Aux[i] = tagValue
|
||||
} else {
|
||||
itr.point.Aux[i] = value
|
||||
}
|
||||
}
|
||||
|
||||
return &itr.point
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,514 +0,0 @@
|
|||
package tsdb_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"testing/quick"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/pkg/deep"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
)
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across a single subcursor.
|
||||
func TestMultiCursor_Single(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0},
|
||||
{Key: 1, Value: 10},
|
||||
{Key: 2, Value: 20},
|
||||
}, true))
|
||||
|
||||
if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 2 || v.(int) != 20 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != tsdb.EOF {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across a single subcursor in reverse order.
|
||||
func TestMultiCursor_Single_Reverse(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0},
|
||||
{Key: 1, Value: 10},
|
||||
{Key: 2, Value: 20},
|
||||
}, false))
|
||||
|
||||
if k, v := mc.SeekTo(2); k != 2 || v.(int) != 20 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 0 || v.(int) != 0 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != tsdb.EOF {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors.
|
||||
func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0},
|
||||
{Key: 3, Value: 30},
|
||||
{Key: 4, Value: 40},
|
||||
}, true),
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 1, Value: 10},
|
||||
{Key: 2, Value: 20},
|
||||
}, true),
|
||||
)
|
||||
|
||||
if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 2 || v.(int) != 20 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 3 || v.(int) != 30 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 4 || v.(int) != 40 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != tsdb.EOF {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors.
|
||||
func TestMultiCursor_Multiple_NonOverlapping_Reverse(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0},
|
||||
{Key: 3, Value: 30},
|
||||
{Key: 4, Value: 40},
|
||||
}, false),
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 1, Value: 10},
|
||||
{Key: 2, Value: 20},
|
||||
}, false),
|
||||
)
|
||||
|
||||
if k, v := mc.SeekTo(4); k != 4 || v.(int) != 40 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 3 || v.(int) != 30 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 2 || v.(int) != 20 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 1 || v.(int) != 10 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 0 || v.(int) != 00 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != tsdb.EOF {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors.
|
||||
func TestMultiCursor_Multiple_Overlapping(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0},
|
||||
{Key: 3, Value: 3},
|
||||
{Key: 4, Value: 4},
|
||||
}, true),
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0xF0},
|
||||
{Key: 2, Value: 0xF2},
|
||||
{Key: 4, Value: 0xF4},
|
||||
}, true),
|
||||
)
|
||||
|
||||
if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 2 || v.(int) != 0xF2 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 3 || v.(int) != 3 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 4 || v.(int) != 4 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != tsdb.EOF {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors.
|
||||
func TestMultiCursor_Multiple_Overlapping_Reverse(t *testing.T) {
|
||||
mc := tsdb.MultiCursor(
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0},
|
||||
{Key: 3, Value: 3},
|
||||
{Key: 4, Value: 4},
|
||||
}, false),
|
||||
NewCursor([]CursorItem{
|
||||
{Key: 0, Value: 0xF0},
|
||||
{Key: 2, Value: 0xF2},
|
||||
{Key: 4, Value: 0xF4},
|
||||
}, false),
|
||||
)
|
||||
|
||||
if k, v := mc.SeekTo(4); k != 4 || v.(int) != 4 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 3 || v.(int) != 3 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 2 || v.(int) != 0xF2 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != 0 || v.(int) != 0 {
|
||||
t.Fatalf("unexpected key/value: %x / %x", k, v)
|
||||
} else if k, v = mc.Next(); k != tsdb.EOF {
|
||||
t.Fatalf("expected eof, got: %x / %x", k, v)
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the multi-cursor can handle randomly generated data.
|
||||
func TestMultiCursor_Quick(t *testing.T) {
|
||||
quick.Check(func(useek uint64, cursors []Cursor) bool {
|
||||
var got, exp []CursorItem
|
||||
seek := int64(useek) % 100
|
||||
|
||||
// Merge all cursor data to determine expected output.
|
||||
// First seen key overrides all other items with the same key.
|
||||
m := make(map[int64]CursorItem)
|
||||
for _, c := range cursors {
|
||||
for _, item := range c.items {
|
||||
if item.Key < seek {
|
||||
continue
|
||||
}
|
||||
if _, ok := m[item.Key]; ok {
|
||||
continue
|
||||
}
|
||||
m[item.Key] = item
|
||||
}
|
||||
}
|
||||
|
||||
// Convert map back to single item list.
|
||||
for _, item := range m {
|
||||
exp = append(exp, item)
|
||||
}
|
||||
sort.Sort(CursorItems(exp))
|
||||
|
||||
// Create multi-cursor and iterate over all items.
|
||||
mc := tsdb.MultiCursor(tsdbCursorSlice(cursors)...)
|
||||
for k, v := mc.SeekTo(seek); k != tsdb.EOF; k, v = mc.Next() {
|
||||
got = append(got, CursorItem{k, v.(int)})
|
||||
}
|
||||
|
||||
// Verify results.
|
||||
if !reflect.DeepEqual(got, exp) {
|
||||
t.Fatalf("mismatch: seek=%d\n\ngot=%+v\n\nexp=%+v", seek, got, exp)
|
||||
}
|
||||
|
||||
return true
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// Ensure a cursor with a single ref value can be converted into an iterator.
|
||||
func TestFloatCursorIterator_SingleValue(t *testing.T) {
|
||||
cur := NewCursor([]CursorItem{
|
||||
{Key: 0, Value: float64(100)},
|
||||
{Key: 3, Value: float64(200)},
|
||||
}, true)
|
||||
|
||||
opt := influxql.IteratorOptions{
|
||||
Expr: &influxql.VarRef{Val: "value"},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
}
|
||||
itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt)
|
||||
defer itr.Close()
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 0,
|
||||
Value: float64(100),
|
||||
}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 3,
|
||||
Value: float64(200),
|
||||
}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); p != nil {
|
||||
t.Fatalf("expected eof, got: %s", spew.Sdump(p))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a cursor with a ref and multiple aux values can be converted into an iterator.
|
||||
func TestFloatCursorIterator_MultipleValues(t *testing.T) {
|
||||
cur := NewCursor([]CursorItem{
|
||||
{Key: 0, Value: map[string]interface{}{"val1": float64(100), "val2": "foo"}},
|
||||
{Key: 3, Value: map[string]interface{}{"val1": float64(200), "val2": "bar"}},
|
||||
}, true)
|
||||
|
||||
opt := influxql.IteratorOptions{
|
||||
Expr: &influxql.VarRef{Val: "val1"}, Aux: []influxql.VarRef{{Val: "val1"}, {Val: "val2"}},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
}
|
||||
itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt)
|
||||
defer itr.Close()
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 0,
|
||||
Value: 100,
|
||||
Aux: []interface{}{float64(100), "foo"},
|
||||
}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 3,
|
||||
Value: 200,
|
||||
Aux: []interface{}{float64(200), "bar"},
|
||||
}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); p != nil {
|
||||
t.Fatalf("expected eof, got: %s", spew.Sdump(p))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a cursor with a single value can be converted into an iterator.
|
||||
func TestFloatCursorIterator_Aux_SingleValue(t *testing.T) {
|
||||
cur := NewCursor([]CursorItem{
|
||||
{Key: 0, Value: float64(100)},
|
||||
{Key: 3, Value: float64(200)},
|
||||
}, true)
|
||||
|
||||
opt := influxql.IteratorOptions{
|
||||
Aux: []influxql.VarRef{{Val: "val1"}},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
}
|
||||
itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt)
|
||||
defer itr.Close()
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 0,
|
||||
Value: math.NaN(),
|
||||
Aux: []interface{}{float64(100)},
|
||||
}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 3,
|
||||
Value: math.NaN(),
|
||||
Aux: []interface{}{float64(200)},
|
||||
}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); p != nil {
|
||||
t.Fatalf("expected eof, got: %s", spew.Sdump(p))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a cursor with multiple values can be converted into an iterator.
|
||||
func TestFloatCursorIterator_Aux_MultipleValues(t *testing.T) {
|
||||
cur := NewCursor([]CursorItem{
|
||||
{Key: 0, Value: map[string]interface{}{"val1": float64(100), "val2": "foo"}},
|
||||
{Key: 3, Value: map[string]interface{}{"val1": float64(200), "val2": "bar"}},
|
||||
}, true)
|
||||
|
||||
opt := influxql.IteratorOptions{
|
||||
Aux: []influxql.VarRef{{Val: "val1"}, {Val: "val2"}},
|
||||
Ascending: true,
|
||||
StartTime: influxql.MinTime,
|
||||
EndTime: influxql.MaxTime,
|
||||
}
|
||||
itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt)
|
||||
defer itr.Close()
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 0,
|
||||
Value: math.NaN(),
|
||||
Aux: []interface{}{float64(100), "foo"},
|
||||
}) {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{
|
||||
Name: "series0",
|
||||
Time: 3,
|
||||
Value: math.NaN(),
|
||||
Aux: []interface{}{float64(200), "bar"},
|
||||
}) {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
|
||||
if p := itr.Next(); p != nil {
|
||||
t.Fatalf("expected eof, got: %s", spew.Sdump(p))
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure a cursor iterator does not go past the end time.
|
||||
func TestFloatCursorIterator_EndTime(t *testing.T) {
|
||||
cur := NewCursor([]CursorItem{
|
||||
{Key: 0, Value: float64(100)},
|
||||
{Key: 3, Value: float64(200)},
|
||||
{Key: 4, Value: float64(300)},
|
||||
}, true)
|
||||
|
||||
itr := tsdb.NewFloatCursorIterator("x", nil, cur, influxql.IteratorOptions{
|
||||
Expr: &influxql.VarRef{Val: "value"},
|
||||
Ascending: true,
|
||||
EndTime: 3,
|
||||
})
|
||||
defer itr.Close()
|
||||
|
||||
// Verify that only two points are emitted.
|
||||
if p := itr.Next(); p == nil || p.Time != 0 {
|
||||
t.Fatalf("unexpected point(0): %s", spew.Sdump(p))
|
||||
}
|
||||
if p := itr.Next(); p == nil || p.Time != 3 {
|
||||
t.Fatalf("unexpected point(1): %s", spew.Sdump(p))
|
||||
}
|
||||
if p := itr.Next(); p != nil {
|
||||
t.Fatalf("expected eof, got: %s", spew.Sdump(p))
|
||||
}
|
||||
}
|
||||
|
||||
// Cursor represents an in-memory test cursor.
type Cursor struct {
	items     []CursorItem // key-sorted backing data
	index     int          // position of the next item to return
	ascending bool         // iteration direction
}
|
||||
|
||||
// NewCursor returns a new instance of Cursor.
|
||||
func NewCursor(items []CursorItem, ascending bool) *Cursor {
|
||||
index := 0
|
||||
sort.Sort(CursorItems(items))
|
||||
|
||||
if !ascending {
|
||||
index = len(items)
|
||||
}
|
||||
return &Cursor{
|
||||
items: items,
|
||||
index: index,
|
||||
ascending: ascending,
|
||||
}
|
||||
}
|
||||
|
||||
// Ascending reports the cursor's iteration direction.
func (c *Cursor) Ascending() bool { return c.ascending }
|
||||
|
||||
// Seek seeks to an item by key.
|
||||
func (c *Cursor) SeekTo(seek int64) (key int64, value interface{}) {
|
||||
if c.ascending {
|
||||
return c.seekForward(seek)
|
||||
}
|
||||
return c.seekReverse(seek)
|
||||
}
|
||||
|
||||
func (c *Cursor) seekForward(seek int64) (key int64, value interface{}) {
|
||||
for c.index = 0; c.index < len(c.items); c.index++ {
|
||||
if c.items[c.index].Key < seek { // skip keys less than seek
|
||||
continue
|
||||
}
|
||||
key, value = c.items[c.index].Key, c.items[c.index].Value
|
||||
c.index++
|
||||
return key, value
|
||||
}
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
|
||||
func (c *Cursor) seekReverse(seek int64) (key int64, value interface{}) {
|
||||
for c.index = len(c.items) - 1; c.index >= 0; c.index-- {
|
||||
if c.items[c.index].Key > seek { // skip keys greater than seek
|
||||
continue
|
||||
}
|
||||
key, value = c.items[c.index].Key, c.items[c.index].Value
|
||||
c.index--
|
||||
return key, value
|
||||
}
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
|
||||
// Next returns the next key/value pair.
|
||||
func (c *Cursor) Next() (key int64, value interface{}) {
|
||||
if !c.ascending && c.index < 0 {
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
|
||||
if c.ascending && c.index >= len(c.items) {
|
||||
return tsdb.EOF, nil
|
||||
}
|
||||
|
||||
k, v := c.items[c.index].Key, c.items[c.index].Value
|
||||
|
||||
if c.ascending {
|
||||
c.index++
|
||||
} else {
|
||||
c.index--
|
||||
}
|
||||
return k, v
|
||||
}
|
||||
|
||||
// Generate returns a randomly generated cursor. Implements quick.Generator.
|
||||
func (c Cursor) Generate(rand *rand.Rand, size int) reflect.Value {
|
||||
c.index = 0
|
||||
c.ascending = true
|
||||
|
||||
c.items = make([]CursorItem, rand.Intn(size))
|
||||
for i := range c.items {
|
||||
c.items[i] = CursorItem{
|
||||
Key: rand.Int63n(int64(size)),
|
||||
Value: rand.Int(),
|
||||
}
|
||||
}
|
||||
|
||||
// Sort items by key.
|
||||
sort.Sort(CursorItems(c.items))
|
||||
|
||||
return reflect.ValueOf(c)
|
||||
}
|
||||
|
||||
// tsdbCursorSlice converts a Cursor slice to a tsdb.Cursor slice.
|
||||
func tsdbCursorSlice(a []Cursor) []tsdb.Cursor {
|
||||
var other []tsdb.Cursor
|
||||
for i := range a {
|
||||
other = append(other, &a[i])
|
||||
}
|
||||
return other
|
||||
}
|
||||
|
||||
// CursorItem represents a key/value pair in a cursor.
type CursorItem struct {
	Key   int64       // timestamp key
	Value interface{} // point value (int in generated data; float64 or map in iterator tests)
}
|
||||
|
||||
// CursorItems is a sortable slice of CursorItem, ordered by Key.
type CursorItems []CursorItem

func (a CursorItems) Len() int           { return len(a) }
func (a CursorItems) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a CursorItems) Less(i, j int) bool { return a[i].Key < a[j].Key }
|
||||
|
||||
// byteSlices represents a sortable slice of byte slices.
|
||||
type byteSlices [][]byte
|
||||
|
||||
func (a byteSlices) Len() int { return len(a) }
|
||||
func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
|
||||
func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
|
@ -1,7 +1,6 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -92,7 +91,7 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro
|
|||
}
|
||||
|
||||
// If it's a dir then it's a tsm1 engine
|
||||
format := "tsm1"
|
||||
format := DefaultEngine
|
||||
if fi, err := os.Stat(path); err != nil {
|
||||
return nil, err
|
||||
} else if !fi.Mode().IsDir() {
|
||||
|
@ -124,30 +123,3 @@ func NewEngineOptions() EngineOptions {
|
|||
Config: NewConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
// DedupeEntries returns slices with unique keys (the first 8 bytes).
// When two entries share a key, the last one in the input wins. The
// result is sorted lexicographically.
func DedupeEntries(a [][]byte) [][]byte {
	// Index by 8-byte key; later entries overwrite earlier ones.
	byKey := make(map[string][]byte)
	for _, entry := range a {
		byKey[string(entry[0:8])] = entry
	}

	// Flatten the map back to a slice of byte slices.
	deduped := make([][]byte, 0, len(byKey))
	for _, entry := range byKey {
		deduped = append(deduped, entry)
	}

	// Sort entries.
	sort.Sort(ByteSlices(deduped))

	return deduped
}

// ByteSlices wraps a list of byte-slices for sorting.
type ByteSlices [][]byte

func (a ByteSlices) Len() int           { return len(a) }
func (a ByteSlices) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) < 0 }
|
||||
|
|
|
@ -470,7 +470,6 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq
|
|||
_, tags, _ := models.ParseKey(seriesKey)
|
||||
|
||||
s := tsdb.NewSeries(seriesKey, tags)
|
||||
s.InitializeShards()
|
||||
index.CreateSeriesIndexIfNotExists(measurement, s)
|
||||
s.AssignShard(shardID)
|
||||
|
||||
|
|
92
tsdb/meta.go
92
tsdb/meta.go
|
@ -6,7 +6,6 @@ import (
|
|||
"regexp"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
|
@ -20,8 +19,6 @@ import (
|
|||
//go:generate protoc --gogo_out=. internal/meta.proto
|
||||
|
||||
const (
|
||||
maxStringLength = 64 * 1024
|
||||
|
||||
statDatabaseSeries = "numSeries" // number of series in this database
|
||||
statDatabaseMeasurements = "numMeasurements" // number of measurements in this database
|
||||
)
|
||||
|
@ -494,10 +491,6 @@ func (d *DatabaseIndex) DropSeries(keys []string) {
|
|||
d.statMap.Add(statDatabaseSeries, -nDeleted)
|
||||
}
|
||||
|
||||
const (
|
||||
statMeasurementSeries = "numSeries" // number of series contained in this measurement
|
||||
)
|
||||
|
||||
// Measurement represents a collection of time series in a database. It also contains in memory
|
||||
// structures for indexing tags. Exported functions are goroutine safe while un-exported functions
|
||||
// assume the caller will use the appropriate locks
|
||||
|
@ -1338,40 +1331,6 @@ func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
|
|||
return out
|
||||
}
|
||||
|
||||
// SelectFields returns a list of fields in the SELECT section of stmt.
|
||||
func (m *Measurement) SelectFields(stmt *influxql.SelectStatement) []string {
|
||||
set := newStringSet()
|
||||
for _, name := range stmt.NamesInSelect() {
|
||||
if m.HasField(name) {
|
||||
set.add(name)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return set.list()
|
||||
}
|
||||
|
||||
// SelectTags returns a list of non-field tags in the SELECT section of stmt.
|
||||
func (m *Measurement) SelectTags(stmt *influxql.SelectStatement) []string {
|
||||
set := newStringSet()
|
||||
for _, name := range stmt.NamesInSelect() {
|
||||
if !m.HasField(name) && m.HasTagKey(name) {
|
||||
set.add(name)
|
||||
}
|
||||
}
|
||||
return set.list()
|
||||
}
|
||||
|
||||
// WhereFields returns a list of non-"time" fields in the WHERE section of stmt.
|
||||
func (m *Measurement) WhereFields(stmt *influxql.SelectStatement) []string {
|
||||
set := newStringSet()
|
||||
for _, name := range stmt.NamesInWhere() {
|
||||
if name != "time" && m.HasField(name) {
|
||||
set.add(name)
|
||||
}
|
||||
}
|
||||
return set.list()
|
||||
}
|
||||
|
||||
// Measurements represents a list of *Measurement.
|
||||
type Measurements []*Measurement
|
||||
|
||||
|
@ -1379,45 +1338,6 @@ func (a Measurements) Len() int { return len(a) }
|
|||
func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name }
|
||||
func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// SelectFields returns a list of fields in the SELECT section of stmt.
|
||||
func (a Measurements) SelectFields(stmt *influxql.SelectStatement) []string {
|
||||
set := newStringSet()
|
||||
for _, name := range stmt.NamesInSelect() {
|
||||
for _, m := range a {
|
||||
if m.HasField(name) {
|
||||
set.add(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return set.list()
|
||||
}
|
||||
|
||||
// SelectTags returns a list of non-field tags in the SELECT section of stmt.
|
||||
func (a Measurements) SelectTags(stmt *influxql.SelectStatement) []string {
|
||||
set := newStringSet()
|
||||
for _, name := range stmt.NamesInSelect() {
|
||||
for _, m := range a {
|
||||
if !m.HasField(name) && m.HasTagKey(name) {
|
||||
set.add(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return set.list()
|
||||
}
|
||||
|
||||
// WhereFields returns a list of non-"time" fields in the WHERE section of stmt.
|
||||
func (a Measurements) WhereFields(stmt *influxql.SelectStatement) []string {
|
||||
set := newStringSet()
|
||||
for _, name := range stmt.NamesInWhere() {
|
||||
for _, m := range a {
|
||||
if name != "time" && m.HasField(name) {
|
||||
set.add(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return set.list()
|
||||
}
|
||||
|
||||
func (a Measurements) intersect(other Measurements) Measurements {
|
||||
l := a
|
||||
r := other
|
||||
|
@ -1552,13 +1472,6 @@ func (s *Series) UnmarshalBinary(buf []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// InitializeShards initializes the list of shards.
|
||||
func (s *Series) InitializeShards() {
|
||||
s.mu.Lock()
|
||||
s.shardIDs = make(map[uint64]bool)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// SeriesIDs is a convenience type for sorting, checking equality, and doing
|
||||
// union and intersection of collections of series ids.
|
||||
type SeriesIDs []uint64
|
||||
|
@ -1714,11 +1627,6 @@ func MarshalTags(tags map[string]string) []byte {
|
|||
return b
|
||||
}
|
||||
|
||||
// timeBetweenInclusive returns true if t is between min and max, inclusive.
|
||||
func timeBetweenInclusive(t, min, max time.Time) bool {
|
||||
return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
|
||||
}
|
||||
|
||||
// TagKeys returns a list of the measurement's tag names.
|
||||
func (m *Measurement) TagKeys() []string {
|
||||
m.mu.RLock()
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
|
@ -1240,33 +1239,3 @@ func (itr *measurementKeysIterator) Next() (*influxql.FloatPoint, error) {
|
|||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
// IsNumeric returns whether a given aggregate can only be run on numeric fields.
|
||||
func IsNumeric(c *influxql.Call) bool {
|
||||
switch c.Name {
|
||||
case "count", "first", "last", "distinct":
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// mustMarshalJSON encodes a value to JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid marshal will cause corruption and a panic is appropriate.
func mustMarshalJSON(v interface{}) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		panic("marshal: " + err.Error())
	}
	return b
}
|
||||
|
||||
// mustUnmarshalJSON decodes a value from JSON.
// This will panic if an error occurs. This should only be used internally when
// an invalid unmarshal will cause corruption and a panic is appropriate.
func mustUnmarshalJSON(b []byte, v interface{}) {
	err := json.Unmarshal(b, v)
	if err != nil {
		panic("unmarshal: " + err.Error())
	}
}
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -26,10 +25,6 @@ var (
|
|||
ErrStoreClosed = fmt.Errorf("store is closed")
|
||||
)
|
||||
|
||||
const (
|
||||
maintenanceCheckInterval = time.Minute
|
||||
)
|
||||
|
||||
// Store manages shards and indexes for databases.
|
||||
type Store struct {
|
||||
mu sync.RWMutex
|
||||
|
@ -780,18 +775,6 @@ func (e *Store) filterShowSeriesResult(limit, offset int, rows models.Rows) mode
|
|||
return filteredSeries
|
||||
}
|
||||
|
||||
// IsRetryable returns true if this error is temporary and could be retried
|
||||
func IsRetryable(err error) bool {
|
||||
if err == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if strings.Contains(err.Error(), "field type conflict") {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// DecodeStorePath extracts the database and retention policy names
|
||||
// from a given shard or WAL path.
|
||||
func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {
|
||||
|
|
Loading…
Reference in New Issue