Remove dead code from various pkgs

pull/7837/head
Edd Robinson 2017-01-13 17:43:38 +00:00
parent 7374e48999
commit fb7388cdfc
16 changed files with 39 additions and 241 deletions

View File

@ -4,6 +4,7 @@ package client // import "github.com/influxdata/influxdb/client"
import ( import (
"bytes" "bytes"
"context"
"crypto/tls" "crypto/tls"
"encoding/json" "encoding/json"
"errors" "errors"
@ -146,7 +147,7 @@ func NewClient(c Config) (*Client, error) {
// No need for compression in local communications. // No need for compression in local communications.
tr.DisableCompression = true tr.DisableCompression = true
tr.Dial = func(_, _ string) (net.Conn, error) { tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", c.UnixSocket) return net.Dial("unix", c.UnixSocket)
} }
} }

View File

@ -2,6 +2,7 @@ package client
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
@ -257,16 +258,19 @@ func TestClient_Concurrent_Use(t *testing.T) {
wg.Add(3) wg.Add(3)
n := 1000 n := 1000
errC := make(chan error, 3)
go func() { go func() {
defer wg.Done() defer wg.Done()
bp, err := NewBatchPoints(BatchPointsConfig{}) bp, err := NewBatchPoints(BatchPointsConfig{})
if err != nil { if err != nil {
t.Errorf("got error %v", err) errC <- fmt.Errorf("got error %v", err)
return
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if err = c.Write(bp); err != nil { if err = c.Write(bp); err != nil {
t.Fatalf("got error %v", err) errC <- fmt.Errorf("got error %v", err)
return
} }
} }
}() }()
@ -276,7 +280,8 @@ func TestClient_Concurrent_Use(t *testing.T) {
var q Query var q Query
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
if _, err := c.Query(q); err != nil { if _, err := c.Query(q); err != nil {
t.Fatalf("got error %v", err) errC <- fmt.Errorf("got error %v", err)
return
} }
} }
}() }()
@ -287,7 +292,13 @@ func TestClient_Concurrent_Use(t *testing.T) {
c.Ping(time.Second) c.Ping(time.Second)
} }
}() }()
wg.Wait() wg.Wait()
close(errC)
if err := <-errC; err != nil {
t.Fatal(err)
}
} }
func TestClient_Write(t *testing.T) { func TestClient_Write(t *testing.T) {

View File

@ -302,7 +302,7 @@ func backupDatabase(db string) error {
if err := out.Truncate(0); err != nil { if err := out.Truncate(0); err != nil {
return err return err
} }
if _, err := out.Seek(0, os.SEEK_SET); err != nil { if _, err := out.Seek(0, io.SeekStart); err != nil {
return err return err
} }
} }
@ -311,11 +311,11 @@ func backupDatabase(db string) error {
log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size()) log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size())
} }
off, err := out.Seek(0, os.SEEK_END) off, err := out.Seek(0, io.SeekEnd)
if err != nil { if err != nil {
return err return err
} }
if _, err := in.Seek(off, os.SEEK_SET); err != nil { if _, err := in.Seek(off, io.SeekStart); err != nil {
return err return err
} }

View File

