Merge pull request #7837 from influxdata/er-tidy

General tidy up and subtle bug fixes
pull/7892/head
Edd Robinson 2017-01-26 13:43:07 +00:00 committed by GitHub
commit 91ee34b111
47 changed files with 162 additions and 1032 deletions

View File

@ -4,6 +4,7 @@ package client // import "github.com/influxdata/influxdb/client"
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
@ -146,7 +147,7 @@ func NewClient(c Config) (*Client, error) {
// No need for compression in local communications.
tr.DisableCompression = true
-tr.Dial = func(_, _ string) (net.Conn, error) {
+tr.DialContext = func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", c.UnixSocket)
}
}
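The replaced hook above is the deprecated Transport.Dial field; DialContext (Go 1.7+) receives the request's context so an in-flight dial can be canceled. A minimal, self-contained sketch of the same wiring; here net.Dialer.DialContext is used so the dial actually honors cancellation, which the hunk's plain net.Dial call does not yet do, and the socket path is whatever the caller supplies:

package example

import (
	"context"
	"net"
	"net/http"
)

// unixClient returns an http.Client that sends requests over a unix
// domain socket via the Transport.DialContext hook.
func unixClient(socketPath string) *http.Client {
	tr := &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			// DialContext aborts the dial if ctx is canceled first.
			return d.DialContext(ctx, "unix", socketPath)
		},
	}
	return &http.Client{Transport: tr}
}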

View File

@ -2,6 +2,7 @@ package client
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
@ -257,16 +258,19 @@ func TestClient_Concurrent_Use(t *testing.T) {
wg.Add(3)
n := 1000
+errC := make(chan error)
go func() {
defer wg.Done()
bp, err := NewBatchPoints(BatchPointsConfig{})
if err != nil {
t.Errorf("got error %v", err)
errC <- fmt.Errorf("got error %v", err)
return
}
for i := 0; i < n; i++ {
if err = c.Write(bp); err != nil {
t.Fatalf("got error %v", err)
errC <- fmt.Errorf("got error %v", err)
return
}
}
}()
@ -276,7 +280,8 @@ func TestClient_Concurrent_Use(t *testing.T) {
var q Query
for i := 0; i < n; i++ {
if _, err := c.Query(q); err != nil {
t.Fatalf("got error %v", err)
errC <- fmt.Errorf("got error %v", err)
return
}
}
}()
@ -287,7 +292,17 @@ func TestClient_Concurrent_Use(t *testing.T) {
c.Ping(time.Second)
}
}()
-wg.Wait()
+go func() {
+wg.Wait()
+close(errC)
+}()
+for err := range errC {
+if err != nil {
+t.Error(err)
+}
+}
}
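This hunk is the template for many below: the testing package requires that t.Fatal, t.Fatalf, and t.FailNow be called only from the goroutine running the test function, so calls made inside spawned goroutines are replaced by sends on an error channel that the test goroutine drains. Reduced to a self-contained sketch (doWork is a hypothetical stand-in for the operation under test):

package example

import (
	"sync"
	"testing"
)

// doWork is a hypothetical stand-in for the operation under test.
func doWork(i int) error { return nil }

func TestWorkers(t *testing.T) {
	var wg sync.WaitGroup
	errC := make(chan error)
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := doWork(i); err != nil {
				errC <- err // report; t.Fatal here would stop the wrong goroutine
				return
			}
		}(i)
	}
	// Close errC only once every worker has finished so the drain loop ends.
	go func() {
		wg.Wait()
		close(errC)
	}()
	for err := range errC {
		t.Error(err) // safe: runs on the test goroutine
	}
}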
func TestClient_Write(t *testing.T) {

View File

@ -302,7 +302,7 @@ func backupDatabase(db string) error {
if err := out.Truncate(0); err != nil {
return err
}
-if _, err := out.Seek(0, os.SEEK_SET); err != nil {
+if _, err := out.Seek(0, io.SeekStart); err != nil {
return err
}
}
@ -311,11 +311,11 @@ func backupDatabase(db string) error {
log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size())
}
-off, err := out.Seek(0, os.SEEK_END)
+off, err := out.Seek(0, io.SeekEnd)
if err != nil {
return err
}
-if _, err := in.Seek(off, os.SEEK_SET); err != nil {
+if _, err := in.Seek(off, io.SeekStart); err != nil {
return err
}
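os.SEEK_SET and friends are deprecated: Go 1.7 added io.SeekStart, io.SeekCurrent, and io.SeekEnd as the canonical whence values, which is all these seek hunks change. A small sketch of both constants in use:

package example

import (
	"io"
	"os"
)

// fileSize returns f's size and rewinds it, using the io seek constants
// that replace the deprecated os.SEEK_* values.
func fileSize(f *os.File) (int64, error) {
	size, err := f.Seek(0, io.SeekEnd) // offset measured from the end
	if err != nil {
		return 0, err
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil { // back to the beginning
		return 0, err
	}
	return size, nil
}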

View File

@ -64,7 +64,7 @@ func (s ShardInfos) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s ShardInfos) Less(i, j int) bool {
if s[i].Database == s[j].Database {
if s[i].RetentionPolicy == s[j].RetentionPolicy {
-return s[i].Path < s[i].Path
+return s[i].Path < s[j].Path
}
return s[i].RetentionPolicy < s[j].RetentionPolicy
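The one-character change above fixes a subtle bug: s[i].Path < s[i].Path compares a path with itself and is always false, so shards in the same database and retention policy were left in nondeterministic order (and Less violated the strict ordering sort.Interface expects). The corrected comparator, restructured for clarity and trimmed to the three fields involved (the real ShardInfo has more):

package example

import "sort"

type ShardInfo struct {
	Database        string
	RetentionPolicy string
	Path            string
}

type ShardInfos []ShardInfo

func (s ShardInfos) Len() int      { return len(s) }
func (s ShardInfos) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// Less orders by database, then retention policy, then path; the fixed
// line is the final tie-breaker, which must compare index i against j.
func (s ShardInfos) Less(i, j int) bool {
	if s[i].Database != s[j].Database {
		return s[i].Database < s[j].Database
	}
	if s[i].RetentionPolicy != s[j].RetentionPolicy {
		return s[i].RetentionPolicy < s[j].RetentionPolicy
	}
	return s[i].Path < s[j].Path
}

var _ sort.Interface = ShardInfos(nil)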

View File

@ -6,16 +6,13 @@ import (
"archive/tar"
"bytes"
"encoding/binary"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"github.com/influxdata/influxdb/cmd/influxd/backup"
"github.com/influxdata/influxdb/services/meta"
@ -356,33 +353,3 @@ Usage: influxd restore [flags] PATH
`)
}
type nopListener struct {
mu sync.Mutex
closing chan struct{}
}
func newNopListener() *nopListener {
return &nopListener{closing: make(chan struct{})}
}
func (ln *nopListener) Accept() (net.Conn, error) {
ln.mu.Lock()
defer ln.mu.Unlock()
<-ln.closing
return nil, errors.New("listener closing")
}
func (ln *nopListener) Close() error {
if ln.closing != nil {
close(ln.closing)
ln.mu.Lock()
defer ln.mu.Unlock()
ln.closing = nil
}
return nil
}
func (ln *nopListener) Addr() net.Addr { return &net.TCPAddr{} }

View File

@ -546,21 +546,6 @@ func (s *Server) reportServer() {
go cl.Save(usage)
}
-// monitorErrorChan reads an error channel and resends it through the server.
-func (s *Server) monitorErrorChan(ch <-chan error) {
-for {
-select {
-case err, ok := <-ch:
-if !ok {
-return
-}
-s.err <- err
-case <-s.closing:
-return
-}
-}
-}
// Service represents a service attached to the server.
type Service interface {
WithLogger(log zap.Logger)
@ -612,11 +597,6 @@ func stopProfile() {
}
}
-type tcpaddr struct{ host string }
-func (a *tcpaddr) Network() string { return "tcp" }
-func (a *tcpaddr) String() string { return a.host }
// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
// to prevent a circular dependency between the `cluster` and `monitor` packages.
type monitorPointsWriter coordinator.PointsWriter

View File

@ -7,7 +7,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"net/url"
"os"
@ -22,8 +21,6 @@ import (
"github.com/influxdata/influxdb/toml"
)
-const emptyResults = `{"results":[{}]}`
// Server represents a test wrapper for run.Server.
type Server struct {
*run.Server
@ -267,11 +264,6 @@ func newRetentionPolicySpec(name string, rf int, duration time.Duration) *meta.R
return &meta.RetentionPolicySpec{Name: name, ReplicaN: &rf, Duration: &duration}
}
-func maxFloat64() string {
-maxFloat64, _ := json.Marshal(math.MaxFloat64)
-return string(maxFloat64)
-}
func maxInt64() string {
maxInt64, _ := json.Marshal(^int64(0))
return string(maxInt64)

View File

@ -53,7 +53,6 @@ type PointsWriter struct {
Database(name string) (di *meta.DatabaseInfo)
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
-ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
}
TSDBStore interface {
@ -61,10 +60,6 @@ type PointsWriter struct {
WriteToShard(shardID uint64, points []models.Point) error
}
-ShardWriter interface {
-WriteShard(shardID, ownerID uint64, points []models.Point) error
-}
Subscriber interface {
Points() chan<- *WritePointsRequest
}

View File

@ -312,13 +312,6 @@ func TestPointsWriter_WritePoints(t *testing.T) {
// Local coordinator.Node ShardWriter
// lock on the write increment since these functions get called in parallel
var mu sync.Mutex
-sw := &fakeShardWriter{
-ShardWriteFn: func(shardID, nodeID uint64, points []models.Point) error {
-mu.Lock()
-defer mu.Unlock()
-return theTest.err[int(nodeID)-1]
-},
-}
store := &fakeStore{
WriteFn: func(shardID uint64, points []models.Point) error {
@ -341,7 +334,6 @@ func TestPointsWriter_WritePoints(t *testing.T) {
c := coordinator.NewPointsWriter()
c.MetaClient = ms
-c.ShardWriter = sw
c.TSDBStore = store
c.Subscriber = sub
c.Node = &influxdb.Node{ID: 1}
@ -493,14 +485,6 @@ func TestBufferedPointsWriter(t *testing.T) {
var shardID uint64
-type fakeShardWriter struct {
-ShardWriteFn func(shardID, nodeID uint64, points []models.Point) error
-}
-func (f *fakeShardWriter) WriteShard(shardID, nodeID uint64, points []models.Point) error {
-return f.ShardWriteFn(shardID, nodeID, points)
-}
type fakeStore struct {
WriteFn func(shardID uint64, points []models.Point) error
CreateShardfn func(database, retentionPolicy string, shardID uint64, enabled bool) error

View File

@ -1182,62 +1182,3 @@ func joinUint64(a []uint64) string {
}
return buf.String()
}
// stringSet represents a set of strings.
type stringSet map[string]struct{}
// newStringSet returns an empty stringSet.
func newStringSet() stringSet {
return make(map[string]struct{})
}
// add adds strings to the set.
func (s stringSet) add(ss ...string) {
for _, n := range ss {
s[n] = struct{}{}
}
}
// contains returns whether the set contains the given string.
func (s stringSet) contains(ss string) bool {
_, ok := s[ss]
return ok
}
// list returns the current elements in the set, in sorted order.
func (s stringSet) list() []string {
l := make([]string, 0, len(s))
for k := range s {
l = append(l, k)
}
sort.Strings(l)
return l
}
// union returns the union of this set and another.
func (s stringSet) union(o stringSet) stringSet {
ns := newStringSet()
for k := range s {
ns[k] = struct{}{}
}
for k := range o {
ns[k] = struct{}{}
}
return ns
}
// intersect returns the intersection of this set and another.
func (s stringSet) intersect(o stringSet) stringSet {
shorter, longer := s, o
if len(longer) < len(shorter) {
shorter, longer = longer, shorter
}
ns := newStringSet()
for k := range shorter {
if _, ok := longer[k]; ok {
ns[k] = struct{}{}
}
}
return ns
}

View File

@ -32,13 +32,3 @@ func IsClientError(err error) bool {
return false
}
const upgradeMessage = `*******************************************************************
UNSUPPORTED SHARD FORMAT DETECTED
As of version 0.11, only tsm shards are supported. Please use the
influx_tsm tool to convert non-tsm shards.
More information can be found at the documentation site:
https://docs.influxdata.com/influxdb/v0.10/administration/upgrading
*******************************************************************`

View File

@ -2421,45 +2421,6 @@ func walkFunctionCalls(exp Expr) []*Call {
return nil
}
// filterExprBySource filters an expression to exclude expressions unrelated to a source.
func filterExprBySource(name string, expr Expr) Expr {
switch expr := expr.(type) {
case *VarRef:
if !strings.HasPrefix(expr.Val, name) {
return nil
}
case *BinaryExpr:
lhs := filterExprBySource(name, expr.LHS)
rhs := filterExprBySource(name, expr.RHS)
// If an expr is logical then return either LHS/RHS or both.
// If an expr is arithmetic or comparative then require both sides.
if expr.Op == AND || expr.Op == OR {
if lhs == nil && rhs == nil {
return nil
} else if lhs != nil && rhs == nil {
return lhs
} else if lhs == nil && rhs != nil {
return rhs
}
} else {
if lhs == nil || rhs == nil {
return nil
}
}
return &BinaryExpr{Op: expr.Op, LHS: lhs, RHS: rhs}
case *ParenExpr:
exp := filterExprBySource(name, expr.Expr)
if exp == nil {
return nil
}
return &ParenExpr{Expr: exp}
}
return expr
}
// MatchSource returns the source name that matches a field name.
// It returns a blank string if no sources match.
func MatchSource(sources Sources, name string) string {

View File

@ -4,7 +4,6 @@ import (
"bytes"
"fmt"
"math"
"math/rand"
"reflect"
"testing"
"time"
@ -1075,29 +1074,6 @@ func FloatIterators(inputs []*FloatIterator) []influxql.Iterator {
return itrs
}
// GenerateFloatIterator creates a FloatIterator with random data.
func GenerateFloatIterator(rand *rand.Rand, valueN int) *FloatIterator {
const interval = 10 * time.Second
itr := &FloatIterator{
Points: make([]influxql.FloatPoint, valueN),
}
for i := 0; i < valueN; i++ {
// Generate incrementing timestamp with some jitter (1s).
jitter := (rand.Int63n(2) * int64(time.Second))
timestamp := int64(i)*int64(10*time.Second) + jitter
itr.Points[i] = influxql.FloatPoint{
Name: "cpu",
Time: timestamp,
Value: rand.Float64(),
}
}
return itr
}
// Test implementation of influxql.IntegerIterator
type IntegerIterator struct {
Points []influxql.IntegerPoint

View File

@ -568,22 +568,6 @@ func (p *Parser) parseInt(min, max int) (int, error) {
return n, nil
}
-// parseUInt32 parses a string and returns a 32-bit unsigned integer literal.
-func (p *Parser) parseUInt32() (uint32, error) {
-tok, pos, lit := p.scanIgnoreWhitespace()
-if tok != INTEGER {
-return 0, newParseError(tokstr(tok, lit), []string{"integer"}, pos)
-}
-// Convert string to unsigned 32-bit integer
-n, err := strconv.ParseUint(lit, 10, 32)
-if err != nil {
-return 0, &ParseError{Message: err.Error(), Pos: pos}
-}
-return uint32(n), nil
-}
// parseUInt64 parses a string and returns a 64-bit unsigned integer literal.
func (p *Parser) parseUInt64() (uint64, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
@ -1845,37 +1829,6 @@ func (p *Parser) parseDropUserStatement() (*DropUserStatement, error) {
return stmt, nil
}
// parseRetentionPolicy parses a string and returns a retention policy name.
// This function assumes the "WITH" token has already been consumed.
func (p *Parser) parseRetentionPolicy() (name string, dfault bool, err error) {
// Check for optional DEFAULT token.
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == DEFAULT {
dfault = true
tok, pos, lit = p.scanIgnoreWhitespace()
}
// Check for required RETENTION token.
if tok != RETENTION {
err = newParseError(tokstr(tok, lit), []string{"RETENTION"}, pos)
return
}
// Check of required POLICY token.
if tok, pos, lit = p.scanIgnoreWhitespace(); tok != POLICY {
err = newParseError(tokstr(tok, lit), []string{"POLICY"}, pos)
return
}
// Parse retention policy name.
name, err = p.parseIdent()
if err != nil {
return
}
return
}
// parseShowShardGroupsStatement parses a string for "SHOW SHARD GROUPS" statement.
// This function assumes the "SHOW SHARD GROUPS" tokens have already been consumed.
func (p *Parser) parseShowShardGroupsStatement() (*ShowShardGroupsStatement, error) {

View File

@ -554,7 +554,6 @@ func ScanString(r io.RuneScanner) (string, error) {
var errBadString = errors.New("bad string")
var errBadEscape = errors.New("bad escape")
var errBadRegex = errors.New("bad regex")
// ScanBareIdent reads bare identifier from a rune reader.
func ScanBareIdent(r io.RuneScanner) string {
@ -575,16 +574,7 @@ func ScanBareIdent(r io.RuneScanner) string {
return buf.String()
}
-var errInvalidIdentifier = errors.New("invalid identifier")
// IsRegexOp returns true if the operator accepts a regex operand.
func IsRegexOp(t Token) bool {
return (t == EQREGEX || t == NEQREGEX)
}
-// assert will panic with a given formatted message if the given condition is false.
-func assert(condition bool, msg string, v ...interface{}) {
-if !condition {
-panic(fmt.Sprintf("assert failed: "+msg, v...))
-}
-}

View File

@ -1820,23 +1820,6 @@ func (a Tags) HashKey() []byte {
// values.
type Fields map[string]interface{}
-func parseNumber(val []byte) (interface{}, error) {
-if val[len(val)-1] == 'i' {
-val = val[:len(val)-1]
-return parseIntBytes(val, 10, 64)
-}
-for i := 0; i < len(val); i++ {
-// If there is a decimal or an N (NaN), I (Inf), parse as float
-if val[i] == '.' || val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' {
-return parseFloatBytes(val, 64)
-}
-if val[i] < '0' && val[i] > '9' {
-return string(val), nil
-}
-}
-return parseFloatBytes(val, 64)
-}
// FieldIterator retuns a FieldIterator that can be used to traverse the
// fields of a point without constructing the in-memory map.
func (p *point) FieldIterator() FieldIterator {
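Worth noting about the deleted parseNumber: its string fallback was unreachable, because val[i] < '0' && val[i] > '9' is a condition no byte can satisfy; the intended "not a digit" test needs a disjunction. Removing the dead helper also removes that latent bug. The corrected predicate, for the record:

package example

// isNotDigit reports whether c is outside '0'..'9'. The removed code used
// && here, which no byte can satisfy, so its branch never ran.
func isNotDigit(c byte) bool {
	return c < '0' || c > '9'
}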

View File

@ -48,13 +48,10 @@ type Monitor struct {
done chan struct{}
storeCreated bool
storeEnabled bool
-storeAddress string
-storeDatabase string
-storeRetentionPolicy string
-storeRetentionDuration time.Duration
-storeReplicationFactor int
-storeInterval time.Duration
+storeDatabase string
+storeRetentionPolicy string
+storeInterval time.Duration
MetaClient interface {
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)

View File

@ -313,24 +313,6 @@ func (w *TestService) WritePoints(database, retentionPolicy string, consistencyL
return w.WritePointsFn(database, retentionPolicy, consistencyLevel, points)
}
-func wait(c chan struct{}, d time.Duration) (err error) {
-select {
-case <-c:
-case <-time.After(d):
-err = errors.New("timed out")
-}
-return
-}
-func waitInt(c chan int, d time.Duration) (i int, err error) {
-select {
-case i = <-c:
-case <-time.After(d):
-err = errors.New("timed out")
-}
-return
-}
func check(err error) {
if err != nil {
panic(err)

View File

@ -8,9 +8,7 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/coordinator"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"go.uber.org/zap"
)
@ -523,13 +521,6 @@ func (ms *MetaClient) CreateContinuousQuery(database, name, query string) error
return nil
}
-// QueryExecutor is a mock query executor.
-type QueryExecutor struct {
-*influxql.QueryExecutor
-Err error
-t *testing.T
-}
// StatementExecutor is a mock statement executor.
type StatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error
@ -539,75 +530,6 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influx
return e.ExecuteStatementFn(stmt, ctx)
}
// NewQueryExecutor returns a *QueryExecutor.
func NewQueryExecutor(t *testing.T) *QueryExecutor {
e := influxql.NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{}
return &QueryExecutor{
QueryExecutor: e,
t: t,
}
}
// PointsWriter is a mock points writer.
type PointsWriter struct {
WritePointsFn func(p *coordinator.WritePointsRequest) error
Err error
PointsPerSecond int
t *testing.T
}
// NewPointsWriter returns a new *PointsWriter.
func NewPointsWriter(t *testing.T) *PointsWriter {
return &PointsWriter{
PointsPerSecond: 25000,
t: t,
}
}
// WritePoints mocks writing points.
func (pw *PointsWriter) WritePoints(p *coordinator.WritePointsRequest) error {
// If the test set a callback, call it.
if pw.WritePointsFn != nil {
if err := pw.WritePointsFn(p); err != nil {
return err
}
}
if pw.Err != nil {
return pw.Err
}
ns := time.Duration((1 / pw.PointsPerSecond) * 1000000000)
time.Sleep(ns)
return nil
}
// genResult generates a dummy query result.
func genResult(rowCnt, valCnt int) *influxql.Result {
rows := make(models.Rows, 0, rowCnt)
now := time.Now()
for n := 0; n < rowCnt; n++ {
vals := make([][]interface{}, 0, valCnt)
for m := 0; m < valCnt; m++ {
vals = append(vals, []interface{}{now, float64(m)})
now.Add(time.Second)
}
row := &models.Row{
Name: "cpu",
Tags: map[string]string{"host": "server01"},
Columns: []string{"time", "value"},
Values: vals,
}
if len(rows) > 0 {
row.Name = fmt.Sprintf("cpu%d", len(rows)+1)
}
rows = append(rows, row)
}
return &influxql.Result{
Series: rows,
}
}
func wait(c chan struct{}, d time.Duration) (err error) {
select {
case <-c:
@ -616,18 +538,3 @@ func wait(c chan struct{}, d time.Duration) (err error) {
}
return
}
func waitInt(c chan int, d time.Duration) (i int, err error) {
select {
case i = <-c:
case <-time.After(d):
err = errors.New("timed out")
}
return
}
func check(err error) {
if err != nil {
panic(err)
}
}

View File

@ -82,7 +82,6 @@ type Service struct {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
MetaClient interface {
-CreateDatabase(name string) (*meta.DatabaseInfo, error)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
Database(name string) *meta.DatabaseInfo

View File

@ -10,7 +10,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"strings"
"testing"
"time"
@ -611,10 +610,6 @@ func TestHandler_XForwardedFor(t *testing.T) {
}
}
-type invalidJSON struct{}
-func (*invalidJSON) MarshalJSON() ([]byte, error) { return nil, errors.New("marker") }
// NewHandler represents a test wrapper for httpd.Handler.
type Handler struct {
*httpd.Handler
@ -707,21 +702,6 @@ func MustNewJSONRequest(method, urlStr string, body io.Reader) *http.Request {
return r
}
-// matchRegex returns true if a s matches pattern.
-func matchRegex(pattern, s string) bool {
-return regexp.MustCompile(pattern).MatchString(s)
-}
-// NewResultChan returns a channel that sends all results and then closes.
-func NewResultChan(results ...*influxql.Result) <-chan *influxql.Result {
-ch := make(chan *influxql.Result, len(results))
-for _, r := range results {
-ch <- r
-}
-close(ch)
-return ch
-}
// MustJWTToken returns a new JWT token and signed string or panics trying.
func MustJWTToken(username, secret string, expired bool) (*jwt.Token, string) {
token := jwt.New(jwt.GetSigningMethod("HS512"))

View File

@ -82,15 +82,27 @@ func BenchmarkLimitListener(b *testing.B) {
wg.Add(b.N)
l := httpd.LimitListener(&fakeListener{}, b.N)
+errC := make(chan error)
for i := 0; i < b.N; i++ {
go func() {
+defer wg.Done()
c, err := l.Accept()
if err != nil {
-b.Fatal(err)
+errC <- err
+return
}
c.Close()
-wg.Done()
}()
}
-wg.Wait()
+go func() {
+wg.Wait()
+close(errC)
+}()
+for err := range errC {
+if err != nil {
+b.Error(err)
+}
+}
}

View File

@ -133,13 +133,6 @@ func (c *Client) AcquireLease(name string) (*Lease, error) {
return &l, nil
}
-func (c *Client) data() *Data {
-c.mu.RLock()
-defer c.mu.RUnlock()
-data := c.cacheData.Clone()
-return data
-}
// ClusterID returns the ID of the cluster it's connected to.
func (c *Client) ClusterID() uint64 {
c.mu.RLock()
@ -998,21 +991,6 @@ func (c *Client) WithLogger(log zap.Logger) {
c.logger = log.With(zap.String("service", "metaclient"))
}
-func (c *Client) updateAuthCache() {
-// copy cached user info for still-present users
-newCache := make(map[string]authUser, len(c.authCache))
-for _, userInfo := range c.cacheData.Users {
-if cached, ok := c.authCache[userInfo.Name]; ok {
-if cached.bhash == userInfo.Hash {
-newCache[userInfo.Name] = cached
-}
-}
-}
-c.authCache = newCache
-}
// snapshot saves the current meta data to disk.
func snapshot(path string, data *Data) error {
file := filepath.Join(path, metaFile)

View File

@ -1,9 +1,7 @@
package meta_test
import (
"encoding/json"
"io/ioutil"
"net"
"os"
"path"
"reflect"
@ -1166,33 +1164,3 @@ func testTempDir(skip int) string {
}
return dir
}
func mustParseStatement(s string) influxql.Statement {
stmt, err := influxql.ParseStatement(s)
if err != nil {
panic(err)
}
return stmt
}
func mustMarshalJSON(v interface{}) string {
b, e := json.Marshal(v)
if e != nil {
panic(e)
}
return string(b)
}
func freePort() string {
l, _ := net.Listen("tcp", "127.0.0.1:0")
defer l.Close()
return l.Addr().String()
}
func freePorts(i int) []string {
var ports []string
for j := 0; j < i; j++ {
ports = append(ports, freePort())
}
return ports
}

View File

@ -738,25 +738,6 @@ type NodeInfo struct {
TCPHost string
}
-// clone returns a deep copy of ni.
-func (ni NodeInfo) clone() NodeInfo { return ni }
-// marshal serializes to a protobuf representation.
-func (ni NodeInfo) marshal() *internal.NodeInfo {
-pb := &internal.NodeInfo{}
-pb.ID = proto.Uint64(ni.ID)
-pb.Host = proto.String(ni.Host)
-pb.TCPHost = proto.String(ni.TCPHost)
-return pb
-}
-// unmarshal deserializes from a protobuf representation.
-func (ni *NodeInfo) unmarshal(pb *internal.NodeInfo) {
-ni.ID = pb.GetID()
-ni.Host = pb.GetHost()
-ni.TCPHost = pb.GetTCPHost()
-}
// NodeInfos is a slice of NodeInfo used for sorting
type NodeInfos []NodeInfo

View File

@ -385,10 +385,8 @@ func (s *Service) handleTelnetConn(conn net.Conn) {
switch len(tsStr) {
case 10:
t = time.Unix(ts, 0)
-break
case 13:
t = time.Unix(ts/1000, (ts%1000)*1000)
-break
default:
atomic.AddInt64(&s.stats.TelnetBadTime, 1)
if s.LogPointErrors {
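The two deleted break statements were no-ops: unlike C, a Go switch exits at the end of each case unless fallthrough is explicit. The same dispatch as a self-contained sketch (condensed; parseTS is a hypothetical helper, with the millisecond remainder converted to nanoseconds explicitly):

package example

import (
	"fmt"
	"time"
)

// parseTS dispatches on the textual length of an OpenTSDB telnet
// timestamp; each case leaves the switch on its own.
func parseTS(tsStr string, ts int64) (time.Time, error) {
	switch len(tsStr) {
	case 10: // seconds
		return time.Unix(ts, 0), nil
	case 13: // milliseconds: whole seconds plus a nanosecond remainder
		return time.Unix(ts/1000, (ts%1000)*int64(time.Millisecond)), nil
	default:
		return time.Time{}, fmt.Errorf("bad timestamp %q", tsStr)
	}
}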

View File

@ -22,7 +22,6 @@ type Service struct {
DeleteShard(shardID uint64) error
}
enabled bool
-checkInterval time.Duration
wg sync.WaitGroup
done chan struct{}

View File

@ -54,9 +54,6 @@ type Config struct {
ReadBuffer int `toml:"read-buffer"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
Precision string `toml:"precision"`
-// Deprecated config option
-udpPayloadSize int `toml:"udp-payload-size"`
}
// NewConfig returns a new instance of Config with defaults.
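The dropped udpPayloadSize field was doubly dead: the option is deprecated, and an unexported struct field cannot be set through reflection, so no TOML decoder could ever have populated it from a config file. The same rule demonstrated with encoding/json, which shares the reflection constraint (Config here is a standalone example type):

package main

import (
	"encoding/json"
	"fmt"
)

type Config struct {
	ReadBuffer int `json:"read-buffer"`
	// Unexported: decoders skip this field no matter what the tag says.
	payloadSize int `json:"udp-payload-size"`
}

func main() {
	var c Config
	data := []byte(`{"read-buffer": 8192, "udp-payload-size": 1500}`)
	if err := json.Unmarshal(data, &c); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(c.ReadBuffer, c.payloadSize) // prints: 8192 0
}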

View File

@ -57,47 +57,8 @@ const (
keywordEnd
)
var tokens = [...]string{
ILLEGAL: "ILLEGAL",
EOF: "EOF",
WS: "WS",
IDENT: "IDENT",
NUMBER: "NUMBER",
DURATIONVAL: "DURATION",
STRING: "STRING",
BADSTRING: "BADSTRING",
TEMPLATEVAR: "TEMPLATEVAR",
COMMA: ",",
PERIOD: ".",
LPAREN: "(",
RPAREN: ")",
LBRACKET: "[",
RBRACKET: "]",
PIPE: "|",
SET: "SET",
USE: "USE",
QUERY: "QUERY",
INSERT: "INSERT",
EXEC: "EXEC",
DO: "DO",
GO: "GO",
WAIT: "WAIT",
INT: "INT",
FLOAT: "FLOAT",
STR: "STRING",
}
var eof = rune(1)
func check(e error) {
if e != nil {
panic(e)
}
}
func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' }
func isDigit(r rune) bool {
@ -128,12 +89,6 @@ func (s *Scanner) read() rune {
func (s *Scanner) unread() { _ = s.r.UnreadRune() }
-func (s *Scanner) peek() rune {
-ch := s.read()
-s.unread()
-return ch
-}
// Scan moves to the next character in the file and returns a tokenized version as well as the literal
func (s *Scanner) Scan() (tok Token, lit string) {
ch := s.read()

View File

@ -2,6 +2,7 @@ package tcp_test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
@ -41,6 +42,8 @@ func TestMux(t *testing.T) {
if !testing.Verbose() {
mux.Logger = log.New(ioutil.Discard, "", 0)
}
+errC := make(chan error)
for i := uint8(0); i < n; i++ {
ln := mux.Listen(byte(i))
@ -58,7 +61,8 @@ func TestMux(t *testing.T) {
// doesn't match then expect close.
if len(msg) == 0 || msg[0] != byte(i) {
if err == nil || err.Error() != "network connection closed" {
t.Fatalf("unexpected error: %s", err)
errC <- fmt.Errorf("unexpected error: %s", err)
return
}
return
}
@ -67,14 +71,17 @@ func TestMux(t *testing.T) {
// then expect a connection and read the message.
var buf bytes.Buffer
if _, err := io.CopyN(&buf, conn, int64(len(msg)-1)); err != nil {
-t.Fatal(err)
+errC <- err
+return
} else if !bytes.Equal(msg[1:], buf.Bytes()) {
t.Fatalf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes())
errC <- fmt.Errorf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes())
return
}
// Write response.
if _, err := conn.Write([]byte("OK")); err != nil {
-t.Fatal(err)
+errC <- err
+return
}
}(i, ln)
}
@ -114,9 +121,21 @@ func TestMux(t *testing.T) {
// Close original TCP listener and wait for all goroutines to close.
tcpListener.Close()
-wg.Wait()
-return true
+go func() {
+wg.Wait()
+close(errC)
+}()
+ok := true
+for err := range errC {
+if err != nil {
+ok = false
+t.Error(err)
+}
+}
+return ok
}, nil); err != nil {
t.Error(err)
}

View File

@ -85,9 +85,11 @@ func TestCacheRace(t *testing.T) {
c.Values(s)
}(s)
}
+errC := make(chan error)
wg.Add(1)
go func() {
-wg.Done()
+defer wg.Done()
<-ch
s, err := c.Snapshot()
if err == tsm1.ErrSnapshotInProgress {
@ -95,13 +97,26 @@ func TestCacheRace(t *testing.T) {
}
if err != nil {
t.Fatalf("failed to snapshot cache: %v", err)
errC <- fmt.Errorf("failed to snapshot cache: %v", err)
return
}
s.Deduplicate()
c.ClearSnapshot(true)
}()
close(ch)
-wg.Wait()
+go func() {
+wg.Wait()
+close(errC)
+}()
+for err := range errC {
+if err != nil {
+t.Error(err)
+}
+}
}
func TestCacheRace2Compacters(t *testing.T) {
@ -138,10 +153,11 @@ func TestCacheRace2Compacters(t *testing.T) {
fileCounter := 0
mapFiles := map[int]bool{}
mu := sync.Mutex{}
+errC := make(chan error)
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
-wg.Done()
+defer wg.Done()
<-ch
s, err := c.Snapshot()
if err == tsm1.ErrSnapshotInProgress {
@ -149,7 +165,8 @@ func TestCacheRace2Compacters(t *testing.T) {
}
if err != nil {
t.Fatalf("failed to snapshot cache: %v", err)
errC <- fmt.Errorf("failed to snapshot cache: %v", err)
return
}
mu.Lock()
@ -166,7 +183,8 @@ func TestCacheRace2Compacters(t *testing.T) {
defer mu.Unlock()
for k, _ := range myFiles {
if _, ok := mapFiles[k]; !ok {
t.Fatalf("something else deleted one of my files")
errC <- fmt.Errorf("something else deleted one of my files")
return
} else {
delete(mapFiles, k)
}
@ -174,5 +192,15 @@ func TestCacheRace2Compacters(t *testing.T) {
}()
}
close(ch)
-wg.Wait()
+go func() {
+wg.Wait()
+close(errC)
+}()
+for err := range errC {
+if err != nil {
+t.Error(err)
+}
+}
}
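Besides routing errors through errC, these hunks fix a second subtle bug: the goroutines called wg.Done() as their first statement, so wg.Wait() could return while the snapshot work was still running and the test finished without actually waiting on the race it sets up. defer wg.Done() ties the countdown to goroutine completion. The difference, distilled (work is a hypothetical stand-in):

package example

import "sync"

// Buggy: the counter is decremented up front, so Wait may return
// before work() has even started.
func waitBuggy(work func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		wg.Done()
		work()
	}()
	wg.Wait() // can return while work() is still executing
}

// Fixed: Done fires only after work() completes.
func waitFixed(work func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		work()
	}()
	wg.Wait() // blocks until work() has finished
}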

View File

@ -114,16 +114,6 @@ func (t *tsmGeneration) level() int {
return 4
}
-func (t *tsmGeneration) lastModified() int64 {
-var max int64
-for _, f := range t.files {
-if f.LastModified > max {
-max = f.LastModified
-}
-}
-return max
-}
// count returns the number of files in the generation.
func (t *tsmGeneration) count() int {
return len(t.files)
@ -868,8 +858,6 @@ type tsmKeyIterator struct {
// pos[0] = 1, means the reader[0] is currently at key 1 in its ordered index.
pos []int
keys []string
// err is any error we received while iterating values.
err error

View File

@ -8,7 +8,6 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/engine/tsm1"
)
@ -2142,36 +2141,6 @@ func assertValueEqual(t *testing.T, a, b tsm1.Value) {
}
}
func assertEqual(t *testing.T, a tsm1.Value, b models.Point, field string) {
if got, exp := a.UnixNano(), b.UnixNano(); got != exp {
t.Fatalf("time mismatch: got %v, exp %v", got, exp)
}
fields, err := b.Fields()
if err != nil {
t.Fatal(err)
}
if got, exp := a.Value(), fields[field]; got != exp {
t.Fatalf("value mismatch: got %v, exp %v", got, exp)
}
}
func MustWALSegment(dir string, entries []tsm1.WALEntry) *tsm1.WALSegmentReader {
f := MustTempFile(dir)
w := tsm1.NewWALSegmentWriter(f)
for _, e := range entries {
if err := w.Write(mustMarshalEntry(e)); err != nil {
panic(fmt.Sprintf("write WAL entry: %v", err))
}
}
if _, err := f.Seek(0, os.SEEK_SET); err != nil {
panic(fmt.Sprintf("seek WAL: %v", err))
}
return tsm1.NewWALSegmentReader(f)
}
func MustTSMWriter(dir string, gen int) (tsm1.TSMWriter, string) {
f := MustTempFile(dir)
oldName := f.Name()
@ -2238,14 +2207,6 @@ func MustOpenTSMReader(name string) *tsm1.TSMReader {
return r
}
-type fakeWAL struct {
-ClosedSegmentsFn func() ([]string, error)
-}
-func (w *fakeWAL) ClosedSegments() ([]string, error) {
-return w.ClosedSegmentsFn()
-}
type fakeFileStore struct {
PathsFn func() []tsm1.FileStat
lastModified time.Time

View File

@ -822,6 +822,7 @@ func benchmarkEngine_WritePoints_Parallel(b *testing.B, batchSize int) {
b.StartTimer()
var wg sync.WaitGroup
+errC := make(chan error)
for i := 0; i < cpus; i++ {
wg.Add(1)
go func(i int) {
@ -829,11 +830,22 @@ func benchmarkEngine_WritePoints_Parallel(b *testing.B, batchSize int) {
from, to := i*batchSize, (i+1)*batchSize
err := e.WritePoints(pp[from:to])
if err != nil {
-b.Fatal(err)
+errC <- err
+return
}
}(i)
}
-wg.Wait()
+go func() {
+wg.Wait()
+close(errC)
+}()
+for err := range errC {
+if err != nil {
+b.Error(err)
+}
+}
}
}

View File

@ -2,7 +2,6 @@ package tsm1
import (
"fmt"
"io"
"io/ioutil"
"math"
"os"
@ -137,7 +136,6 @@ type FileStore struct {
logger zap.Logger // Logger to be used for important messages
traceLogger zap.Logger // Logger to be used when trace-logging is on.
logOutput io.Writer // Writer to be logger and traceLogger if active.
-traceLogging bool
stats *FileStoreStatistics

View File

@ -600,10 +600,6 @@ func BenchmarkIntegerEncoderPackedSimple(b *testing.B) {
}
}
-type byteSetter interface {
-SetBytes(b []byte)
-}
func BenchmarkIntegerDecoderPackedSimple(b *testing.B) {
x := make([]int64, 1024)
enc := NewIntegerEncoder(1024)

View File

@ -5,7 +5,6 @@ package tsm1
import (
"os"
"syscall"
"unsafe"
)
func mmap(f *os.File, offset int64, length int) ([]byte, error) {
@ -20,12 +19,3 @@ func mmap(f *os.File, offset int64, length int) ([]byte, error) {
func munmap(b []byte) (err error) {
return syscall.Munmap(b)
}
// From: github.com/boltdb/bolt/bolt_unix.go
func madvise(b []byte, advice int) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
if e1 != 0 {
err = e1
}
return
}

View File

@ -1,18 +1,8 @@
package tsm1
-import (
-"sync"
-"github.com/influxdata/influxdb/pkg/pool"
-)
+import "github.com/influxdata/influxdb/pkg/pool"
-var (
-bufPool = pool.NewBytes(10)
-float64ValuePool sync.Pool
-integerValuePool sync.Pool
-booleanValuePool sync.Pool
-stringValuePool sync.Pool
-)
+var bufPool = pool.NewBytes(10)
// getBuf returns a buffer with length size from the buffer pool.
func getBuf(size int) []byte {
@ -23,121 +13,3 @@ func getBuf(size int) []byte {
func putBuf(buf []byte) {
bufPool.Put(buf)
}
// getBuf returns a buffer with length size from the buffer pool.
func getFloat64Values(size int) []Value {
var buf []Value
x := float64ValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = FloatValue{}
}
}
return buf[:size]
}
// putBuf returns a buffer to the pool.
func putFloat64Values(buf []Value) {
float64ValuePool.Put(buf)
}
// getBuf returns a buffer with length size from the buffer pool.
func getIntegerValues(size int) []Value {
var buf []Value
x := integerValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = IntegerValue{}
}
}
return buf[:size]
}
// putBuf returns a buffer to the pool.
func putIntegerValues(buf []Value) {
integerValuePool.Put(buf)
}
// getBuf returns a buffer with length size from the buffer pool.
func getBooleanValues(size int) []Value {
var buf []Value
x := booleanValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = BooleanValue{}
}
}
return buf[:size]
}
// putBuf returns a buffer to the pool.
func putStringValues(buf []Value) {
stringValuePool.Put(buf)
}
// getBuf returns a buffer with length size from the buffer pool.
func getStringValues(size int) []Value {
var buf []Value
x := stringValuePool.Get()
if x == nil {
buf = make([]Value, size)
} else {
buf = x.([]Value)
}
if cap(buf) < size {
return make([]Value, size)
}
for i, v := range buf {
if v == nil {
buf[i] = StringValue{}
}
}
return buf[:size]
}
// putBuf returns a buffer to the pool.
func putBooleanValues(buf []Value) {
booleanValuePool.Put(buf)
}
func putValue(buf []Value) {
if len(buf) > 0 {
switch buf[0].(type) {
case FloatValue:
putFloat64Values(buf)
case IntegerValue:
putIntegerValues(buf)
case BooleanValue:
putBooleanValues(buf)
case StringValue:
putStringValues(buf)
}
}
}
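All of the typed value pools above were unused, and byte buffers now come from the shared pkg/pool package. For orientation, a fixed-capacity byte pool in the spirit of pool.NewBytes(10) can be sketched with a buffered channel; this is an assumption about the shape of pkg/pool, not its actual source:

package pool

// Bytes is a bounded pool of byte slices backed by a buffered channel.
type Bytes struct {
	pool chan []byte
}

// NewBytes returns a pool that retains at most n idle buffers.
func NewBytes(n int) *Bytes {
	return &Bytes{pool: make(chan []byte, n)}
}

// Get returns a buffer of length size, reusing a pooled buffer when it is
// large enough and allocating a fresh one otherwise.
func (p *Bytes) Get(size int) []byte {
	select {
	case b := <-p.pool:
		if cap(b) >= size {
			return b[:size]
		}
	default:
	}
	return make([]byte, size)
}

// Put offers b back to the pool, dropping it when the pool is full.
func (p *Bytes) Put(b []byte) {
	select {
	case p.pool <- b:
	default:
	}
}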

View File

@ -67,16 +67,27 @@ func benchmarkRingWrite(b *testing.B, r *ring, n int) {
for i := 0; i < b.N; i++ {
var wg sync.WaitGroup
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+errC := make(chan error)
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < n; j++ {
if err := r.write(fmt.Sprintf("cpu,host=server-%d value=1", j), Values{}); err != nil {
-b.Fatal(err)
+errC <- err
}
}
}()
-wg.Wait()
+go func() {
+wg.Wait()
+close(errC)
+}()
+for err := range errC {
+if err != nil {
+b.Error(err)
+}
+}
}
}
}

View File

@ -3,6 +3,7 @@ package tsm1
import (
"bufio"
"encoding/binary"
"io"
"io/ioutil"
"math"
"os"
@ -153,7 +154,7 @@ func (t *Tombstoner) Walk(fn func(t Tombstone) error) error {
return t.readTombstoneV1(f, fn)
}
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
return err
}
@ -250,7 +251,7 @@ func (t *Tombstoner) readTombstoneV1(f *os.File, fn func(t Tombstone) error) err
// format is binary.
func (t *Tombstoner) readTombstoneV2(f *os.File, fn func(t Tombstone) error) error {
// Skip header, already checked earlier
-if _, err := f.Seek(v2headerSize, os.SEEK_SET); err != nil {
+if _, err := f.Seek(v2headerSize, io.SeekStart); err != nil {
return err
}
n := int64(4)

View File

@ -43,12 +43,6 @@ const (
stringEntryType = 4
)
-// SegmentInfo represents metadata about a segment.
-type SegmentInfo struct {
-name string
-id int
-}
// WalEntryType is a byte written to a wal segment file that indicates what the following compressed block contains.
type WalEntryType byte

View File

@ -2,6 +2,7 @@ package tsm1_test
import (
"fmt"
"io"
"os"
"testing"
@ -36,7 +37,7 @@ func TestWALWriter_WritePoints_Single(t *testing.T) {
fatal(t, "write points", err)
}
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
@ -93,7 +94,7 @@ func TestWALWriter_WritePoints_LargeBatch(t *testing.T) {
fatal(t, "write points", err)
}
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
@ -153,7 +154,7 @@ func TestWALWriter_WritePoints_Multiple(t *testing.T) {
}
// Seek back to the beinning of the file for reading
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
@ -210,7 +211,7 @@ func TestWALWriter_WriteDelete_Single(t *testing.T) {
fatal(t, "write points", err)
}
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
@ -268,7 +269,7 @@ func TestWALWriter_WritePointsDelete_Multiple(t *testing.T) {
}
// Seek back to the beinning of the file for reading
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
@ -359,7 +360,7 @@ func TestWALWriter_WritePointsDeleteRange_Multiple(t *testing.T) {
}
// Seek back to the beinning of the file for reading
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
@ -538,7 +539,7 @@ func TestWALWriter_Corrupt(t *testing.T) {
}
// Create the WAL segment reader.
-if _, err := f.Seek(0, os.SEEK_SET); err != nil {
+if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
r := tsm1.NewWALSegmentReader(f)
@ -699,7 +700,7 @@ func BenchmarkWALSegmentReader(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
-f.Seek(0, os.SEEK_SET)
+f.Seek(0, io.SeekStart)
b.StartTimer()
for r.Next() {

View File

@ -3,6 +3,7 @@ package tsm1_test
import (
"bytes"
"encoding/binary"
"io"
"io/ioutil"
"os"
"testing"
@ -86,7 +87,7 @@ func TestTSMWriter_Write_Single(t *testing.T) {
t.Fatalf("magic number mismatch: got %v, exp %v", got, tsm1.MagicNumber)
}
-if _, err := fd.Seek(0, os.SEEK_SET); err != nil {
+if _, err := fd.Seek(0, io.SeekStart); err != nil {
t.Fatalf("unexpected error seeking: %v", err)
}
@ -546,7 +547,7 @@ func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
t.Fatalf("magic number mismatch: got %v, exp %v", got, tsm1.MagicNumber)
}
-if _, err := fd.Seek(0, os.SEEK_SET); err != nil {
+if _, err := fd.Seek(0, io.SeekStart); err != nil {
t.Fatalf("error seeking: %v", err)
}

View File

@ -1243,63 +1243,6 @@ func (m *Measurement) walkWhereForSeriesIds(expr influxql.Expr) (SeriesIDs, Filt
}
}
// expandExpr returns a list of expressions expanded by all possible tag combinations.
func (m *Measurement) expandExpr(expr influxql.Expr) []tagSetExpr {
// Retrieve list of unique values for each tag.
valuesByTagKey := m.uniqueTagValues(expr)
// Convert keys to slices.
keys := make([]string, 0, len(valuesByTagKey))
for key := range valuesByTagKey {
keys = append(keys, key)
}
sort.Strings(keys)
// Order uniques by key.
uniques := make([][]string, len(keys))
for i, key := range keys {
uniques[i] = valuesByTagKey[key]
}
// Reduce a condition for each combination of tag values.
return expandExprWithValues(expr, keys, []tagExpr{}, uniques, 0)
}
func expandExprWithValues(expr influxql.Expr, keys []string, tagExprs []tagExpr, uniques [][]string, index int) []tagSetExpr {
// If we have no more keys left then execute the reduction and return.
if index == len(keys) {
// Create a map of tag key/values.
m := make(map[string]*string, len(keys))
for i, key := range keys {
if tagExprs[i].op == influxql.EQ {
m[key] = &tagExprs[i].values[0]
} else {
m[key] = nil
}
}
// TODO: Rewrite full expressions instead of VarRef replacement.
// Reduce using the current tag key/value set.
// Ignore it if reduces down to "false".
e := influxql.Reduce(expr, &tagValuer{tags: m})
if e, ok := e.(*influxql.BooleanLiteral); ok && !e.Val {
return nil
}
return []tagSetExpr{{values: copyTagExprs(tagExprs), expr: e}}
}
// Otherwise expand for each possible equality value of the key.
var exprs []tagSetExpr
for _, v := range uniques[index] {
exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], []string{v}, influxql.EQ}), uniques, index+1)...)
}
exprs = append(exprs, expandExprWithValues(expr, keys, append(tagExprs, tagExpr{keys[index], uniques[index], influxql.NEQ}), uniques, index+1)...)
return exprs
}
// SeriesIDsAllOrByExpr walks an expressions for matching series IDs
// or, if no expression is given, returns all series IDs for the measurement.
func (m *Measurement) SeriesIDsAllOrByExpr(expr influxql.Expr) (SeriesIDs, error) {
@ -1415,91 +1358,6 @@ func (m *Measurement) tagKeysByFilter(op influxql.Token, val string, regex *rege
return ss
}
// tagValuer is used during expression expansion to evaluate all sets of tag values.
type tagValuer struct {
tags map[string]*string
}
// Value returns the string value of a tag and true if it's listed in the tagset.
func (v *tagValuer) Value(name string) (interface{}, bool) {
if value, ok := v.tags[name]; ok {
if value == nil {
return nil, true
}
return *value, true
}
return nil, false
}
// tagSetExpr represents a set of tag keys/values and associated expression.
type tagSetExpr struct {
values []tagExpr
expr influxql.Expr
}
// tagExpr represents one or more values assigned to a given tag.
type tagExpr struct {
key string
values []string
op influxql.Token // EQ or NEQ
}
func copyTagExprs(a []tagExpr) []tagExpr {
other := make([]tagExpr, len(a))
copy(other, a)
return other
}
// uniqueTagValues returns a list of unique tag values used in an expression.
func (m *Measurement) uniqueTagValues(expr influxql.Expr) map[string][]string {
// Track unique value per tag.
tags := make(map[string]map[string]struct{})
// Find all tag values referenced in the expression.
influxql.WalkFunc(expr, func(n influxql.Node) {
switch n := n.(type) {
case *influxql.BinaryExpr:
// Ignore operators that are not equality.
if n.Op != influxql.EQ {
return
}
// Extract ref and string literal.
var key, value string
switch lhs := n.LHS.(type) {
case *influxql.VarRef:
if rhs, ok := n.RHS.(*influxql.StringLiteral); ok {
key, value = lhs.Val, rhs.Val
}
case *influxql.StringLiteral:
if rhs, ok := n.RHS.(*influxql.VarRef); ok {
key, value = rhs.Val, lhs.Val
}
}
if key == "" {
return
}
// Add value to set.
if tags[key] == nil {
tags[key] = make(map[string]struct{})
}
tags[key][value] = struct{}{}
}
})
// Convert to map of slices.
out := make(map[string][]string)
for k, values := range tags {
out[k] = make([]string, 0, len(values))
for v := range values {
out[k] = append(out[k], v)
}
sort.Strings(out[k])
}
return out
}
// Measurements represents a list of *Measurement.
type Measurements []*Measurement
@ -1988,39 +1846,6 @@ func (m *Measurement) FieldNames() []string {
return a
}
func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids SeriesIDs) map[string]stringSet {
// If no tag keys were passed, get all tag keys for the measurement.
if len(tagKeys) == 0 {
for k := range m.seriesByTagKeyValue {
tagKeys = append(tagKeys, k)
}
}
// Mapping between tag keys to all existing tag values.
tagValues := make(map[string]stringSet, 0)
// Iterate all series to collect tag values.
for _, id := range ids {
s, ok := m.seriesByID[id]
if !ok {
continue
}
// Iterate the tag keys we're interested in and collect values
// from this series, if they exist.
for _, tagKey := range tagKeys {
if tagVal := s.Tags.GetString(tagKey); tagVal != "" {
if _, ok = tagValues[tagKey]; !ok {
tagValues[tagKey] = newStringSet()
}
tagValues[tagKey].add(tagVal)
}
}
}
return tagValues
}
// stringSet represents a set of strings.
type stringSet map[string]struct{}
@ -2036,12 +1861,6 @@ func (s stringSet) add(ss ...string) {
}
}
-// contains returns whether the set contains the given string.
-func (s stringSet) contains(ss string) bool {
-_, ok := s[ss]
-return ok
-}
// list returns the current elements in the set, in sorted order.
func (s stringSet) list() []string {
l := make([]string, 0, len(s))

View File

@ -378,16 +378,3 @@ func genStrList(prefix string, n int) []string {
}
return lst
}
// MustParseExpr parses an expression string and returns its AST representation.
func MustParseExpr(s string) influxql.Expr {
expr, err := influxql.ParseExpr(s)
if err != nil {
panic(err.Error())
}
return expr
}
func strref(s string) *string {
return &s
}

View File

@ -22,9 +22,6 @@ import (
"go.uber.org/zap"
)
-// DefaultPrecision is the precision used by the MustWritePointsString() function.
-const DefaultPrecision = "s"
func TestShardWriteAndIndex(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "shard_test")
defer os.RemoveAll(tmpDir)
@ -405,11 +402,13 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
var wg sync.WaitGroup
wg.Add(2)
+errC := make(chan error)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
-t.Fatalf(err.Error())
+errC <- err
+return
}
_ = sh.WritePoints(points[:500])
@ -424,7 +423,8 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
defer wg.Done()
for i := 0; i < 50; i++ {
if err := sh.DeleteMeasurement("cpu", []string{"cpu,host=server"}); err != nil {
-t.Fatalf(err.Error())
+errC <- err
+return
}
_ = sh.WritePoints(points[500:])
@ -434,7 +434,16 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
}
}()
-wg.Wait()
+go func() {
+wg.Wait()
+close(errC)
+}()
+for err := range errC {
+if err != nil {
+t.Error(err)
+}
+}
}
// Ensures that when a shard is closed, it removes any series meta-data
@ -997,15 +1006,6 @@ func NewShard() *Shard {
}
}
-// MustOpenShard returns a new open shard. Panic on error.
-func MustOpenShard() *Shard {
-sh := NewShard()
-if err := sh.Open(); err != nil {
-panic(err)
-}
-return sh
-}
// Close closes the shard and removes all underlying data.
func (sh *Shard) Close() error {
defer os.RemoveAll(sh.path)

View File

@ -976,35 +976,6 @@ func (a KeyValues) Less(i, j int) bool {
return ki < kj
}
// filterShowSeriesResult will limit the number of series returned based on the limit and the offset.
// Unlike limit and offset on SELECT statements, the limit and offset don't apply to the number of Rows, but
// to the number of total Values returned, since each Value represents a unique series.
func (e *Store) filterShowSeriesResult(limit, offset int, rows models.Rows) models.Rows {
var filteredSeries models.Rows
seriesCount := 0
for _, r := range rows {
var currentSeries [][]interface{}
// filter the values
for _, v := range r.Values {
if seriesCount >= offset && seriesCount-offset < limit {
currentSeries = append(currentSeries, v)
}
seriesCount++
}
// only add the row back in if there are some values in it
if len(currentSeries) > 0 {
r.Values = currentSeries
filteredSeries = append(filteredSeries, r)
if seriesCount > limit+offset {
return filteredSeries
}
}
}
return filteredSeries
}
// DecodeStorePath extracts the database and retention policy names
// from a given shard or WAL path.
func DecodeStorePath(shardOrWALPath string) (database, retentionPolicy string) {