Fix subtle bugs and remove dead code from services
parent
292b30b82b
commit
9d30ee0a6b
|
@ -313,24 +313,6 @@ func (w *TestService) WritePoints(database, retentionPolicy string, consistencyL
|
|||
return w.WritePointsFn(database, retentionPolicy, consistencyLevel, points)
|
||||
}
|
||||
|
||||
func wait(c chan struct{}, d time.Duration) (err error) {
|
||||
select {
|
||||
case <-c:
|
||||
case <-time.After(d):
|
||||
err = errors.New("timed out")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func waitInt(c chan int, d time.Duration) (i int, err error) {
|
||||
select {
|
||||
case i = <-c:
|
||||
case <-time.After(d):
|
||||
err = errors.New("timed out")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func check(err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
|
|
@ -8,9 +8,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/coordinator"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -523,13 +521,6 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error
|
|||
return nil
|
||||
}
|
||||
|
||||
// QueryExecutor is a mock query executor.
|
||||
type QueryExecutor struct {
|
||||
*influxql.QueryExecutor
|
||||
Err error
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
// StatementExecutor is a mock statement executor.
|
||||
type StatementExecutor struct {
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error
|
||||
|
@ -539,75 +530,6 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx
|
|||
return e.ExecuteStatementFn(stmt, ctx)
|
||||
}
|
||||
|
||||
// NewQueryExecutor returns a *QueryExecutor.
|
||||
func NewQueryExecutor(t *testing.T) *QueryExecutor {
|
||||
e := influxql.NewQueryExecutor()
|
||||
e.StatementExecutor = &StatementExecutor{}
|
||||
return &QueryExecutor{
|
||||
QueryExecutor: e,
|
||||
t: t,
|
||||
}
|
||||
}
|
||||
|
||||
// PointsWriter is a mock points writer.
|
||||
type PointsWriter struct {
|
||||
WritePointsFn func(p *coordinator.WritePointsRequest) error
|
||||
Err error
|
||||
PointsPerSecond int
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
// NewPointsWriter returns a new *PointsWriter.
|
||||
func NewPointsWriter(t *testing.T) *PointsWriter {
|
||||
return &PointsWriter{
|
||||
PointsPerSecond: 25000,
|
||||
t: t,
|
||||
}
|
||||
}
|
||||
|
||||
// WritePoints mocks writing points.
|
||||
func (pw *PointsWriter) WritePoints(p *coordinator.WritePointsRequest) error {
|
||||
// If the test set a callback, call it.
|
||||
if pw.WritePointsFn != nil {
|
||||
if err := pw.WritePointsFn(p); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if pw.Err != nil {
|
||||
return pw.Err
|
||||
}
|
||||
ns := time.Duration((1 / pw.PointsPerSecond) * 1000000000)
|
||||
time.Sleep(ns)
|
||||
return nil
|
||||
}
|
||||
|
||||
// genResult generates a dummy query result.
|
||||
func genResult(rowCnt, valCnt int) *influxql.Result {
|
||||
rows := make(models.Rows, 0, rowCnt)
|
||||
now := time.Now()
|
||||
for n := 0; n < rowCnt; n++ {
|
||||
vals := make([][]interface{}, 0, valCnt)
|
||||
for m := 0; m < valCnt; m++ {
|
||||
vals = append(vals, []interface{}{now, float64(m)})
|
||||
now.Add(time.Second)
|
||||
}
|
||||
row := &models.Row{
|
||||
Name: "cpu",
|
||||
Tags: map[string]string{"host": "server01"},
|
||||
Columns: []string{"time", "value"},
|
||||
Values: vals,
|
||||
}
|
||||
if len(rows) > 0 {
|
||||
row.Name = fmt.Sprintf("cpu%d", len(rows)+1)
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
return &influxql.Result{
|
||||
Series: rows,
|
||||
}
|
||||
}
|
||||
|
||||
func wait(c chan struct{}, d time.Duration) (err error) {
|
||||
select {
|
||||
case <-c:
|
||||
|
@ -616,18 +538,3 @@ func wait(c chan struct{}, d time.Duration) (err error) {
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
func waitInt(c chan int, d time.Duration) (i int, err error) {
|
||||
select {
|
||||
case i = <-c:
|
||||
case <-time.After(d):
|
||||
err = errors.New("timed out")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func check(err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,7 +82,6 @@ type Service struct {
|
|||
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
|
||||
}
|
||||
MetaClient interface {
|
||||
CreateDatabase(name string) (*meta.DatabaseInfo, error)
|
||||
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
|
||||
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
|
|
|
@ -10,7 +10,6 @@ import (
|
|||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -611,10 +610,6 @@ func TestHandler_XForwardedFor(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type invalidJSON struct{}
|
||||
|
||||
func (*invalidJSON) MarshalJSON() ([]byte, error) { return nil, errors.New("marker") }
|
||||
|
||||
// NewHandler represents a test wrapper for httpd.Handler.
|
||||
type Handler struct {
|
||||
*httpd.Handler
|
||||
|
@ -707,21 +702,6 @@ func MustNewJSONRequest(method, urlStr string, body io.Reader) *http.Request {
|
|||
return r
|
||||
}
|
||||
|
||||
// matchRegex returns true if a s matches pattern.
|
||||
func matchRegex(pattern, s string) bool {
|
||||
return regexp.MustCompile(pattern).MatchString(s)
|
||||
}
|
||||
|
||||
// NewResultChan returns a channel that sends all results and then closes.
|
||||
func NewResultChan(results ...*influxql.Result) <-chan *influxql.Result {
|
||||
ch := make(chan *influxql.Result, len(results))
|
||||
for _, r := range results {
|
||||
ch <- r
|
||||
}
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
// MustJWTToken returns a new JWT token and signed string or panics trying.
|
||||
func MustJWTToken(username, secret string, expired bool) (*jwt.Token, string) {
|
||||
token := jwt.New(jwt.GetSigningMethod("HS512"))
|
||||
|
|
|
@ -82,15 +82,22 @@ func BenchmarkLimitListener(b *testing.B) {
|
|||
wg.Add(b.N)
|
||||
|
||||
l := httpd.LimitListener(&fakeListener{}, b.N)
|
||||
errC := make(chan error, 1)
|
||||
for i := 0; i < b.N; i++ {
|
||||
go func() {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
c.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
close(errC)
|
||||
if err := <-errC; err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,13 +133,6 @@ func (c *Client) AcquireLease(name string) (*Lease, error) {
|
|||
return &l, nil
|
||||
}
|
||||
|
||||
func (c *Client) data() *Data {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
data := c.cacheData.Clone()
|
||||
return data
|
||||
}
|
||||
|
||||
// ClusterID returns the ID of the cluster it's connected to.
|
||||
func (c *Client) ClusterID() uint64 {
|
||||
c.mu.RLock()
|
||||
|
@ -980,21 +973,6 @@ func (c *Client) WithLogger(log zap.Logger) {
|
|||
c.logger = log.With(zap.String("service", "metaclient"))
|
||||
}
|
||||
|
||||
func (c *Client) updateAuthCache() {
|
||||
// copy cached user info for still-present users
|
||||
newCache := make(map[string]authUser, len(c.authCache))
|
||||
|
||||
for _, userInfo := range c.cacheData.Users {
|
||||
if cached, ok := c.authCache[userInfo.Name]; ok {
|
||||
if cached.bhash == userInfo.Hash {
|
||||
newCache[userInfo.Name] = cached
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c.authCache = newCache
|
||||
}
|
||||
|
||||
// snapshot saves the current meta data to disk.
|
||||
func snapshot(path string, data *Data) error {
|
||||
file := filepath.Join(path, metaFile)
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
|
@ -1098,33 +1096,3 @@ func testTempDir(skip int) string {
|
|||
}
|
||||
return dir
|
||||
}
|
||||
|
||||
func mustParseStatement(s string) influxql.Statement {
|
||||
stmt, err := influxql.ParseStatement(s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return stmt
|
||||
}
|
||||
|
||||
func mustMarshalJSON(v interface{}) string {
|
||||
b, e := json.Marshal(v)
|
||||
if e != nil {
|
||||
panic(e)
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func freePort() string {
|
||||
l, _ := net.Listen("tcp", "127.0.0.1:0")
|
||||
defer l.Close()
|
||||
return l.Addr().String()
|
||||
}
|
||||
|
||||
func freePorts(i int) []string {
|
||||
var ports []string
|
||||
for j := 0; j < i; j++ {
|
||||
ports = append(ports, freePort())
|
||||
}
|
||||
return ports
|
||||
}
|
||||
|
|
|
@ -738,25 +738,6 @@ type NodeInfo struct {
|
|||
TCPHost string
|
||||
}
|
||||
|
||||
// clone returns a deep copy of ni.
|
||||
func (ni NodeInfo) clone() NodeInfo { return ni }
|
||||
|
||||
// marshal serializes to a protobuf representation.
|
||||
func (ni NodeInfo) marshal() *internal.NodeInfo {
|
||||
pb := &internal.NodeInfo{}
|
||||
pb.ID = proto.Uint64(ni.ID)
|
||||
pb.Host = proto.String(ni.Host)
|
||||
pb.TCPHost = proto.String(ni.TCPHost)
|
||||
return pb
|
||||
}
|
||||
|
||||
// unmarshal deserializes from a protobuf representation.
|
||||
func (ni *NodeInfo) unmarshal(pb *internal.NodeInfo) {
|
||||
ni.ID = pb.GetID()
|
||||
ni.Host = pb.GetHost()
|
||||
ni.TCPHost = pb.GetTCPHost()
|
||||
}
|
||||
|
||||
// NodeInfos is a slice of NodeInfo used for sorting
|
||||
type NodeInfos []NodeInfo
|
||||
|
||||
|
|
|
@ -385,10 +385,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
|
|||
switch len(tsStr) {
|
||||
case 10:
|
||||
t = time.Unix(ts, 0)
|
||||
break
|
||||
case 13:
|
||||
t = time.Unix(ts/1000, (ts%1000)*1000)
|
||||
break
|
||||
default:
|
||||
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
|
||||
if s.LogPointErrors {
|
||||
|
|
|
@ -22,7 +22,6 @@ type Service struct {
|
|||
DeleteShard(shardID uint64) error
|
||||
}
|
||||
|
||||
enabled bool
|
||||
checkInterval time.Duration
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
|
|
|
@ -54,9 +54,6 @@ type Config struct {
|
|||
ReadBuffer int `toml:"read-buffer"`
|
||||
BatchTimeout toml.Duration `toml:"batch-timeout"`
|
||||
Precision string `toml:"precision"`
|
||||
|
||||
// Deprecated config option
|
||||
udpPayloadSize int `toml:"udp-payload-size"`
|
||||
}
|
||||
|
||||
// NewConfig returns a new instance of Config with defaults.
|
||||
|
|
Loading…
Reference in New Issue