diff --git a/client/influxdb.go b/client/influxdb.go index 12dd14ca65..773eb279da 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -4,6 +4,7 @@ package client // import "github.com/influxdata/influxdb/client" import ( "bytes" + "context" "crypto/tls" "encoding/json" "errors" @@ -146,7 +147,7 @@ func NewClient(c Config) (*Client, error) { // No need for compression in local communications. tr.DisableCompression = true - tr.Dial = func(_, _ string) (net.Conn, error) { + tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) { return net.Dial("unix", c.UnixSocket) } } diff --git a/client/v2/client_test.go b/client/v2/client_test.go index d27c162e94..145086263e 100644 --- a/client/v2/client_test.go +++ b/client/v2/client_test.go @@ -2,6 +2,7 @@ package client import ( "encoding/json" + "fmt" "net/http" "net/http/httptest" "reflect" @@ -257,16 +258,19 @@ func TestClient_Concurrent_Use(t *testing.T) { wg.Add(3) n := 1000 + errC := make(chan error, 3) go func() { defer wg.Done() bp, err := NewBatchPoints(BatchPointsConfig{}) if err != nil { - t.Errorf("got error %v", err) + errC <- fmt.Errorf("got error %v", err) + return } for i := 0; i < n; i++ { if err = c.Write(bp); err != nil { - t.Fatalf("got error %v", err) + errC <- fmt.Errorf("got error %v", err) + return } } }() @@ -276,7 +280,8 @@ func TestClient_Concurrent_Use(t *testing.T) { var q Query for i := 0; i < n; i++ { if _, err := c.Query(q); err != nil { - t.Fatalf("got error %v", err) + errC <- fmt.Errorf("got error %v", err) + return } } }() @@ -287,7 +292,13 @@ func TestClient_Concurrent_Use(t *testing.T) { c.Ping(time.Second) } }() + wg.Wait() + + close(errC) + if err := <-errC; err != nil { + t.Fatal(err) + } } func TestClient_Write(t *testing.T) { diff --git a/cmd/influx_tsm/main.go b/cmd/influx_tsm/main.go index d66994376c..f852388789 100644 --- a/cmd/influx_tsm/main.go +++ b/cmd/influx_tsm/main.go @@ -302,7 +302,7 @@ func backupDatabase(db string) error { if err := out.Truncate(0); err != nil { return err } - if _, err := out.Seek(0, os.SEEK_SET); err != nil { + if _, err := out.Seek(0, io.SeekStart); err != nil { return err } } @@ -311,11 +311,11 @@ func backupDatabase(db string) error { log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size()) } - off, err := out.Seek(0, os.SEEK_END) + off, err := out.Seek(0, io.SeekEnd) if err != nil { return err } - if _, err := in.Seek(off, os.SEEK_SET); err != nil { + if _, err := in.Seek(off, io.SeekStart); err != nil { return err } diff --git a/cmd/influx_tsm/tsdb/database.go b/cmd/influx_tsm/tsdb/database.go index bb2513145d..94003d7672 100644 --- a/cmd/influx_tsm/tsdb/database.go +++ b/cmd/influx_tsm/tsdb/database.go @@ -64,7 +64,7 @@ func (s ShardInfos) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s ShardInfos) Less(i, j int) bool { if s[i].Database == s[j].Database { if s[i].RetentionPolicy == s[j].RetentionPolicy { - return s[i].Path < s[i].Path + return s[i].Path < s[j].Path } return s[i].RetentionPolicy < s[j].RetentionPolicy diff --git a/cmd/influxd/restore/restore.go b/cmd/influxd/restore/restore.go index 253cb81e0a..932aeb7236 100644 --- a/cmd/influxd/restore/restore.go +++ b/cmd/influxd/restore/restore.go @@ -6,16 +6,13 @@ import ( "archive/tar" "bytes" "encoding/binary" - "errors" "flag" "fmt" "io" "io/ioutil" - "net" "os" "path/filepath" "strconv" - "sync" "github.com/influxdata/influxdb/cmd/influxd/backup" "github.com/influxdata/influxdb/services/meta" @@ -356,33 +353,3 @@ Usage: influxd restore [flags] PATH `) } - -type nopListener struct { - mu sync.Mutex - closing chan struct{} -} - -func newNopListener() *nopListener { - return &nopListener{closing: make(chan struct{})} -} - -func (ln *nopListener) Accept() (net.Conn, error) { - ln.mu.Lock() - defer ln.mu.Unlock() - - <-ln.closing - return nil, errors.New("listener closing") -} - -func (ln *nopListener) Close() error { - if ln.closing != nil { - close(ln.closing) - ln.mu.Lock() - defer ln.mu.Unlock() - - ln.closing = nil - } - return nil -} - -func (ln *nopListener) Addr() net.Addr { return &net.TCPAddr{} } diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 7449fd9b82..1071ab56d0 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -546,21 +546,6 @@ func (s *Server) reportServer() { go cl.Save(usage) } -// monitorErrorChan reads an error channel and resends it through the server. -func (s *Server) monitorErrorChan(ch <-chan error) { - for { - select { - case err, ok := <-ch: - if !ok { - return - } - s.err <- err - case <-s.closing: - return - } - } -} - // Service represents a service attached to the server. type Service interface { WithLogger(log zap.Logger) @@ -612,11 +597,6 @@ func stopProfile() { } } -type tcpaddr struct{ host string } - -func (a *tcpaddr) Network() string { return "tcp" } -func (a *tcpaddr) String() string { return a.host } - // monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps // to prevent a circular dependency between the `cluster` and `monitor` packages. type monitorPointsWriter coordinator.PointsWriter diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index bd95501e3f..8ca27663ee 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "io/ioutil" - "math" "net/http" "net/url" "os" @@ -22,8 +21,6 @@ import ( "github.com/influxdata/influxdb/toml" ) -const emptyResults = `{"results":[{}]}` - // Server represents a test wrapper for run.Server. type Server struct { *run.Server @@ -250,11 +247,6 @@ func newRetentionPolicySpec(name string, rf int, duration time.Duration) *meta.R return &meta.RetentionPolicySpec{Name: name, ReplicaN: &rf, Duration: &duration} } -func maxFloat64() string { - maxFloat64, _ := json.Marshal(math.MaxFloat64) - return string(maxFloat64) -} - func maxInt64() string { maxInt64, _ := json.Marshal(^int64(0)) return string(maxInt64) diff --git a/coordinator/points_writer.go b/coordinator/points_writer.go index 5c884a1f3d..7f3829b68e 100644 --- a/coordinator/points_writer.go +++ b/coordinator/points_writer.go @@ -53,7 +53,6 @@ type PointsWriter struct { Database(name string) (di *meta.DatabaseInfo) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) - ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) } TSDBStore interface { @@ -61,10 +60,6 @@ type PointsWriter struct { WriteToShard(shardID uint64, points []models.Point) error } - ShardWriter interface { - WriteShard(shardID, ownerID uint64, points []models.Point) error - } - Subscriber interface { Points() chan<- *WritePointsRequest } diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 86752e9d6e..8563ea8f5a 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -312,13 +312,6 @@ func TestPointsWriter_WritePoints(t *testing.T) { // Local coordinator.Node ShardWriter // lock on the write increment since these functions get called in parallel var mu sync.Mutex - sw := &fakeShardWriter{ - ShardWriteFn: func(shardID, nodeID uint64, points []models.Point) error { - mu.Lock() - defer mu.Unlock() - return theTest.err[int(nodeID)-1] - }, - } store := &fakeStore{ WriteFn: func(shardID uint64, points []models.Point) error { @@ -341,7 +334,6 @@ func TestPointsWriter_WritePoints(t *testing.T) { c := coordinator.NewPointsWriter() c.MetaClient = ms - c.ShardWriter = sw c.TSDBStore = store c.Subscriber = sub c.Node = &influxdb.Node{ID: 1} @@ -493,14 +485,6 @@ func TestBufferedPointsWriter(t *testing.T) { var shardID uint64 -type fakeShardWriter struct { - ShardWriteFn func(shardID, nodeID uint64, points []models.Point) error -} - -func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Point) error { - return f.ShardWriteFn(shardID, nodeID, points) -} - type fakeStore struct { WriteFn func(shardID uint64, points []models.Point) error CreateShardfn func(database, retentionPolicy string, shardID uint64, enabled bool) error diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 750df19642..4666a2339b 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -1174,62 +1174,3 @@ func joinUint64(a []uint64) string { } return buf.String() } - -// stringSet represents a set of strings. -type stringSet map[string]struct{} - -// newStringSet returns an empty stringSet. -func newStringSet() stringSet { - return make(map[string]struct{}) -} - -// add adds strings to the set. -func (s stringSet) add(ss ...string) { - for _, n := range ss { - s[n] = struct{}{} - } -} - -// contains returns whether the set contains the given string. -func (s stringSet) contains(ss string) bool { - _, ok := s[ss] - return ok -} - -// list returns the current elements in the set, in sorted order. -func (s stringSet) list() []string { - l := make([]string, 0, len(s)) - for k := range s { - l = append(l, k) - } - sort.Strings(l) - return l -} - -// union returns the union of this set and another. -func (s stringSet) union(o stringSet) stringSet { - ns := newStringSet() - for k := range s { - ns[k] = struct{}{} - } - for k := range o { - ns[k] = struct{}{} - } - return ns -} - -// intersect returns the intersection of this set and another. -func (s stringSet) intersect(o stringSet) stringSet { - shorter, longer := s, o - if len(longer) < len(shorter) { - shorter, longer = longer, shorter - } - - ns := newStringSet() - for k := range shorter { - if _, ok := longer[k]; ok { - ns[k] = struct{}{} - } - } - return ns -} diff --git a/errors.go b/errors.go index 13c782ad32..ae19ecbafe 100644 --- a/errors.go +++ b/errors.go @@ -32,13 +32,3 @@ func IsClientError(err error) bool { return false } - -const upgradeMessage = `******************************************************************* - UNSUPPORTED SHARD FORMAT DETECTED - -As of version 0.11, only tsm shards are supported. Please use the -influx_tsm tool to convert non-tsm shards. - -More information can be found at the documentation site: -https://docs.influxdata.com/influxdb/v0.10/administration/upgrading -*******************************************************************` diff --git a/models/points.go b/models/points.go index 3f7cd5e639..f76fcc83fd 100644 --- a/models/points.go +++ b/models/points.go @@ -1820,23 +1820,6 @@ func (a Tags) HashKey() []byte { // values. type Fields map[string]interface{} -func parseNumber(val []byte) (interface{}, error) { - if val[len(val)-1] == 'i' { - val = val[:len(val)-1] - return parseIntBytes(val, 10, 64) - } - for i := 0; i < len(val); i++ { - // If there is a decimal or an N (NaN), I (Inf), parse as float - if val[i] == '.' || val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' { - return parseFloatBytes(val, 64) - } - if val[i] < '0' && val[i] > '9' { - return string(val), nil - } - } - return parseFloatBytes(val, 64) -} - // FieldIterator retuns a FieldIterator that can be used to traverse the // fields of a point without constructing the in-memory map. func (p *point) FieldIterator() FieldIterator { diff --git a/monitor/service.go b/monitor/service.go index eb0d39ad06..cdcea2fe3f 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -48,13 +48,10 @@ type Monitor struct { done chan struct{} storeCreated bool storeEnabled bool - storeAddress string - storeDatabase string - storeRetentionPolicy string - storeRetentionDuration time.Duration - storeReplicationFactor int - storeInterval time.Duration + storeDatabase string + storeRetentionPolicy string + storeInterval time.Duration MetaClient interface { CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) diff --git a/stress/v2/stressql/statement/parser.go b/stress/v2/stressql/statement/parser.go index 8326352986..57e802bdd5 100644 --- a/stress/v2/stressql/statement/parser.go +++ b/stress/v2/stressql/statement/parser.go @@ -57,47 +57,8 @@ const ( keywordEnd ) -var tokens = [...]string{ - ILLEGAL: "ILLEGAL", - EOF: "EOF", - WS: "WS", - - IDENT: "IDENT", - NUMBER: "NUMBER", - DURATIONVAL: "DURATION", - STRING: "STRING", - BADSTRING: "BADSTRING", - TEMPLATEVAR: "TEMPLATEVAR", - - COMMA: ",", - PERIOD: ".", - LPAREN: "(", - RPAREN: ")", - LBRACKET: "[", - RBRACKET: "]", - PIPE: "|", - - SET: "SET", - USE: "USE", - QUERY: "QUERY", - INSERT: "INSERT", - EXEC: "EXEC", - DO: "DO", - GO: "GO", - WAIT: "WAIT", - INT: "INT", - FLOAT: "FLOAT", - STR: "STRING", -} - var eof = rune(1) -func check(e error) { - if e != nil { - panic(e) - } -} - func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' } func isDigit(r rune) bool { @@ -128,12 +89,6 @@ func (s *Scanner) read() rune { func (s *Scanner) unread() { _ = s.r.UnreadRune() } -func (s *Scanner) peek() rune { - ch := s.read() - s.unread() - return ch -} - // Scan moves to the next character in the file and returns a tokenized version as well as the literal func (s *Scanner) Scan() (tok Token, lit string) { ch := s.read() diff --git a/tcp/mux_test.go b/tcp/mux_test.go index 7e552ee250..f99cc540d5 100644 --- a/tcp/mux_test.go +++ b/tcp/mux_test.go @@ -2,6 +2,7 @@ package tcp_test import ( "bytes" + "fmt" "io" "io/ioutil" "log" @@ -41,6 +42,8 @@ func TestMux(t *testing.T) { if !testing.Verbose() { mux.Logger = log.New(ioutil.Discard, "", 0) } + + errC := make(chan error, n) for i := uint8(0); i < n; i++ { ln := mux.Listen(byte(i)) @@ -58,7 +61,8 @@ func TestMux(t *testing.T) { // doesn't match then expect close. if len(msg) == 0 || msg[0] != byte(i) { if err == nil || err.Error() != "network connection closed" { - t.Fatalf("unexpected error: %s", err) + errC <- fmt.Errorf("unexpected error: %s", err) + return } return } @@ -67,14 +71,17 @@ func TestMux(t *testing.T) { // then expect a connection and read the message. var buf bytes.Buffer if _, err := io.CopyN(&buf, conn, int64(len(msg)-1)); err != nil { - t.Fatal(err) + errC <- err + return } else if !bytes.Equal(msg[1:], buf.Bytes()) { - t.Fatalf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes()) + errC <- fmt.Errorf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes()) + return } // Write response. if _, err := conn.Write([]byte("OK")); err != nil { - t.Fatal(err) + errC <- err + return } }(i, ln) } @@ -116,6 +123,11 @@ func TestMux(t *testing.T) { tcpListener.Close() wg.Wait() + close(errC) + if err := <-errC; err != nil { + t.Fatal(err) + } + return true }, nil); err != nil { t.Error(err) diff --git a/tsdb/engine/tsm1/mmap_unix.go b/tsdb/engine/tsm1/mmap_unix.go index e652f583cb..0a94e7259b 100644 --- a/tsdb/engine/tsm1/mmap_unix.go +++ b/tsdb/engine/tsm1/mmap_unix.go @@ -5,7 +5,6 @@ package tsm1 import ( "os" "syscall" - "unsafe" ) func mmap(f *os.File, offset int64, length int) ([]byte, error) { @@ -20,12 +19,3 @@ func mmap(f *os.File, offset int64, length int) ([]byte, error) { func munmap(b []byte) (err error) { return syscall.Munmap(b) } - -// From: github.com/boltdb/bolt/bolt_unix.go -func madvise(b []byte, advice int) (err error) { - _, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice)) - if e1 != 0 { - err = e1 - } - return -}