From 497db2a6d3c2393562bb805dcb5ec02603c2961c Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Mon, 20 Jun 2016 22:41:07 -0500 Subject: [PATCH] Removing dead code from every package except influxql The tsdb package had a substantial amount of dead code related to the old query engine still in there. It is no longer used, so it was removed since it was left unmaintained. There is likely still more code that is the same, but wasn't found as part of this code cleanup. influxql has dead code show up because of the code generation so it is not included in this pruning. --- coordinator/points_writer.go | 19 +- coordinator/statement_executor.go | 6 - influxql/iterator.gen.go | 8 - influxql/iterator.gen.go.tmpl | 2 - models/points.go | 28 -- monitor/service.go | 11 - services/collectd/service.go | 9 - services/continuous_querier/service.go | 5 +- services/graphite/service.go | 6 +- services/httpd/response_logger.go | 6 - services/meta/client.go | 16 - services/meta/response_logger.go | 161 -------- services/opentsdb/service.go | 2 - tsdb/cursor.go | 308 --------------- tsdb/cursor_test.go | 514 ------------------------- tsdb/engine.go | 30 +- tsdb/engine/tsm1/engine.go | 1 - tsdb/meta.go | 92 ----- tsdb/shard.go | 31 -- tsdb/store.go | 17 - 20 files changed, 12 insertions(+), 1260 deletions(-) delete mode 100644 services/meta/response_logger.go delete mode 100644 tsdb/cursor_test.go diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 30fe74fd25..ed018d7b3a 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -17,17 +17,14 @@ import ( // The statistics generated by the "write" mdoule const ( - statWriteReq = "req" - statPointWriteReq = "pointReq" - statPointWriteReqLocal = "pointReqLocal" - statPointWriteReqRemote = "pointReqRemote" - statWriteOK = "writeOk" - statWritePartial = "writePartial" - statWriteTimeout = "writeTimeout" - statWriteErr = "writeError" - statWritePointReqHH = "pointReqHH" - statSubWriteOK = "subWriteOk" - statSubWriteDrop = "subWriteDrop" + statWriteReq = "req" + statPointWriteReq = "pointReq" + statPointWriteReqLocal = "pointReqLocal" + statWriteOK = "writeOk" + statWriteTimeout = "writeTimeout" + statWriteErr = "writeError" + statSubWriteOK = "subWriteOk" + statSubWriteDrop = "subWriteDrop" ) var ( diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 481316cfa0..a4b11a861e 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -1242,12 +1242,6 @@ func (s stringSet) intersect(o stringSet) stringSet { return ns } -type uint64Slice []uint64 - -func (a uint64Slice) Len() int { return len(a) } -func (a uint64Slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a uint64Slice) Less(i, j int) bool { return a[i] < a[j] } - type keyValue struct { key, value string } diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 7fbc1cf581..81d810d9df 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -299,7 +299,6 @@ func (h *floatMergeHeap) Pop() interface{} { type floatMergeHeapItem struct { itr *bufFloatIterator - err error } // floatSortedMergeIterator is an iterator that sorts and merges multiple iterators into one. @@ -308,7 +307,6 @@ type floatSortedMergeIterator struct { opt IteratorOptions heap floatSortedMergeHeap init bool - point FloatPoint } // newFloatSortedMergeIterator returns an instance of floatSortedMergeIterator. @@ -2357,7 +2355,6 @@ func (h *integerMergeHeap) Pop() interface{} { type integerMergeHeapItem struct { itr *bufIntegerIterator - err error } // integerSortedMergeIterator is an iterator that sorts and merges multiple iterators into one. @@ -2366,7 +2363,6 @@ type integerSortedMergeIterator struct { opt IteratorOptions heap integerSortedMergeHeap init bool - point IntegerPoint } // newIntegerSortedMergeIterator returns an instance of integerSortedMergeIterator. @@ -4412,7 +4408,6 @@ func (h *stringMergeHeap) Pop() interface{} { type stringMergeHeapItem struct { itr *bufStringIterator - err error } // stringSortedMergeIterator is an iterator that sorts and merges multiple iterators into one. @@ -4421,7 +4416,6 @@ type stringSortedMergeIterator struct { opt IteratorOptions heap stringSortedMergeHeap init bool - point StringPoint } // newStringSortedMergeIterator returns an instance of stringSortedMergeIterator. @@ -6467,7 +6461,6 @@ func (h *booleanMergeHeap) Pop() interface{} { type booleanMergeHeapItem struct { itr *bufBooleanIterator - err error } // booleanSortedMergeIterator is an iterator that sorts and merges multiple iterators into one. @@ -6476,7 +6469,6 @@ type booleanSortedMergeIterator struct { opt IteratorOptions heap booleanSortedMergeHeap init bool - point BooleanPoint } // newBooleanSortedMergeIterator returns an instance of booleanSortedMergeIterator. diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index be84a38f3a..3fa4c672ae 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -297,7 +297,6 @@ func (h *{{$k.name}}MergeHeap) Pop() interface{} { type {{$k.name}}MergeHeapItem struct { itr *buf{{$k.Name}}Iterator - err error } // {{$k.name}}SortedMergeIterator is an iterator that sorts and merges multiple iterators into one. @@ -306,7 +305,6 @@ type {{$k.name}}SortedMergeIterator struct { opt IteratorOptions heap {{$k.name}}SortedMergeHeap init bool - point {{$k.Name}}Point } // new{{$k.Name}}SortedMergeIterator returns an instance of {{$k.name}}SortedMergeIterator. diff --git a/models/points.go b/models/points.go index 758005dd6a..5e406d2d73 100644 --- a/models/points.go +++ b/models/points.go @@ -548,15 +548,6 @@ func less(buf []byte, indices []int, i, j int) bool { return bytes.Compare(a, b) < 0 } -func isFieldEscapeChar(b byte) bool { - for c := range escape.Codes { - if c == b { - return true - } - } - return false -} - // scanFields scans buf, starting at i for the fields section of a point. It returns // the ending position and the byte slice of the fields within buf func scanFields(buf []byte, i int) (int, []byte, error) { @@ -1589,22 +1580,3 @@ func (p Fields) MarshalBinary() []byte { } return b } - -type indexedSlice struct { - indices []int - b []byte -} - -func (s *indexedSlice) Less(i, j int) bool { - _, a := scanTo(s.b, s.indices[i], '=') - _, b := scanTo(s.b, s.indices[j], '=') - return bytes.Compare(a, b) < 0 -} - -func (s *indexedSlice) Swap(i, j int) { - s.indices[i], s.indices[j] = s.indices[j], s.indices[i] -} - -func (s *indexedSlice) Len() int { - return len(s.indices) -} diff --git a/monitor/service.go b/monitor/service.go index 9acae874ab..d907cb10ea 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -17,8 +17,6 @@ import ( "github.com/influxdata/influxdb/services/meta" ) -const leaderWaitTimeout = 30 * time.Second - // Policy constants. const ( MonitorRetentionPolicy = "monitor" @@ -405,15 +403,6 @@ type Statistic struct { Values map[string]interface{} `json:"values"` } -// newStatistic returns a new statistic object. -func newStatistic(name string, tags map[string]string, values map[string]interface{}) *Statistic { - return &Statistic{ - Name: name, - Tags: tags, - Values: values, - } -} - // valueNames returns a sorted list of the value names, if any. func (s *Statistic) ValueNames() []string { a := make([]string, 0, len(s.Values)) diff --git a/services/collectd/service.go b/services/collectd/service.go index c5c9d3f1fa..da68f37bb1 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -18,8 +18,6 @@ import ( "github.com/kimor79/gollectd" ) -const leaderWaitTimeout = 30 * time.Second - // statistics gathered by the collectd service. const ( statPointsReceived = "pointsRx" @@ -306,10 +304,3 @@ func (s *Service) UnmarshalCollectd(packet *gollectd.Packet) []models.Point { } return points } - -// assert will panic with a given formatted message if the given condition is false. -func assert(condition bool, msg string, v ...interface{}) { - if !condition { - panic(fmt.Sprintf("assert failed: "+msg, v...)) - } -} diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index fc6798bab2..5648e0643b 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -25,9 +25,8 @@ const ( // Statistics for the CQ service. const ( - statQueryOK = "queryOk" - statQueryFail = "queryFail" - statPointsWritten = "pointsWritten" + statQueryOK = "queryOk" + statQueryFail = "queryFail" ) // ContinuousQuerier represents a service that executes continuous queries. diff --git a/services/graphite/service.go b/services/graphite/service.go index 9518d3398f..abf5a47b46 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -20,10 +20,7 @@ import ( "github.com/influxdata/influxdb/tsdb" ) -const ( - udpBufferSize = 65536 - leaderWaitTimeout = 30 * time.Second -) +const udpBufferSize = 65536 // statistics gathered by the graphite package. const ( @@ -31,7 +28,6 @@ const ( statBytesReceived = "bytesRx" statPointsParseFail = "pointsParseFail" statPointsNaNFail = "pointsNaNFail" - statPointsUnsupported = "pointsUnsupportedFail" statBatchesTransmitted = "batchesTx" statPointsTransmitted = "pointsTx" statBatchesTransmitFail = "batchesTxFail" diff --git a/services/httpd/response_logger.go b/services/httpd/response_logger.go index aa237382b9..32201a4496 100644 --- a/services/httpd/response_logger.go +++ b/services/httpd/response_logger.go @@ -10,12 +10,6 @@ import ( "github.com/influxdata/influxdb/influxql" ) -type loggingResponseWriter interface { - http.ResponseWriter - Status() int - Size() int -} - // responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP status // code and body size type responseLogger struct { diff --git a/services/meta/client.go b/services/meta/client.go index 8f08ea145a..9c4fed5028 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -24,14 +24,6 @@ import ( ) const ( - // errSleep is the time to sleep after we've failed on every metaserver - // before making another pass - errSleep = time.Second - - // maxRetries is the maximum number of attemps to make before returning - // a failure to the caller - maxRetries = 10 - // SaltBytes is the number of bytes used for salts SaltBytes = 32 @@ -1047,14 +1039,6 @@ func (c *Client) Load() error { return nil } -type errCommand struct { - msg string -} - -func (e errCommand) Error() string { - return e.msg -} - type uint64Slice []uint64 func (a uint64Slice) Len() int { return len(a) } diff --git a/services/meta/response_logger.go b/services/meta/response_logger.go deleted file mode 100644 index a515e23441..0000000000 --- a/services/meta/response_logger.go +++ /dev/null @@ -1,161 +0,0 @@ -package meta - -import ( - "fmt" - "net" - "net/http" - "strconv" - "strings" - "time" -) - -type loggingResponseWriter interface { - http.ResponseWriter - Status() int - Size() int -} - -// responseLogger is wrapper of http.ResponseWriter that keeps track of its HTTP status -// code and body size -type responseLogger struct { - w http.ResponseWriter - status int - size int -} - -func (l *responseLogger) CloseNotify() <-chan bool { - if notifier, ok := l.w.(http.CloseNotifier); ok { - return notifier.CloseNotify() - } - // needed for response recorder for testing - return make(<-chan bool) -} - -func (l *responseLogger) Header() http.Header { - return l.w.Header() -} - -func (l *responseLogger) Flush() { - l.w.(http.Flusher).Flush() -} - -func (l *responseLogger) Write(b []byte) (int, error) { - if l.status == 0 { - // Set status if WriteHeader has not been called - l.status = http.StatusOK - } - size, err := l.w.Write(b) - l.size += size - return size, err -} - -func (l *responseLogger) WriteHeader(s int) { - l.w.WriteHeader(s) - l.status = s -} - -func (l *responseLogger) Status() int { - if l.status == 0 { - // This can happen if we never actually write data, but only set response headers. - l.status = http.StatusOK - } - return l.status -} - -func (l *responseLogger) Size() int { - return l.size -} - -// redact any occurrence of a password parameter, 'p' -func redactPassword(r *http.Request) { - q := r.URL.Query() - if p := q.Get("p"); p != "" { - q.Set("p", "[REDACTED]") - r.URL.RawQuery = q.Encode() - } -} - -// Common Log Format: http://en.wikipedia.org/wiki/Common_Log_Format - -// buildLogLine creates a common log format -// in addition to the common fields, we also append referrer, user agent and request ID -func buildLogLine(l *responseLogger, r *http.Request, start time.Time) string { - - redactPassword(r) - - username := parseUsername(r) - - host, _, err := net.SplitHostPort(r.RemoteAddr) - - if err != nil { - host = r.RemoteAddr - } - - uri := r.URL.RequestURI() - - referer := r.Referer() - - userAgent := r.UserAgent() - - fields := []string{ - host, - "-", - detect(username, "-"), - fmt.Sprintf("[%s]", start.Format("02/Jan/2006:15:04:05 -0700")), - r.Method, - uri, - r.Proto, - detect(strconv.Itoa(l.Status()), "-"), - strconv.Itoa(l.Size()), - detect(referer, "-"), - detect(userAgent, "-"), - r.Header.Get("Request-Id"), - fmt.Sprintf("%s", time.Since(start)), - } - - return strings.Join(fields, " ") -} - -// detect detects the first presense of a non blank string and returns it -func detect(values ...string) string { - for _, v := range values { - if v != "" { - return v - } - } - return "" -} - -// parses the username either from the url or auth header -func parseUsername(r *http.Request) string { - var ( - username = "" - url = r.URL - ) - - // get username from the url if passed there - if url.User != nil { - if name := url.User.Username(); name != "" { - username = name - } - } - - // Try to get the username from the query param 'u' - q := url.Query() - if u := q.Get("u"); u != "" { - username = u - } - - // Try to get it from the authorization header if set there - if username == "" { - if u, _, ok := r.BasicAuth(); ok { - username = u - } - } - return username -} - -// Sanitize passwords from query string for logging. -func sanitize(r *http.Request, s string) { - r.URL.RawQuery = strings.Replace(r.URL.RawQuery, s, "[REDACTED]", -1) -} diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 9e887ab43e..5b71796da9 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -22,8 +22,6 @@ import ( "github.com/influxdata/influxdb/tsdb" ) -const leaderWaitTimeout = 30 * time.Second - // statistics gathered by the openTSDB package. const ( statHTTPConnectionsHandled = "httpConnsHandled" diff --git a/tsdb/cursor.go b/tsdb/cursor.go index 9b31253b6e..cbdf8cfa86 100644 --- a/tsdb/cursor.go +++ b/tsdb/cursor.go @@ -1,12 +1,5 @@ package tsdb -import ( - "container/heap" - "math" - - "github.com/influxdata/influxdb/influxql" -) - // EOF represents a "not found" key returned by a Cursor. const EOF = int64(-1) @@ -16,304 +9,3 @@ type Cursor interface { Next() (key int64, value interface{}) Ascending() bool } - -// MultiCursor returns a single cursor that combines the results of all cursors in order. -// -// If the same key is returned from multiple cursors then the first cursor -// specified will take precendence. A key will only be returned once from the -// returned cursor. -func MultiCursor(cursors ...Cursor) Cursor { - return &multiCursor{ - cursors: cursors, - } -} - -// multiCursor represents a cursor that combines multiple cursors into one. -type multiCursor struct { - cursors []Cursor - heap cursorHeap - prev int64 // previously read key -} - -// Seek moves the cursor to a given key. -func (mc *multiCursor) SeekTo(seek int64) (int64, interface{}) { - // Initialize heap. - h := make(cursorHeap, 0, len(mc.cursors)) - for i, c := range mc.cursors { - // Move cursor to position. Skip if it's empty. - k, v := c.SeekTo(seek) - if k == EOF { - continue - } - - // Append cursor to heap. - h = append(h, &cursorHeapItem{ - key: k, - value: v, - cursor: c, - priority: len(mc.cursors) - i, - }) - } - - heap.Init(&h) - mc.heap = h - mc.prev = EOF - - return mc.pop() -} - -// Ascending returns the direction of the first cursor. -func (mc *multiCursor) Ascending() bool { - if len(mc.cursors) == 0 { - return true - } - return mc.cursors[0].Ascending() -} - -// Next returns the next key/value from the cursor. -func (mc *multiCursor) Next() (int64, interface{}) { return mc.pop() } - -// pop returns the next item from the heap. -// Reads the next key/value from item's cursor and puts it back on the heap. -func (mc *multiCursor) pop() (key int64, value interface{}) { - // Read items until we have a key that doesn't match the previously read one. - // This is to perform deduplication when there's multiple items with the same key. - // The highest priority cursor will be read first and then remaining keys will be dropped. - for { - // Return EOF marker if there are no more items left. - if len(mc.heap) == 0 { - return EOF, nil - } - - // Read the next item from the heap. - item := heap.Pop(&mc.heap).(*cursorHeapItem) - - // Save the key/value for return. - key, value = item.key, item.value - - // Read the next item from the cursor. Push back to heap if one exists. - if item.key, item.value = item.cursor.Next(); item.key != EOF { - heap.Push(&mc.heap, item) - } - - // Skip if this key matches the previously returned one. - if key == mc.prev { - continue - } - - mc.prev = key - return - } -} - -// cursorHeap represents a heap of cursorHeapItems. -type cursorHeap []*cursorHeapItem - -func (h cursorHeap) Len() int { return len(h) } -func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h cursorHeap) Less(i, j int) bool { - // Use priority if the keys are the same. - if h[i].key == h[j].key { - return h[i].priority > h[j].priority - } - - // Otherwise compare based on cursor direction. - if h[i].cursor.Ascending() { - return h[i].key < h[j].key - } - return h[i].key > h[j].key -} - -func (h *cursorHeap) Push(x interface{}) { - *h = append(*h, x.(*cursorHeapItem)) -} - -func (h *cursorHeap) Pop() interface{} { - old := *h - n := len(old) - item := old[n-1] - *h = old[0 : n-1] - return item -} - -// cursorHeapItem is something we manage in a priority queue. -type cursorHeapItem struct { - key int64 - value interface{} - cursor Cursor - priority int -} - -// bufCursor represents a buffered cursor that is initialized at a time. -// This cursor does not allow seeking after initial seek. -type bufCursor struct { - cur Cursor - buf *struct { - key int64 - value interface{} - } -} - -// newBufCursor returns a new instance of bufCursor that wraps cur. -func newBufCursor(cur Cursor, seek int64) *bufCursor { - c := &bufCursor{cur: cur} - - // Limit min seek to zero. - if seek < 0 { - seek = 0 - } - - // Fill buffer, if seekable. - k, v := cur.SeekTo(seek) - if k != EOF { - c.buf = &struct { - key int64 - value interface{} - }{k, v} - } - - return c -} - -// SeekTo panics if called. Cursor can only be seeked on initialization. -func (c *bufCursor) SeekTo(seek int64) (key int64, value interface{}) { panic("unseekable") } - -// Next returns the next key & value from the underlying cursor. -func (c *bufCursor) Next() (key int64, value interface{}) { - if c.buf != nil { - key, value = c.buf.key, c.buf.value - c.buf = nil - return - } - return c.cur.Next() -} - -// Ascending returns true if the cursor traverses in ascending order. -func (c *bufCursor) Ascending() bool { return c.cur.Ascending() } - -// FloatCursorIterator represents a wrapper for Cursor to produce an influxql.FloatIterator. -type FloatCursorIterator struct { - cursor *bufCursor - opt influxql.IteratorOptions - ref *influxql.VarRef - tags influxql.Tags - point influxql.FloatPoint // reuseable point to emit -} - -// NewFloatCursorIterator returns a new instance of FloatCursorIterator. -func NewFloatCursorIterator(name string, tagMap map[string]string, cur Cursor, opt influxql.IteratorOptions) *FloatCursorIterator { - // Extract variable reference if available. - var ref *influxql.VarRef - if opt.Expr != nil { - ref = opt.Expr.(*influxql.VarRef) - } - - // Only allocate aux values if we have any requested. - var aux []interface{} - if len(opt.Aux) > 0 { - aux = make([]interface{}, len(opt.Aux)) - } - - // Convert to influxql tags. - tags := influxql.NewTags(tagMap) - - // Determine initial seek position based on sort direction. - seek := opt.StartTime - if !opt.Ascending { - seek = opt.EndTime - } - - return &FloatCursorIterator{ - point: influxql.FloatPoint{ - Name: name, - Tags: tags.Subset(opt.Dimensions), - Aux: aux, - }, - opt: opt, - ref: ref, - tags: tags, - cursor: newBufCursor(cur, seek), - } -} - -// Close closes the iterator. -func (itr *FloatCursorIterator) Close() error { return nil } - -// Next returns the next point from the cursor. -func (itr *FloatCursorIterator) Next() *influxql.FloatPoint { - for { - // Read next key/value and emit nil if at the end. - timestamp, value := itr.cursor.Next() - if timestamp == EOF { - return nil - } else if itr.opt.Ascending && timestamp > itr.opt.EndTime { - return nil - } else if !itr.opt.Ascending && timestamp < itr.opt.StartTime { - return nil - } - - // Set timestamp on point. - itr.point.Time = timestamp - - // Retrieve tags key/value map. - tags := itr.tags.KeyValues() - - // If value is a map then extract all the fields. - if m, ok := value.(map[string]interface{}); ok { - // If filter fails then skip to the next value. - if itr.opt.Condition != nil && !influxql.EvalBool(itr.opt.Condition, m) { - continue - } - - if itr.ref != nil { - fv, ok := m[itr.ref.Val].(float64) - if !ok { - continue // read next point - } - itr.point.Value = fv - } else { - itr.point.Value = math.NaN() - } - - // Read all auxilary fields. - for i, ref := range itr.opt.Aux { - if v, ok := m[ref.Val]; ok { - itr.point.Aux[i] = v - } else if s, ok := tags[ref.Val]; ok { - itr.point.Aux[i] = s - } else { - itr.point.Aux[i] = nil - } - } - - return &itr.point - } - - // Otherwise expect value to be of an appropriate type. - if itr.ref != nil { - // If filter fails then skip to the next value. - if itr.opt.Condition != nil && !influxql.EvalBool(itr.opt.Condition, map[string]interface{}{itr.ref.Val: value}) { - continue - } - - fv, ok := value.(float64) - if !ok { - continue // read next point - } - itr.point.Value = fv - } else { - itr.point.Value = math.NaN() - } - - // Read all auxilary fields. - for i, ref := range itr.opt.Aux { - if tagValue, ok := tags[ref.Val]; ok { - itr.point.Aux[i] = tagValue - } else { - itr.point.Aux[i] = value - } - } - - return &itr.point - } -} diff --git a/tsdb/cursor_test.go b/tsdb/cursor_test.go deleted file mode 100644 index c1e9d26b8b..0000000000 --- a/tsdb/cursor_test.go +++ /dev/null @@ -1,514 +0,0 @@ -package tsdb_test - -import ( - "bytes" - "math" - "math/rand" - "reflect" - "sort" - "testing" - "testing/quick" - - "github.com/davecgh/go-spew/spew" - "github.com/influxdata/influxdb/influxql" - "github.com/influxdata/influxdb/pkg/deep" - "github.com/influxdata/influxdb/tsdb" -) - -// Ensure the multi-cursor can correctly iterate across a single subcursor. -func TestMultiCursor_Single(t *testing.T) { - mc := tsdb.MultiCursor(NewCursor([]CursorItem{ - {Key: 0, Value: 0}, - {Key: 1, Value: 10}, - {Key: 2, Value: 20}, - }, true)) - - if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 1 || v.(int) != 10 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 2 || v.(int) != 20 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != tsdb.EOF { - t.Fatalf("expected eof, got: %x / %x", k, v) - } -} - -// Ensure the multi-cursor can correctly iterate across a single subcursor in reverse order. -func TestMultiCursor_Single_Reverse(t *testing.T) { - mc := tsdb.MultiCursor(NewCursor([]CursorItem{ - {Key: 0, Value: 0}, - {Key: 1, Value: 10}, - {Key: 2, Value: 20}, - }, false)) - - if k, v := mc.SeekTo(2); k != 2 || v.(int) != 20 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 1 || v.(int) != 10 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 0 || v.(int) != 0 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != tsdb.EOF { - t.Fatalf("expected eof, got: %x / %x", k, v) - } -} - -// Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors. -func TestMultiCursor_Multiple_NonOverlapping(t *testing.T) { - mc := tsdb.MultiCursor( - NewCursor([]CursorItem{ - {Key: 0, Value: 0}, - {Key: 3, Value: 30}, - {Key: 4, Value: 40}, - }, true), - NewCursor([]CursorItem{ - {Key: 1, Value: 10}, - {Key: 2, Value: 20}, - }, true), - ) - - if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 1 || v.(int) != 10 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 2 || v.(int) != 20 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 3 || v.(int) != 30 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 4 || v.(int) != 40 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != tsdb.EOF { - t.Fatalf("expected eof, got: %x / %x", k, v) - } -} - -// Ensure the multi-cursor can correctly iterate across multiple non-overlapping subcursors. -func TestMultiCursor_Multiple_NonOverlapping_Reverse(t *testing.T) { - mc := tsdb.MultiCursor( - NewCursor([]CursorItem{ - {Key: 0, Value: 0}, - {Key: 3, Value: 30}, - {Key: 4, Value: 40}, - }, false), - NewCursor([]CursorItem{ - {Key: 1, Value: 10}, - {Key: 2, Value: 20}, - }, false), - ) - - if k, v := mc.SeekTo(4); k != 4 || v.(int) != 40 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 3 || v.(int) != 30 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 2 || v.(int) != 20 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 1 || v.(int) != 10 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 0 || v.(int) != 00 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != tsdb.EOF { - t.Fatalf("expected eof, got: %x / %x", k, v) - } -} - -// Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors. -func TestMultiCursor_Multiple_Overlapping(t *testing.T) { - mc := tsdb.MultiCursor( - NewCursor([]CursorItem{ - {Key: 0, Value: 0}, - {Key: 3, Value: 3}, - {Key: 4, Value: 4}, - }, true), - NewCursor([]CursorItem{ - {Key: 0, Value: 0xF0}, - {Key: 2, Value: 0xF2}, - {Key: 4, Value: 0xF4}, - }, true), - ) - - if k, v := mc.SeekTo(0); k != 0 || v.(int) != 0 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 2 || v.(int) != 0xF2 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 3 || v.(int) != 3 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 4 || v.(int) != 4 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != tsdb.EOF { - t.Fatalf("expected eof, got: %x / %x", k, v) - } -} - -// Ensure the multi-cursor can correctly iterate across multiple overlapping subcursors. -func TestMultiCursor_Multiple_Overlapping_Reverse(t *testing.T) { - mc := tsdb.MultiCursor( - NewCursor([]CursorItem{ - {Key: 0, Value: 0}, - {Key: 3, Value: 3}, - {Key: 4, Value: 4}, - }, false), - NewCursor([]CursorItem{ - {Key: 0, Value: 0xF0}, - {Key: 2, Value: 0xF2}, - {Key: 4, Value: 0xF4}, - }, false), - ) - - if k, v := mc.SeekTo(4); k != 4 || v.(int) != 4 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 3 || v.(int) != 3 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 2 || v.(int) != 0xF2 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != 0 || v.(int) != 0 { - t.Fatalf("unexpected key/value: %x / %x", k, v) - } else if k, v = mc.Next(); k != tsdb.EOF { - t.Fatalf("expected eof, got: %x / %x", k, v) - } -} - -// Ensure the multi-cursor can handle randomly generated data. -func TestMultiCursor_Quick(t *testing.T) { - quick.Check(func(useek uint64, cursors []Cursor) bool { - var got, exp []CursorItem - seek := int64(useek) % 100 - - // Merge all cursor data to determine expected output. - // First seen key overrides all other items with the same key. - m := make(map[int64]CursorItem) - for _, c := range cursors { - for _, item := range c.items { - if item.Key < seek { - continue - } - if _, ok := m[item.Key]; ok { - continue - } - m[item.Key] = item - } - } - - // Convert map back to single item list. - for _, item := range m { - exp = append(exp, item) - } - sort.Sort(CursorItems(exp)) - - // Create multi-cursor and iterate over all items. - mc := tsdb.MultiCursor(tsdbCursorSlice(cursors)...) - for k, v := mc.SeekTo(seek); k != tsdb.EOF; k, v = mc.Next() { - got = append(got, CursorItem{k, v.(int)}) - } - - // Verify results. - if !reflect.DeepEqual(got, exp) { - t.Fatalf("mismatch: seek=%d\n\ngot=%+v\n\nexp=%+v", seek, got, exp) - } - - return true - }, nil) -} - -// Ensure a cursor with a single ref value can be converted into an iterator. -func TestFloatCursorIterator_SingleValue(t *testing.T) { - cur := NewCursor([]CursorItem{ - {Key: 0, Value: float64(100)}, - {Key: 3, Value: float64(200)}, - }, true) - - opt := influxql.IteratorOptions{ - Expr: &influxql.VarRef{Val: "value"}, - Ascending: true, - StartTime: influxql.MinTime, - EndTime: influxql.MaxTime, - } - itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt) - defer itr.Close() - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 0, - Value: float64(100), - }) { - t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) - } - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 3, - Value: float64(200), - }) { - t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) - } - - if p := itr.Next(); p != nil { - t.Fatalf("expected eof, got: %s", spew.Sdump(p)) - } -} - -// Ensure a cursor with a ref and multiple aux values can be converted into an iterator. -func TestFloatCursorIterator_MultipleValues(t *testing.T) { - cur := NewCursor([]CursorItem{ - {Key: 0, Value: map[string]interface{}{"val1": float64(100), "val2": "foo"}}, - {Key: 3, Value: map[string]interface{}{"val1": float64(200), "val2": "bar"}}, - }, true) - - opt := influxql.IteratorOptions{ - Expr: &influxql.VarRef{Val: "val1"}, Aux: []influxql.VarRef{{Val: "val1"}, {Val: "val2"}}, - Ascending: true, - StartTime: influxql.MinTime, - EndTime: influxql.MaxTime, - } - itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt) - defer itr.Close() - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 0, - Value: 100, - Aux: []interface{}{float64(100), "foo"}, - }) { - t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) - } - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 3, - Value: 200, - Aux: []interface{}{float64(200), "bar"}, - }) { - t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) - } - - if p := itr.Next(); p != nil { - t.Fatalf("expected eof, got: %s", spew.Sdump(p)) - } -} - -// Ensure a cursor with a single value can be converted into an iterator. -func TestFloatCursorIterator_Aux_SingleValue(t *testing.T) { - cur := NewCursor([]CursorItem{ - {Key: 0, Value: float64(100)}, - {Key: 3, Value: float64(200)}, - }, true) - - opt := influxql.IteratorOptions{ - Aux: []influxql.VarRef{{Val: "val1"}}, - Ascending: true, - StartTime: influxql.MinTime, - EndTime: influxql.MaxTime, - } - itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt) - defer itr.Close() - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 0, - Value: math.NaN(), - Aux: []interface{}{float64(100)}, - }) { - t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) - } - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 3, - Value: math.NaN(), - Aux: []interface{}{float64(200)}, - }) { - t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) - } - - if p := itr.Next(); p != nil { - t.Fatalf("expected eof, got: %s", spew.Sdump(p)) - } -} - -// Ensure a cursor with multiple values can be converted into an iterator. -func TestFloatCursorIterator_Aux_MultipleValues(t *testing.T) { - cur := NewCursor([]CursorItem{ - {Key: 0, Value: map[string]interface{}{"val1": float64(100), "val2": "foo"}}, - {Key: 3, Value: map[string]interface{}{"val1": float64(200), "val2": "bar"}}, - }, true) - - opt := influxql.IteratorOptions{ - Aux: []influxql.VarRef{{Val: "val1"}, {Val: "val2"}}, - Ascending: true, - StartTime: influxql.MinTime, - EndTime: influxql.MaxTime, - } - itr := tsdb.NewFloatCursorIterator("series0", map[string]string{"host": "serverA"}, cur, opt) - defer itr.Close() - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 0, - Value: math.NaN(), - Aux: []interface{}{float64(100), "foo"}, - }) { - t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) - } - - if p := itr.Next(); !deep.Equal(p, &influxql.FloatPoint{ - Name: "series0", - Time: 3, - Value: math.NaN(), - Aux: []interface{}{float64(200), "bar"}, - }) { - t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) - } - - if p := itr.Next(); p != nil { - t.Fatalf("expected eof, got: %s", spew.Sdump(p)) - } -} - -// Ensure a cursor iterator does not go past the end time. -func TestFloatCursorIterator_EndTime(t *testing.T) { - cur := NewCursor([]CursorItem{ - {Key: 0, Value: float64(100)}, - {Key: 3, Value: float64(200)}, - {Key: 4, Value: float64(300)}, - }, true) - - itr := tsdb.NewFloatCursorIterator("x", nil, cur, influxql.IteratorOptions{ - Expr: &influxql.VarRef{Val: "value"}, - Ascending: true, - EndTime: 3, - }) - defer itr.Close() - - // Verify that only two points are emitted. - if p := itr.Next(); p == nil || p.Time != 0 { - t.Fatalf("unexpected point(0): %s", spew.Sdump(p)) - } - if p := itr.Next(); p == nil || p.Time != 3 { - t.Fatalf("unexpected point(1): %s", spew.Sdump(p)) - } - if p := itr.Next(); p != nil { - t.Fatalf("expected eof, got: %s", spew.Sdump(p)) - } -} - -// Cursor represents an in-memory test cursor. -type Cursor struct { - items []CursorItem - index int - ascending bool -} - -// NewCursor returns a new instance of Cursor. -func NewCursor(items []CursorItem, ascending bool) *Cursor { - index := 0 - sort.Sort(CursorItems(items)) - - if !ascending { - index = len(items) - } - return &Cursor{ - items: items, - index: index, - ascending: ascending, - } -} - -func (c *Cursor) Ascending() bool { return c.ascending } - -// Seek seeks to an item by key. -func (c *Cursor) SeekTo(seek int64) (key int64, value interface{}) { - if c.ascending { - return c.seekForward(seek) - } - return c.seekReverse(seek) -} - -func (c *Cursor) seekForward(seek int64) (key int64, value interface{}) { - for c.index = 0; c.index < len(c.items); c.index++ { - if c.items[c.index].Key < seek { // skip keys less than seek - continue - } - key, value = c.items[c.index].Key, c.items[c.index].Value - c.index++ - return key, value - } - return tsdb.EOF, nil -} - -func (c *Cursor) seekReverse(seek int64) (key int64, value interface{}) { - for c.index = len(c.items) - 1; c.index >= 0; c.index-- { - if c.items[c.index].Key > seek { // skip keys greater than seek - continue - } - key, value = c.items[c.index].Key, c.items[c.index].Value - c.index-- - return key, value - } - return tsdb.EOF, nil -} - -// Next returns the next key/value pair. -func (c *Cursor) Next() (key int64, value interface{}) { - if !c.ascending && c.index < 0 { - return tsdb.EOF, nil - } - - if c.ascending && c.index >= len(c.items) { - return tsdb.EOF, nil - } - - k, v := c.items[c.index].Key, c.items[c.index].Value - - if c.ascending { - c.index++ - } else { - c.index-- - } - return k, v -} - -// Generate returns a randomly generated cursor. Implements quick.Generator. -func (c Cursor) Generate(rand *rand.Rand, size int) reflect.Value { - c.index = 0 - c.ascending = true - - c.items = make([]CursorItem, rand.Intn(size)) - for i := range c.items { - c.items[i] = CursorItem{ - Key: rand.Int63n(int64(size)), - Value: rand.Int(), - } - } - - // Sort items by key. - sort.Sort(CursorItems(c.items)) - - return reflect.ValueOf(c) -} - -// tsdbCursorSlice converts a Cursor slice to a tsdb.Cursor slice. -func tsdbCursorSlice(a []Cursor) []tsdb.Cursor { - var other []tsdb.Cursor - for i := range a { - other = append(other, &a[i]) - } - return other -} - -// CursorItem represents a key/value pair in a cursor. -type CursorItem struct { - Key int64 - Value interface{} -} - -type CursorItems []CursorItem - -func (a CursorItems) Len() int { return len(a) } -func (a CursorItems) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a CursorItems) Less(i, j int) bool { return a[i].Key < a[j].Key } - -// byteSlices represents a sortable slice of byte slices. -type byteSlices [][]byte - -func (a byteSlices) Len() int { return len(a) } -func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } -func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/tsdb/engine.go b/tsdb/engine.go index b62a09e21d..bec5fd7a07 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -1,7 +1,6 @@ package tsdb import ( - "bytes" "errors" "fmt" "io" @@ -92,7 +91,7 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro } // If it's a dir then it's a tsm1 engine - format := "tsm1" + format := DefaultEngine if fi, err := os.Stat(path); err != nil { return nil, err } else if !fi.Mode().IsDir() { @@ -124,30 +123,3 @@ func NewEngineOptions() EngineOptions { Config: NewConfig(), } } - -// DedupeEntries returns slices with unique keys (the first 8 bytes). -func DedupeEntries(a [][]byte) [][]byte { - // Convert to a map where the last slice is used. - m := make(map[string][]byte) - for _, b := range a { - m[string(b[0:8])] = b - } - - // Convert map back to a slice of byte slices. - other := make([][]byte, 0, len(m)) - for _, v := range m { - other = append(other, v) - } - - // Sort entries. - sort.Sort(ByteSlices(other)) - - return other -} - -// ByteSlices wraps a list of byte-slices for sorting. -type ByteSlices [][]byte - -func (a ByteSlices) Len() int { return len(a) } -func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 83bd44ea36..cb3cf4f724 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -470,7 +470,6 @@ func (e *Engine) addToIndexFromKey(shardID uint64, key string, fieldType influxq _, tags, _ := models.ParseKey(seriesKey) s := tsdb.NewSeries(seriesKey, tags) - s.InitializeShards() index.CreateSeriesIndexIfNotExists(measurement, s) s.AssignShard(shardID) diff --git a/tsdb/meta.go b/tsdb/meta.go index c801c8e063..b21a514ba0 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -6,7 +6,6 @@ import ( "regexp" "sort" "sync" - "time" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/influxql" @@ -20,8 +19,6 @@ import ( //go:generate protoc --gogo_out=. internal/meta.proto const ( - maxStringLength = 64 * 1024 - statDatabaseSeries = "numSeries" // number of series in this database statDatabaseMeasurements = "numMeasurements" // number of measurements in this database ) @@ -494,10 +491,6 @@ func (d *DatabaseIndex) DropSeries(keys []string) { d.statMap.Add(statDatabaseSeries, -nDeleted) } -const ( - statMeasurementSeries = "numSeries" // number of series contained in this measurement -) - // Measurement represents a collection of time series in a database. It also contains in memory // structures for indexing tags. Exported functions are goroutine safe while un-exported functions // assume the caller will use the appropriate locks @@ -1338,40 +1331,6 @@ func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string { return out } -// SelectFields returns a list of fields in the SELECT section of stmt. -func (m *Measurement) SelectFields(stmt *influxql.SelectStatement) []string { - set := newStringSet() - for _, name := range stmt.NamesInSelect() { - if m.HasField(name) { - set.add(name) - continue - } - } - return set.list() -} - -// SelectTags returns a list of non-field tags in the SELECT section of stmt. -func (m *Measurement) SelectTags(stmt *influxql.SelectStatement) []string { - set := newStringSet() - for _, name := range stmt.NamesInSelect() { - if !m.HasField(name) && m.HasTagKey(name) { - set.add(name) - } - } - return set.list() -} - -// WhereFields returns a list of non-"time" fields in the WHERE section of stmt. -func (m *Measurement) WhereFields(stmt *influxql.SelectStatement) []string { - set := newStringSet() - for _, name := range stmt.NamesInWhere() { - if name != "time" && m.HasField(name) { - set.add(name) - } - } - return set.list() -} - // Measurements represents a list of *Measurement. type Measurements []*Measurement @@ -1379,45 +1338,6 @@ func (a Measurements) Len() int { return len(a) } func (a Measurements) Less(i, j int) bool { return a[i].Name < a[j].Name } func (a Measurements) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// SelectFields returns a list of fields in the SELECT section of stmt. -func (a Measurements) SelectFields(stmt *influxql.SelectStatement) []string { - set := newStringSet() - for _, name := range stmt.NamesInSelect() { - for _, m := range a { - if m.HasField(name) { - set.add(name) - } - } - } - return set.list() -} - -// SelectTags returns a list of non-field tags in the SELECT section of stmt. -func (a Measurements) SelectTags(stmt *influxql.SelectStatement) []string { - set := newStringSet() - for _, name := range stmt.NamesInSelect() { - for _, m := range a { - if !m.HasField(name) && m.HasTagKey(name) { - set.add(name) - } - } - } - return set.list() -} - -// WhereFields returns a list of non-"time" fields in the WHERE section of stmt. -func (a Measurements) WhereFields(stmt *influxql.SelectStatement) []string { - set := newStringSet() - for _, name := range stmt.NamesInWhere() { - for _, m := range a { - if name != "time" && m.HasField(name) { - set.add(name) - } - } - } - return set.list() -} - func (a Measurements) intersect(other Measurements) Measurements { l := a r := other @@ -1552,13 +1472,6 @@ func (s *Series) UnmarshalBinary(buf []byte) error { return nil } -// InitializeShards initializes the list of shards. -func (s *Series) InitializeShards() { - s.mu.Lock() - s.shardIDs = make(map[uint64]bool) - s.mu.Unlock() -} - // SeriesIDs is a convenience type for sorting, checking equality, and doing // union and intersection of collections of series ids. type SeriesIDs []uint64 @@ -1714,11 +1627,6 @@ func MarshalTags(tags map[string]string) []byte { return b } -// timeBetweenInclusive returns true if t is between min and max, inclusive. -func timeBetweenInclusive(t, min, max time.Time) bool { - return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max)) -} - // TagKeys returns a list of the measurement's tag names. func (m *Measurement) TagKeys() []string { m.mu.RLock() diff --git a/tsdb/shard.go b/tsdb/shard.go index e5226c66f0..e63e2c8b85 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -1,7 +1,6 @@ package tsdb import ( - "encoding/json" "errors" "expvar" "fmt" @@ -1240,33 +1239,3 @@ func (itr *measurementKeysIterator) Next() (*influxql.FloatPoint, error) { return p, nil } } - -// IsNumeric returns whether a given aggregate can only be run on numeric fields. -func IsNumeric(c *influxql.Call) bool { - switch c.Name { - case "count", "first", "last", "distinct": - return false - default: - return true - } -} - -// mustMarshal encodes a value to JSON. -// This will panic if an error occurs. This should only be used internally when -// an invalid marshal will cause corruption and a panic is appropriate. -func mustMarshalJSON(v interface{}) []byte { - b, err := json.Marshal(v) - if err != nil { - panic("marshal: " + err.Error()) - } - return b -} - -// mustUnmarshalJSON decodes a value from JSON. -// This will panic if an error occurs. This should only be used internally when -// an invalid unmarshal will cause corruption and a panic is appropriate. -func mustUnmarshalJSON(b []byte, v interface{}) { - if err := json.Unmarshal(b, v); err != nil { - panic("unmarshal: " + err.Error()) - } -} diff --git a/tsdb/store.go b/tsdb/store.go index 4de4c02225..ef3333f6a1 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -11,7 +11,6 @@ import ( "runtime" "sort" "strconv" - "strings" "sync" "time" @@ -26,10 +25,6 @@ var ( ErrStoreClosed = fmt.Errorf("store is closed") ) -const ( - maintenanceCheckInterval = time.Minute -) - // Store manages shards and indexes for databases. type Store struct { mu sync.RWMutex @@ -780,18 +775,6 @@ func (e *Store) filterShowSeriesResult(limit, offset int, rows models.Rows) mode return filteredSeries } -// IsRetryable returns true if this error is temporary and could be retried -func IsRetryable(err error) bool { - if err == nil { - return true - } - - if strings.Contains(err.Error(), "field type conflict") { - return false - } - return true -} - // DecodeStorePath extracts the database and retention policy names // from a given shard or WAL path. func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {