chore(influxdb): Placate the linter.

pull/19446/head
Brett Buddin 2020-08-27 15:46:32 -04:00
parent c91e502f82
commit b917d8d9b0
27 changed files with 45 additions and 367 deletions


@@ -75,7 +75,7 @@ Usage: influx_inspect verify [flags]
This check skips verification of block checksums.
`, os.Getenv("HOME"))
fmt.Fprintf(cmd.Stdout, usage)
fmt.Fprint(cmd.Stdout, usage)
}
type verifyTSM struct {
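
The Fprintf-to-Fprint change above is the usual fix for the printf checks in go vet and staticcheck: `usage` was already formatted once (note the `os.Getenv("HOME")` argument to an earlier Sprintf), so passing it as a format string would reinterpret any `%` it happens to contain. A minimal sketch of the pattern, assuming that surrounding context:

package main

import (
    "fmt"
    "os"
)

func main() {
    // usage has been through Sprintf once, so it may legitimately contain '%'.
    usage := fmt.Sprintf("Data directory: %s", os.Getenv("HOME"))

    // fmt.Fprintf(os.Stdout, usage) // flagged: non-constant format string
    fmt.Fprint(os.Stdout, usage) // prints the string verbatim
}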


@@ -1,3 +1,4 @@
//lint:file-ignore U1000 this check seems to be misreporting here
package legacy
import (
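
Several files in this commit gain `//lint:file-ignore` headers like the one above. These are staticcheck directives: `file-ignore` suppresses the named check for the entire file (conventionally placed above the package clause), while plain `//lint:ignore` suppresses it only for the statement that follows (the form being removed in the Controller hunk below). A sketch of both forms, with invented reasons:

//lint:file-ignore U1000 kept for a follow-up change (hypothetical reason)
package main

import "fmt"

// helper is never called; U1000 would normally report it as unused,
// but the file-ignore above silences that for the whole file.
func helper() string { return "hello" }

func main() {
    //lint:ignore S1039 illustrating the single-statement form of the directive
    fmt.Println(fmt.Sprint("demo"))
}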


@@ -85,8 +85,6 @@ const (
msgInvalidGzipHeader = "gzipped HTTP body contains an invalid header"
msgInvalidPrecision = "invalid precision; valid precision units are ns, us, ms, and s"
msgUnexpectedWriteError = "unexpected error writing points to database"
opWriteHandler = "http/writeHandler"
)


@@ -4,6 +4,7 @@
// DO NOT EDIT!
// Source: iterator.gen.go.tmpl
//lint:file-ignore U1000 this is generated code
package query
import (


@@ -1,3 +1,4 @@
//lint:file-ignore U1000 this is generated code
package query
import (


@@ -150,7 +150,7 @@ func New(config Config) (*Controller, error) {
if err != nil {
return nil, errors.Wrap(err, "invalid controller config")
}
c.MetricLabelKeys = append(c.MetricLabelKeys, orgLabel) //lint:ignore SA1029 this is a temporary ignore until we have time to create an appropriate type
c.MetricLabelKeys = append(c.MetricLabelKeys, orgLabel)
logger := c.Logger
if logger == nil {
logger = zap.NewNop()


@@ -1,3 +1,4 @@
//lint:file-ignore U1000 ignore these flagger-related dead code issues until we can circle back
package testing_test
import (


@@ -1,199 +0,0 @@
package storageflux
import (
"context"
"fmt"
"sync/atomic"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
)
// splitWindows will split a windowTable by creating a new table from each
// row and modifying the group key to use the start and stop values from
// that row.
func splitWindows(ctx context.Context, alloc memory.Allocator, in flux.Table, selector bool, f func(t flux.Table) error) error {
wts := &windowTableSplitter{
ctx: ctx,
in: in,
alloc: alloc,
selector: selector,
}
return wts.Do(f)
}
type windowTableSplitter struct {
ctx context.Context
in flux.Table
alloc memory.Allocator
selector bool
}
func (w *windowTableSplitter) Do(f func(flux.Table) error) error {
defer w.in.Done()
startIdx, err := w.getTimeColumnIndex(execute.DefaultStartColLabel)
if err != nil {
return err
}
stopIdx, err := w.getTimeColumnIndex(execute.DefaultStopColLabel)
if err != nil {
return err
}
return w.in.Do(func(cr flux.ColReader) error {
// Retrieve the start and stop columns for splitting
// the windows.
start := cr.Times(startIdx)
stop := cr.Times(stopIdx)
// Iterate through each time to produce a table
// using the start and stop values.
arrs := make([]array.Interface, len(cr.Cols()))
for j := range cr.Cols() {
arrs[j] = getColumnValues(cr, j)
}
values := arrs[valueColIdx]
for i, n := 0, cr.Len(); i < n; i++ {
startT, stopT := start.Value(i), stop.Value(i)
// Rewrite the group key using the new time.
key := groupKeyForWindow(cr.Key(), startT, stopT)
if w.selector && values.IsNull(i) {
// Produce an empty table if the value is null
// and this is a selector.
table := execute.NewEmptyTable(key, cr.Cols())
if err := f(table); err != nil {
return err
}
continue
}
// Produce a slice for each column into a new
// table buffer.
buffer := arrow.TableBuffer{
GroupKey: key,
Columns: cr.Cols(),
Values: make([]array.Interface, len(cr.Cols())),
}
for j, arr := range arrs {
buffer.Values[j] = arrow.Slice(arr, int64(i), int64(i+1))
}
// Wrap these into a single table and execute.
done := make(chan struct{})
table := &windowTableRow{
buffer: buffer,
done: done,
}
if err := f(table); err != nil {
return err
}
select {
case <-done:
case <-w.ctx.Done():
return w.ctx.Err()
}
}
return nil
})
}
func (w *windowTableSplitter) getTimeColumnIndex(label string) (int, error) {
j := execute.ColIdx(label, w.in.Cols())
if j < 0 {
return -1, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("missing %q column from window splitter", label),
}
} else if c := w.in.Cols()[j]; c.Type != flux.TTime {
return -1, &influxdb.Error{
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("%q column must be of type time", label),
}
}
return j, nil
}
type windowTableRow struct {
used int32
buffer arrow.TableBuffer
done chan struct{}
}
func (w *windowTableRow) Key() flux.GroupKey {
return w.buffer.GroupKey
}
func (w *windowTableRow) Cols() []flux.ColMeta {
return w.buffer.Columns
}
func (w *windowTableRow) Do(f func(flux.ColReader) error) error {
if !atomic.CompareAndSwapInt32(&w.used, 0, 1) {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: "table already read",
}
}
defer close(w.done)
err := f(&w.buffer)
w.buffer.Release()
return err
}
func (w *windowTableRow) Done() {
if atomic.CompareAndSwapInt32(&w.used, 0, 1) {
w.buffer.Release()
close(w.done)
}
}
func (w *windowTableRow) Empty() bool {
return false
}
func groupKeyForWindow(key flux.GroupKey, start, stop int64) flux.GroupKey {
cols := key.Cols()
vs := make([]values.Value, len(cols))
for j, c := range cols {
if c.Label == execute.DefaultStartColLabel {
vs[j] = values.NewTime(values.Time(start))
} else if c.Label == execute.DefaultStopColLabel {
vs[j] = values.NewTime(values.Time(stop))
} else {
vs[j] = key.Value(j)
}
}
return execute.NewGroupKey(cols, vs)
}
// getColumnValues returns the array from the column reader as an array.Interface.
func getColumnValues(cr flux.ColReader, j int) array.Interface {
switch typ := cr.Cols()[j].Type; typ {
case flux.TInt:
return cr.Ints(j)
case flux.TUInt:
return cr.UInts(j)
case flux.TFloat:
return cr.Floats(j)
case flux.TString:
return cr.Strings(j)
case flux.TBool:
return cr.Bools(j)
case flux.TTime:
return cr.Times(j)
default:
panic(fmt.Errorf("unimplemented column type: %s", typ))
}
}
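
Two details of the deleted splitter are worth noting before they disappear: `windowTableRow` uses an atomic compare-and-swap so each row-table can be consumed at most once, and a `done` channel so the splitter does not emit the next row until the current one is released. A stripped-down sketch of that hand-off pattern (type and field names are illustrative, not from the codebase):

package main

import (
    "fmt"
    "sync/atomic"
)

type onceRow struct {
    used int32
    data []int64
    done chan struct{}
}

// Do hands the row to f exactly once; a second call reports an error,
// mirroring windowTableRow.Do above.
func (r *onceRow) Do(f func([]int64) error) error {
    if !atomic.CompareAndSwapInt32(&r.used, 0, 1) {
        return fmt.Errorf("row already read")
    }
    defer close(r.done) // unblocks the producer waiting on this row
    return f(r.data)
}

func main() {
    row := &onceRow{data: []int64{1, 2, 3}, done: make(chan struct{})}
    go func() {
        _ = row.Do(func(vs []int64) error {
            fmt.Println("consumed:", vs)
            return nil
        })
    }()
    <-row.done // producer side: wait before emitting the next row
}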


@@ -1,62 +0,0 @@
package storage
import (
"context"
"io"
)
// opener is something that can be opened and closed.
type opener interface {
Open(context.Context) error
io.Closer // TODO consider a closer-with-context instead
}
// openHelper is a helper to abstract the pattern of opening multiple things,
// exiting early if any open fails, and closing any of the opened things
// in the case of failure.
type openHelper struct {
opened []io.Closer
err error
}
// Open attempts to open the opener. If an error has happened already
// then no calls are made to the opener.
func (o *openHelper) Open(ctx context.Context, op opener) {
if o.err != nil {
return
}
o.err = op.Open(ctx)
if o.err == nil {
o.opened = append(o.opened, op)
}
}
// Done returns the error of the first open and closes in reverse
// order any opens that have already happened if there was an error.
func (o *openHelper) Done() error {
if o.err == nil {
return nil
}
for i := len(o.opened) - 1; i >= 0; i-- {
o.opened[i].Close()
}
return o.err
}
// closeHelper is a helper to abstract the pattern of closing multiple
// things and keeping track of the first encountered error.
type closeHelper struct {
err error
}
// Close closes the closer and keeps track of the first error.
func (c *closeHelper) Close(cl io.Closer) {
if err := cl.Close(); c.err == nil {
c.err = err
}
}
// Done returns the first error.
func (c *closeHelper) Done() error {
return c.err
}
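
The deleted helpers above capture a common lifecycle pattern: open several components in order, stop at the first failure, and close whatever already opened, in reverse. A hypothetical caller, assuming the opener and openHelper definitions above live in the same package (the component names are invented):

package storage

import (
    "context"
    "errors"
    "fmt"
)

// comp is a stand-in for any component with an Open/Close pair.
type comp struct {
    name string
    fail bool
}

func (c *comp) Open(ctx context.Context) error {
    if c.fail {
        return errors.New(c.name + ": open failed")
    }
    fmt.Println("opened", c.name)
    return nil
}

func (c *comp) Close() error {
    fmt.Println("closed", c.name)
    return nil
}

func openAll(ctx context.Context) error {
    var o openHelper
    o.Open(ctx, &comp{name: "wal"})
    o.Open(ctx, &comp{name: "index", fail: true}) // first failure wins
    o.Open(ctx, &comp{name: "series"})            // no-op: o.err is already set
    return o.Done() // closes "wal" again, in reverse order, then returns the error
}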


@@ -54,19 +54,6 @@ type retentionEnforcer struct {
tracker *retentionTracker
}
// newRetentionEnforcer returns a new enforcer that ensures expired data is
// deleted every interval period. Setting interval to 0 is equivalent to
// disabling the service.
func newRetentionEnforcer(engine Deleter, snapshotter Snapshotter, bucketService BucketFinder) *retentionEnforcer {
return &retentionEnforcer{
Engine: engine,
Snapshotter: snapshotter,
BucketService: bucketService,
logger: zap.NewNop(),
tracker: newRetentionTracker(newRetentionMetrics(nil), nil),
}
}
// SetDefaultMetricLabels sets the default labels for the retention metrics.
func (s *retentionEnforcer) SetDefaultMetricLabels(defaultLabels prometheus.Labels) {
if s == nil {


@@ -118,13 +118,6 @@ func dumpBufs(a, b []byte) {
fmt.Println()
}
func dumpBuf(b []byte) {
for i, v := range b {
fmt.Printf("%d %08b\n", i, v)
}
fmt.Println()
}
func TestFloatArrayEncodeAll_NaN(t *testing.T) {
examples := [][]float64{
{1.0, math.NaN(), 2.0},
@@ -179,7 +172,7 @@ func Test_FloatArrayEncodeAll_Quick(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
if got, exp := result, src[:len(src)]; !reflect.DeepEqual(got, exp) {
if got, exp := result, src; !reflect.DeepEqual(got, exp) {
t.Fatalf("got result %v, expected %v", got, exp)
}
return true


@@ -34,13 +34,6 @@ func dumpBufs(a, b []byte) {
fmt.Println()
}
func dumpBuf(b []byte) {
for i, v := range b {
fmt.Printf("%[1]d %08[2]b (%[2]d)\n", i, v)
}
fmt.Println()
}
func TestIntegerArrayEncodeAll_NoValues(t *testing.T) {
b, err := IntegerArrayEncodeAll(nil, nil)
if err != nil {
@@ -650,9 +643,7 @@ func TestIntegerArrayEncodeAll_Quick(t *testing.T) {
// Copy the values over to compare the result; src is modified...
exp := make([]int64, 0, len(src))
for _, v := range src {
exp = append(exp, v)
}
exp = append(exp, src...)
// Retrieve encoded bytes from encoder.
b, err := IntegerArrayEncodeAll(src, nil)
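
The loop-to-append change above is staticcheck's S1011 simplification: appending a whole slice with `...` copies it in one call (and at most one allocation) instead of element by element. A sketch:

package main

import "fmt"

func main() {
    src := []int64{3, 1, 2}

    // flagged by S1011:
    exp := make([]int64, 0, len(src))
    for _, v := range src {
        exp = append(exp, v)
    }

    // preferred: one variadic append
    exp2 := append(make([]int64, 0, len(src)), src...)

    fmt.Println(exp, exp2)
}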


@@ -296,7 +296,7 @@ func TestCompactor_DecodeError(t *testing.T) {
compactor.Open()
files, err = compactor.CompactFull([]string{f1, f2, f3})
_, err = compactor.CompactFull([]string{f1, f2, f3})
if err == nil ||
!strings.Contains(err.Error(), "decode error: unable to decompress block type float for key 'cpu,host=A#!~#value': unpackBlock: not enough data for timestamp") {
t.Fatalf("expected error writing snapshot: %v", err)


@@ -4,6 +4,7 @@
// DO NOT EDIT!
// Source: encoding.gen.go.tmpl
//lint:file-ignore U1000 generated code
package tsm1
import (


@@ -1,3 +1,4 @@
//lint:file-ignore U1000 generated code
package tsm1
import (


@@ -169,12 +169,12 @@ func (e EmptyValue) Size() int { return 0 }
// String returns the empty string.
func (e EmptyValue) String() string { return "" }
func (_ EmptyValue) internalOnly() {}
func (_ StringValue) internalOnly() {}
func (_ IntegerValue) internalOnly() {}
func (_ UnsignedValue) internalOnly() {}
func (_ BooleanValue) internalOnly() {}
func (_ FloatValue) internalOnly() {}
func (EmptyValue) internalOnly() {}
func (StringValue) internalOnly() {}
func (IntegerValue) internalOnly() {}
func (UnsignedValue) internalOnly() {}
func (BooleanValue) internalOnly() {}
func (FloatValue) internalOnly() {}
// Encode converts the values to a byte slice. If there are no values,
// this function panics.
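
Dropping the underscore receivers above follows the style checks' advice that a receiver name should not be `_`; when the receiver is unused, omit the name entirely. A sketch:

package main

import "fmt"

type Marker struct{}

// Before: func (_ Marker) internalOnly() {}
// After: an unnamed receiver states "unused" directly.
func (Marker) internalOnly() {}

func main() {
    Marker{}.internalOnly()
    fmt.Println("ok")
}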


@@ -260,7 +260,7 @@ func benchmarkTagSets(b *testing.B, n int, opt query.IteratorOptions) {
for i := 0; i < n; i++ {
tags := map[string]string{"tag1": "value1", "tag2": "value2"}
s := newSeries(uint64(i), m, fmt.Sprintf("m,tag1=value1,tag2=value2"), models.NewTags(tags))
s := newSeries(uint64(i), m, "m,tag1=value1,tag2=value2", models.NewTags(tags))
ss.Add(uint64(i))
m.AddSeries(s)
}
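
`fmt.Sprintf` with no formatting verbs and no further arguments just returns its input, which staticcheck simplifies away (S1039); the `fmt.Sprint` calls wrapping single literals in the benchmark names below get the same treatment. A sketch:

package main

import "fmt"

func main() {
    // flagged by S1039: the calls are no-ops
    a := fmt.Sprintf("m,tag1=value1,tag2=value2")
    b := fmt.Sprint("cardinality_1000000_add")

    // preferred: use the literals directly
    c := "m,tag1=value1,tag2=value2"
    d := "cardinality_1000000_add"

    fmt.Println(a == c, b == d) // true true
}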


@@ -1,3 +1,4 @@
//lint:file-ignore SA5011 we use assertions, which the linter does not recognize as nil guards
package tsi1
import (


@@ -686,7 +686,7 @@ func (c *SeriesPartitionCompactor) insertKeyIDMap(dst []byte, capacity int64, se
binary.BigEndian.PutUint64(elem[8:], id)
// Swap with values in that position.
hash, key, offset, id = elemHash, elemKey, elemOffset, elemID
_, _, offset, id = elemHash, elemKey, elemOffset, elemID
// Update current distance.
dist = d
@@ -723,7 +723,7 @@ func (c *SeriesPartitionCompactor) insertIDOffsetMap(dst []byte, capacity int64,
binary.BigEndian.PutUint64(elem[8:], uint64(offset))
// Swap with values in that position.
hash, id, offset = elemHash, elemID, elemOffset
_, id, offset = elemHash, elemID, elemOffset
// Update current distance.
dist = d
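
The blank identifiers introduced above address dead-store diagnostics (staticcheck SA4006): in the insertion loops above, `hash` and `key` (or `id`) are written by the swap but never read again before being reassigned, so only the values the loop still needs are kept. A reduced sketch of the same fix:

package main

import "fmt"

func next() (int, int, int) { return 1, 2, 3 }

func main() {
    a, b, c := next()
    fmt.Println(a, b, c)

    // Before: a, b, c = next()
    // a is never read again afterwards, so the store to a is dead (SA4006).
    _, b, c = next()
    fmt.Println(b, c)
}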


@@ -212,7 +212,7 @@ func BenchmarkSeriesIDSet_Add(b *testing.B) {
lookup := uint64(300032)
// Add the same value over and over.
b.Run(fmt.Sprint("cardinality_1000000_add"), func(b *testing.B) {
b.Run("cardinality_1000000_add", func(b *testing.B) {
b.Run("same", func(b *testing.B) {
for i := 0; i < b.N; i++ {
set.Add(lookup)
@@ -245,7 +245,7 @@ func BenchmarkSeriesIDSet_Add(b *testing.B) {
})
// Add the same value over and over with no lock
b.Run(fmt.Sprint("cardinality_1000000_check_add"), func(b *testing.B) {
b.Run("cardinality_1000000_check_add", func(b *testing.B) {
b.Run("same no lock", func(b *testing.B) {
for i := 0; i < b.N; i++ {
if !set.ContainsNoLock(lookup) {
@@ -533,7 +533,7 @@ func BenchmarkSeriesIDSet_Remove(b *testing.B) {
lookup := uint64(300032)
// Remove the same value over and over.
b.Run(fmt.Sprint("cardinality_1000000_remove_same"), func(b *testing.B) {
b.Run("cardinality_1000000_remove_same", func(b *testing.B) {
for i := 0; i < b.N; i++ {
set.Remove(lookup)
}
@@ -541,7 +541,7 @@ func BenchmarkSeriesIDSet_Remove(b *testing.B) {
// Check if the value exists before adding it. Subsequent repeats of the code
// will result in contains checks.
b.Run(fmt.Sprint("cardinality_1000000_check_remove_global_lock"), func(b *testing.B) {
b.Run("cardinality_1000000_check_remove_global_lock", func(b *testing.B) {
for i := 0; i < b.N; i++ {
set.Lock()
if set.ContainsNoLock(lookup) {
@@ -552,7 +552,7 @@ func BenchmarkSeriesIDSet_Remove(b *testing.B) {
})
// Check if the value exists before adding it under two locks.
b.Run(fmt.Sprint("cardinality_1000000_check_remove_multi_lock"), func(b *testing.B) {
b.Run("cardinality_1000000_check_remove_multi_lock", func(b *testing.B) {
for i := 0; i < b.N; i++ {
if set.Contains(lookup) {
set.Remove(lookup)


@@ -1915,7 +1915,9 @@ func benchmarkWritePointsExistingSeries(b *testing.B, mCnt, tkCnt, tvCnt, pntCnt
defer sfile.Close()
shard, tmpDir, err := openShard(sfile)
defer shard.Close()
defer func() {
_ = shard.Close()
}()
if err != nil {
b.Fatal(err)
}
@@ -1956,10 +1958,14 @@ func benchmarkWritePointsExistingSeriesFields(b *testing.B, mCnt, tkCnt, tvCnt,
}
sfile := MustOpenSeriesFile()
defer sfile.Close()
defer func() {
_ = sfile.Close()
}()
shard, tmpDir, err := openShard(sfile)
defer shard.Close()
defer func() {
_ = shard.Close()
}()
if err != nil {
b.Fatal(err)
}
@@ -2000,7 +2006,9 @@ func benchmarkWritePointsExistingSeriesEqualBatches(b *testing.B, mCnt, tkCnt, t
defer sfile.Close()
shard, tmpDir, err := openShard(sfile)
defer shard.Close()
defer func() {
_ = shard.Close()
}()
if err != nil {
b.Fatal(err)
}
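
The `defer shard.Close()` rewrites above make the discarded error explicit: `defer` throws away the return value of Close, which errcheck-style linters report, while assigning it to the blank identifier inside a wrapper records that the error was considered and deliberately ignored. A sketch:

package main

import (
    "fmt"
    "os"
)

func main() {
    f, err := os.CreateTemp("", "demo")
    if err != nil {
        fmt.Println(err)
        return
    }
    // defer f.Close() // flagged: the error returned by Close is silently dropped
    defer func() { _ = f.Close() }() // explicit, linter-clean discard
    fmt.Fprintln(f, "hello")
}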


@@ -1,3 +1,4 @@
//lint:file-ignore ST1005 this is old code; we're not going to make the error messages conform
package tsdb // import "github.com/influxdata/influxdb/v2/tsdb"
import (


@@ -1,3 +1,4 @@
//lint:file-ignore SA2002 this is older code, and `go test` will panic if it's really a problem.
package tsdb_test
import (
@@ -192,7 +193,7 @@ func TestStore_CreateMixedShards(t *testing.T) {
}
indexes := tsdb.RegisteredIndexes()
for i, _ := range indexes {
for i := range indexes {
j := (i + 1) % len(indexes)
index1 := indexes[i]
index2 := indexes[j]
@@ -237,7 +238,7 @@ func TestStore_DropMeasurementMixedShards(t *testing.T) {
}
indexes := tsdb.RegisteredIndexes()
for i, _ := range indexes {
for i := range indexes {
j := (i + 1) % len(indexes)
index1 := indexes[i]
index2 := indexes[j]
@@ -372,7 +373,7 @@ func TestStore_WriteMixedShards(t *testing.T) {
}
indexes := tsdb.RegisteredIndexes()
for i, _ := range indexes {
for i := range indexes {
j := (i + 1) % len(indexes)
index1 := indexes[i]
index2 := indexes[j]
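
The range-clause changes above are the S1005 / `gofmt -s` simplification: a second iteration variable that is the blank identifier is redundant and can be dropped. A sketch:

package main

import "fmt"

func main() {
    indexes := []string{"inmem", "tsi1"}

    // flagged: for i, _ := range indexes {
    for i := range indexes {
        j := (i + 1) % len(indexes)
        fmt.Println(indexes[i], "->", indexes[j])
    }
}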


@@ -5,8 +5,6 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"reflect"
"regexp"
"testing"
@@ -327,26 +325,6 @@ func TestStatementExecutor_NormalizeDeleteSeries(t *testing.T) {
}
}
type mockAuthorizer struct {
AuthorizeDatabaseFn func(influxql.Privilege, string) bool
}
func (a *mockAuthorizer) AuthorizeDatabase(p influxql.Privilege, name string) bool {
return a.AuthorizeDatabaseFn(p, name)
}
func (m *mockAuthorizer) AuthorizeQuery(database string, query *influxql.Query) error {
panic("fail")
}
func (m *mockAuthorizer) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
panic("fail")
}
func (m *mockAuthorizer) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
panic("fail")
}
func TestQueryExecutor_ExecuteQuery_ShowDatabases(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -454,11 +432,6 @@ func NewQueryExecutor(t *testing.T, opts ...optFn) *QueryExecutor {
}
e.Executor.StatementExecutor = e.StatementExecutor
var out io.Writer = &e.LogOutput
if testing.Verbose() {
out = io.MultiWriter(out, os.Stderr)
}
return e
}


@@ -2,11 +2,8 @@ package meta_test
import (
"context"
"io/ioutil"
"os"
"path"
"reflect"
"runtime"
"strings"
"testing"
"time"
@@ -1157,21 +1154,6 @@ func newConfig() *meta.Config {
return meta.NewConfig()
}
func testTempDir(skip int) string {
// Get name of the calling function.
pc, _, _, ok := runtime.Caller(skip)
if !ok {
panic("failed to get name of test function")
}
_, prefix := path.Split(runtime.FuncForPC(pc).Name())
// Make a temp dir prefixed with calling function's name.
dir, err := ioutil.TempDir(os.TempDir(), prefix)
if err != nil {
panic(err)
}
return dir
}
func isAdmin(u meta.User) bool {
ui := u.(*meta.UserInfo)
return ui.Admin


@@ -428,6 +428,7 @@ func (data *Data) CreateContinuousQuery(database, name, query string) error {
// If the query string is the same, we'll silently return,
// otherwise we'll assume the user might be trying to
// overwrite an existing CQ with a different query.
//lint:ignore SA6005 this is old code so we should revisit the use of strings.EqualFold
if strings.ToLower(cq.Query) == strings.ToLower(query) {
return nil
}
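
The new lint:ignore above defers staticcheck's SA6005, which points out that lowering both strings allocates two copies just to compare them, whereas `strings.EqualFold` compares case-insensitively in place. The eventual fix would look like the sketch below; note that EqualFold uses Unicode case folding, which can differ from a ToLower comparison in rare cases, presumably why the comment says to revisit rather than swap it blindly.

package main

import (
    "fmt"
    "strings"
)

func main() {
    a, b := "SELECT mean(v)", "select MEAN(v)"

    // flagged by SA6005: allocates two lowered copies
    fmt.Println(strings.ToLower(a) == strings.ToLower(b))

    // preferred: no allocations
    fmt.Println(strings.EqualFold(a, b))
}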


@@ -10,10 +10,7 @@ import (
func exprEqual(x, y influxql.Expr) bool {
if x == nil {
if y == nil {
return true
}
return false
return y == nil
}
if y == nil {
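
The exprEqual rewrite above is staticcheck's S1008 simplification: an if statement that returns true or false depending on a condition collapses to returning the condition itself. A sketch:

package main

import "fmt"

func bothNil(x, y *int) bool {
    if x == nil {
        // Before:
        //   if y == nil { return true }
        //   return false
        return y == nil
    }
    return x == y
}

func main() {
    v := 1
    fmt.Println(bothNil(nil, nil), bothNil(&v, nil), bothNil(&v, &v))
}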