@ -64,7 +64,7 @@ func (s ShardInfos) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s ShardInfos) Less(i, j int) bool { func (s ShardInfos) Less(i, j int) bool {
if s[i].Database == s[j].Database { if s[i].Database == s[j].Database {
if s[i].RetentionPolicy == s[j].RetentionPolicy { if s[i].RetentionPolicy == s[j].RetentionPolicy {
return s[i].Path < s[i].Path return s[i].Path < s[j].Path
} }
return s[i].RetentionPolicy < s[j].RetentionPolicy return s[i].RetentionPolicy < s[j].RetentionPolicy

View File

@ -6,16 +6,13 @@ import (
"archive/tar" "archive/tar"
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors"
"flag" "flag"
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"net"
"os" "os"
"path/filepath" "path/filepath"
"strconv" "strconv"
"sync"
"github.com/influxdata/influxdb/cmd/influxd/backup" "github.com/influxdata/influxdb/cmd/influxd/backup"
"github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/services/meta"
@ -356,33 +353,3 @@ Usage: influxd restore [flags] PATH
`) `)
} }
type nopListener struct {
mu sync.Mutex
closing chan struct{}
}
func newNopListener() *nopListener {
return &nopListener{closing: make(chan struct{})}
}
func (ln *nopListener) Accept() (net.Conn, error) {
ln.mu.Lock()
defer ln.mu.Unlock()
<-ln.closing
return nil, errors.New("listener closing")
}
func (ln *nopListener) Close() error {
if ln.closing != nil {
close(ln.closing)
ln.mu.Lock()
defer ln.mu.Unlock()
ln.closing = nil
}
return nil
}
func (ln *nopListener) Addr() net.Addr { return &net.TCPAddr{} }

View File

@ -546,21 +546,6 @@ func (s *Server) reportServer() {
go cl.Save(usage) go cl.Save(usage)
} }
// monitorErrorChan reads an error channel and resends it through the server.
func (s *Server) monitorErrorChan(ch <-chan error) {
for {
select {
case err, ok := <-ch:
if !ok {
return
}
s.err <- err
case <-s.closing:
return
}
}
}
// Service represents a service attached to the server. // Service represents a service attached to the server.
type Service interface { type Service interface {
WithLogger(log zap.Logger) WithLogger(log zap.Logger)
@ -612,11 +597,6 @@ func stopProfile() {
} }
} }
type tcpaddr struct{ host string }
func (a *tcpaddr) Network() string { return "tcp" }
func (a *tcpaddr) String() string { return a.host }
// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps // monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
// to prevent a circular dependency between the `cluster` and `monitor` packages. // to prevent a circular dependency between the `cluster` and `monitor` packages.
type monitorPointsWriter coordinator.PointsWriter type monitorPointsWriter coordinator.PointsWriter

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -22,8 +21,6 @@ import (
"github.com/influxdata/influxdb/toml" "github.com/influxdata/influxdb/toml"
) )
const emptyResults = `{"results":[{}]}`
// Server represents a test wrapper for run.Server. // Server represents a test wrapper for run.Server.
type Server struct { type Server struct {
*run.Server *run.Server
@ -250,11 +247,6 @@ func newRetentionPolicySpec(name string, rf int, duration time.Duration) *meta.R
return &meta.RetentionPolicySpec{Name: name, ReplicaN: &rf, Duration: &duration} return &meta.RetentionPolicySpec{Name: name, ReplicaN: &rf, Duration: &duration}
} }
func maxFloat64() string {
maxFloat64, _ := json.Marshal(math.MaxFloat64)
return string(maxFloat64)
}
func maxInt64() string { func maxInt64() string {
maxInt64, _ := json.Marshal(^int64(0)) maxInt64, _ := json.Marshal(^int64(0))
return string(maxInt64) return string(maxInt64)

View File

@ -53,7 +53,6 @@ type PointsWriter struct {
Database(name string) (di *meta.DatabaseInfo) Database(name string) (di *meta.DatabaseInfo)
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error) RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
} }
TSDBStore interface { TSDBStore interface {
@ -61,10 +60,6 @@ type PointsWriter struct {
WriteToShard(shardID uint64, points []models.Point) error WriteToShard(shardID uint64, points []models.Point) error
} }
ShardWriter interface {
WriteShard(shardID, ownerID uint64, points []models.Point) error
}
Subscriber interface { Subscriber interface {
Points() chan<- *WritePointsRequest Points() chan<- *WritePointsRequest
} }

View File

@ -312,13 +312,6 @@ func TestPointsWriter_WritePoints(t *testing.T) {
// Local coordinator.Node ShardWriter // Local coordinator.Node ShardWriter
// lock on the write increment since these functions get called in parallel // lock on the write increment since these functions get called in parallel
var mu sync.Mutex var mu sync.Mutex
sw := &fakeShardWriter{
ShardWriteFn: func(shardID, nodeID uint64, points []models.Point) error {
mu.Lock()
defer mu.Unlock()
return theTest.err[int(nodeID)-1]
},
}
store := &fakeStore{ store := &fakeStore{
WriteFn: func(shardID uint64, points []models.Point) error { WriteFn: func(shardID uint64, points []models.Point) error {
@ -341,7 +334,6 @@ func TestPointsWriter_WritePoints(t *testing.T) {
c := coordinator.NewPointsWriter() c := coordinator.NewPointsWriter()
c.MetaClient = ms c.MetaClient = ms
c.ShardWriter = sw
c.TSDBStore = store c.TSDBStore = store
c.Subscriber = sub c.Subscriber = sub
c.Node = &influxdb.Node{ID: 1} c.Node = &influxdb.Node{ID: 1}
@ -493,14 +485,6 @@ func TestBufferedPointsWriter(t *testing.T) {
var shardID uint64 var shardID uint64
type fakeShardWriter struct {
ShardWriteFn func(shardID, nodeID uint64, points []models.Point) error
}
func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Point) error {
return f.ShardWriteFn(shardID, nodeID, points)
}
type fakeStore struct { type fakeStore struct {
WriteFn func(shardID uint64, points []models.Point) error WriteFn func(shardID uint64, points []models.Point) error
CreateShardfn func(database, retentionPolicy string, shardID uint64, enabled bool) error CreateShardfn func(database, retentionPolicy string, shardID uint64, enabled bool) error

View File

@ -1174,62 +1174,3 @@ func joinUint64(a []uint64) string {
} }
return buf.String() return buf.String()
} }
// stringSet represents a set of strings.
type stringSet map[string]struct{}
// newStringSet returns an empty stringSet.
func newStringSet() stringSet {
return make(map[string]struct{})
}
// add adds strings to the set.
func (s stringSet) add(ss ...string) {
for _, n := range ss {
s[n] = struct{}{}
}
}
// contains returns whether the set contains the given string.
func (s stringSet) contains(ss string) bool {
_, ok := s[ss]
return ok
}
// list returns the current elements in the set, in sorted order.
func (s stringSet) list() []string {
l := make([]string, 0, len(s))
for k := range s {
l = append(l, k)
}
sort.Strings(l)
return l
}
// union returns the union of this set and another.
func (s stringSet) union(o stringSet) stringSet {
ns := newStringSet()
for k := range s {
ns[k] = struct{}{}
}
for k := range o {
ns[k] = struct{}{}
}
return ns
}
// intersect returns the intersection of this set and another.
func (s stringSet) intersect(o stringSet) stringSet {
shorter, longer := s, o
if len(longer) < len(shorter) {
shorter, longer = longer, shorter
}
ns := newStringSet()
for k := range shorter {
if _, ok := longer[k]; ok {
ns[k] = struct{}{}
}
}
return ns
}

View File

@ -32,13 +32,3 @@ func IsClientError(err error) bool {
return false return false
} }
const upgradeMessage = `*******************************************************************
UNSUPPORTED SHARD FORMAT DETECTED
As of version 0.11, only tsm shards are supported. Please use the
influx_tsm tool to convert non-tsm shards.
More information can be found at the documentation site:
https://docs.influxdata.com/influxdb/v0.10/administration/upgrading
*******************************************************************`

View File

@ -1820,23 +1820,6 @@ func (a Tags) HashKey() []byte {
// values. // values.
type Fields map[string]interface{} type Fields map[string]interface{}
func parseNumber(val []byte) (interface{}, error) {
if val[len(val)-1] == 'i' {
val = val[:len(val)-1]
return parseIntBytes(val, 10, 64)
}
for i := 0; i < len(val); i++ {
// If there is a decimal or an N (NaN), I (Inf), parse as float
if val[i] == '.' || val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' {
return parseFloatBytes(val, 64)
}
if val[i] < '0' && val[i] > '9' {
return string(val), nil
}
}
return parseFloatBytes(val, 64)
}
// FieldIterator retuns a FieldIterator that can be used to traverse the // FieldIterator retuns a FieldIterator that can be used to traverse the
// fields of a point without constructing the in-memory map. // fields of a point without constructing the in-memory map.
func (p *point) FieldIterator() FieldIterator { func (p *point) FieldIterator() FieldIterator {

View File

@ -48,12 +48,9 @@ type Monitor struct {
done chan struct{} done chan struct{}
storeCreated bool storeCreated bool
storeEnabled bool storeEnabled bool
storeAddress string
storeDatabase string storeDatabase string
storeRetentionPolicy string storeRetentionPolicy string
storeRetentionDuration time.Duration
storeReplicationFactor int
storeInterval time.Duration storeInterval time.Duration
MetaClient interface { MetaClient interface {

View File

@ -57,47 +57,8 @@ const (
keywordEnd keywordEnd
) )
var tokens = [...]string{
ILLEGAL: "ILLEGAL",
EOF: "EOF",
WS: "WS",
IDENT: "IDENT",
NUMBER: "NUMBER",
DURATIONVAL: "DURATION",
STRING: "STRING",
BADSTRING: "BADSTRING",
TEMPLATEVAR: "TEMPLATEVAR",
COMMA: ",",
PERIOD: ".",
LPAREN: "(",
RPAREN: ")",
LBRACKET: "[",
RBRACKET: "]",
PIPE: "|",
SET: "SET",
USE: "USE",
QUERY: "QUERY",
INSERT: "INSERT",
EXEC: "EXEC",
DO: "DO",
GO: "GO",
WAIT: "WAIT",
INT: "INT",
FLOAT: "FLOAT",
STR: "STRING",
}
var eof = rune(1) var eof = rune(1)
func check(e error) {
if e != nil {
panic(e)
}
}
func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' } func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' }
func isDigit(r rune) bool { func isDigit(r rune) bool {
@ -128,12 +89,6 @@ func (s *Scanner) read() rune {
func (s *Scanner) unread() { _ = s.r.UnreadRune() } func (s *Scanner) unread() { _ = s.r.UnreadRune() }
func (s *Scanner) peek() rune {
ch := s.read()
s.unread()
return ch
}
// Scan moves to the next character in the file and returns a tokenized version as well as the literal // Scan moves to the next character in the file and returns a tokenized version as well as the literal
func (s *Scanner) Scan() (tok Token, lit string) { func (s *Scanner) Scan() (tok Token, lit string) {
ch := s.read() ch := s.read()

View File

@ -2,6 +2,7 @@ package tcp_test
import ( import (
"bytes" "bytes"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"log" "log"
@ -41,6 +42,8 @@ func TestMux(t *testing.T) {
if !testing.Verbose() { if !testing.Verbose() {
mux.Logger = log.New(ioutil.Discard, "", 0) mux.Logger = log.New(ioutil.Discard, "", 0)
} }
errC := make(chan error, n)
for i := uint8(0); i < n; i++ { for i := uint8(0); i < n; i++ {
ln := mux.Listen(byte(i)) ln := mux.Listen(byte(i))
@ -58,7 +61,8 @@ func TestMux(t *testing.T) {
// doesn't match then expect close. // doesn't match then expect close.
if len(msg) == 0 || msg[0] != byte(i) { if len(msg) == 0 || msg[0] != byte(i) {
if err == nil || err.Error() != "network connection closed" { if err == nil || err.Error() != "network connection closed" {
t.Fatalf("unexpected error: %s", err) errC <- fmt.Errorf("unexpected error: %s", err)
return
} }
return return
} }
@ -67,14 +71,17 @@ func TestMux(t *testing.T) {
// then expect a connection and read the message. // then expect a connection and read the message.
var buf bytes.Buffer var buf bytes.Buffer
if _, err := io.CopyN(&buf, conn, int64(len(msg)-1)); err != nil { if _, err := io.CopyN(&buf, conn, int64(len(msg)-1)); err != nil {
t.Fatal(err) errC <- err
return
} else if !bytes.Equal(msg[1:], buf.Bytes()) { } else if !bytes.Equal(msg[1:], buf.Bytes()) {
t.Fatalf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes()) errC <- fmt.Errorf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes())
return
} }
// Write response. // Write response.
if _, err := conn.Write([]byte("OK")); err != nil { if _, err := conn.Write([]byte("OK")); err != nil {
t.Fatal(err) errC <- err
return
} }
}(i, ln) }(i, ln)
} }
@ -116,6 +123,11 @@ func TestMux(t *testing.T) {
tcpListener.Close() tcpListener.Close()
wg.Wait() wg.Wait()
close(errC)
if err := <-errC; err != nil {
t.Fatal(err)
}
return true return true
}, nil); err != nil { }, nil); err != nil {
t.Error(err) t.Error(err)

View File

@ -5,7 +5,6 @@ package tsm1
import ( import (
"os" "os"
"syscall" "syscall"
"unsafe"
) )
func mmap(f *os.File, offset int64, length int) ([]byte, error) { func mmap(f *os.File, offset int64, length int) ([]byte, error) {
@ -20,12 +19,3 @@ func mmap(f *os.File, offset int64, length int) ([]byte, error) {
func munmap(b []byte) (err error) { func munmap(b []byte) (err error) {
return syscall.Munmap(b) return syscall.Munmap(b)
} }
// From: github.com/boltdb/bolt/bolt_unix.go
func madvise(b []byte, advice int) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
if e1 != 0 {
err = e1
}
return
}