Merge branch 'master' of https://github.com/influxdb/influxdb into broker-truncation

pull/1801/head
Ben Johnson 2015-03-01 09:47:39 -07:00
commit 85be4e1f6a
31 changed files with 1010 additions and 350 deletions


@ -1,11 +1,31 @@
## v0.9.0-rc5 [unreleased]
## v0.9.0-rc7 [Unreleased]
### Bugfixes
- [#1744](https://github.com/influxdb/influxdb/pull/1744): Allow retention policies to be modified without specifying replication factor. Thanks @kylezh
## v0.9.0-rc6 [2015-02-27]
### Bugfixes
- [#1780](https://github.com/influxdb/influxdb/pull/1780): Malformed identifiers get through the parser
- [#1775](https://github.com/influxdb/influxdb/pull/1775): Panic "index out of range" on some queries
- [#1744](https://github.com/influxdb/influxdb/pull/1744): Select shard groups which completely encompass time range. Thanks @kylezh.
## v0.9.0-rc5 [2015-02-27]
### Bugfixes
- [#1752](https://github.com/influxdb/influxdb/pull/1752): remove debug log output from collectd.
- [#1720](https://github.com/influxdb/influxdb/pull/1720): Parse Series IDs as unsigned 32-bits.
- [#1767](https://github.com/influxdb/influxdb/pull/1767): Drop Series was failing across shards. Issue #1761.
- [#1773](https://github.com/influxdb/influxdb/pull/1773): Fix bug when merging series together that have unequal number of points in a group by interval
- [#1771](https://github.com/influxdb/influxdb/pull/1771): Make `SHOW SERIES` return IDs and support `LIMIT` and `OFFSET`
### Features
- [#1698](https://github.com/influxdb/influxdb/pull/1698): Wire up DROP MEASUREMENT
## v0.9.0-rc4 [2015-02-24]
### Bugfixes


@ -1,6 +1,45 @@
Contributing to InfluxDB
========================
Bug reports
---------------
Before you file an issue, please search existing issues in case it has already been filed, or perhaps even fixed. If you file an issue, please include the following.
* Full details of your operating system (or distribution) e.g. 64-bit Ubuntu 14.04.
* The version of InfluxDB you are running.
* Whether you installed it using a pre-built package, or built it from source.
* A small test case, if applicable, that demonstrates the issue.
Remember the golden rule of bug reports: **The easier you make it for us to reproduce the problem, the faster it will get fixed.**
If you have never written a bug report before, or if you want to brush up on your bug reporting skills, we recommend reading [Simon Tatham's essay "How to Report Bugs Effectively."](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html)
Test cases should be in the form of `curl` commands. For example:
```
# create database
curl -G http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
# create retention policy
curl -G http://localhost:8086/query --data-urlencode "q=CREATE RETENTION POLICY myrp ON mydb DURATION 365d REPLICATION 1 DEFAULT"
# write data
curl -d '{"database" : "mydb", "retentionPolicy" : "myrp", "points": [{"name":"cpu","tags":{"region":"useast","host":"server_1","service":"redis"},"fields":{"value":61}}]}' -H "Content-Type: application/json" http://localhost:8086/write
# Delete a Measurement
curl -G http://localhost:8086/query --data-urlencode 'db=mydb' --data-urlencode 'q=DROP MEASUREMENT cpu'
# Query the Measurement. It should return no data, but data comes back.
curl -G http://localhost:8086/query --data-urlencode 'db=mydb' --data-urlencode 'q=SELECT * FROM cpu'
```
If you don't include a clear test case like this, your issue may not be investigated, and may even be closed. If writing the data is too difficult, please zip up your data directory and include a link to it in your bug report.
Please note that issues are *not the place to file general questions* such as "how do I use collectd with InfluxDB?" Questions of this nature should be sent to the [Google Group](https://groups.google.com/forum/#!forum/influxdb), not filed as issues. Issues like this will be closed.
Feature requests
---------------
We really like to receive feature requests, as they help us prioritize our work. Please be clear about your requirements, as incomplete feature requests may simply be closed if we don't understand what you would like to see added to InfluxDB.
Contributing to the source code
---------------
InfluxDB follows standard Go project structure. This means that all
your Go development is done in $GOPATH/src. GOPATH can be any
directory under which InfluxDB and all its dependencies will be
@ -117,6 +156,10 @@ To install go cover, run the following command:
go get golang.org/x/tools/cmd/cover
```
Submitting a pull request
------------
To submit a pull request you should fork the InfluxDB repository, and make your change on a feature branch of your fork. Then generate a pull request from your branch against *master* of the InfluxDB repository. Include in your pull request details of your change -- the why *and* the how -- as well as the testing you performed. There will usually be some back and forth as we finalize the change, but once that completes it may be merged.
Useful links
------------
- [Useful techniques in Go](http://arslan.io/ten-useful-techniques-in-go)


@ -1,4 +1,4 @@
# InfluxDB [![Build Status](https://travis-ci.org/influxdb/influxdb.png?branch=master)](https://travis-ci.org/influxdb/influxdb)
# InfluxDB [![Circle CI](https://circleci.com/gh/influxdb/influxdb/tree/master.svg?style=svg)](https://circleci.com/gh/influxdb/influxdb/tree/master)
## An Open-Source, Distributed, Time Series Database

circle.yml (new file)

@ -0,0 +1,10 @@
dependencies:
pre:
- go get -u golang.org/x/tools/cmd/vet;
test:
override:
# Put each test command on its own line.
- go test -timeout 60s -v ./...
- go tool vet .


@ -221,12 +221,12 @@ func (c *CommandLine) SetAuth() {
}
func (c *CommandLine) use(cmd string) {
args := strings.Split(cmd, " ")
args := strings.Split(strings.TrimSpace(cmd), " ")
if len(args) != 2 {
fmt.Printf("Could not parse database name from %q.\n", cmd)
return
}
d := strings.TrimSpace(args[1])
d := args[1]
c.Database = d
fmt.Printf("Using database %s\n", d)
}
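The fix above trims the whole command before splitting, so leading or trailing spaces no longer produce empty fields that trip the `len(args) != 2` check. A quick standalone illustration of why (not the CLI code itself):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// A leading space yields an empty first element, so " use db" parses as 3 fields.
	fmt.Println(strings.Split(" use db", " "))                    // [ use db]
	fmt.Println(strings.Split(strings.TrimSpace(" use db"), " ")) // [use db]
}
```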


@ -57,3 +57,21 @@ func TestParseCommand_Exit(t *testing.T) {
}
}
}
func TestParseCommand_Use(t *testing.T) {
c := main.CommandLine{}
tests := []struct {
cmd string
}{
{cmd: "use db"},
{cmd: " use db"},
{cmd: "use db "},
{cmd: "Use db"},
}
for _, test := range tests {
if !c.ParseCommand(test.cmd) {
t.Fatalf(`Command "use" failed for %q.`, test.cmd)
}
}
}


@ -32,9 +32,10 @@ const (
// urlFor returns a URL with the path and query params correctly appended and set.
func urlFor(u *url.URL, path string, params url.Values) *url.URL {
u.Path = path
u.RawQuery = params.Encode()
return u
v, _ := url.Parse(u.String())
v.Path = path
v.RawQuery = params.Encode()
return v
}
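The rewritten `urlFor` above re-parses the base URL instead of mutating it: the old version set `Path` and `RawQuery` directly on the shared `*url.URL` that each test node holds, so one call's query parameters leaked into later requests. A minimal standalone sketch of the copy-based behavior (hypothetical `base`, not the test harness itself):

```go
package main

import (
	"fmt"
	"net/url"
)

// urlFor mirrors the fixed helper: it parses a copy so the caller's URL is never mutated.
func urlFor(u *url.URL, path string, params url.Values) *url.URL {
	v, _ := url.Parse(u.String()) // error ignored, as in the diff
	v.Path = path
	v.RawQuery = params.Encode()
	return v
}

func main() {
	base, _ := url.Parse("http://localhost:8086")
	q := urlFor(base, "query", url.Values{"q": []string{"SHOW DATABASES"}})
	fmt.Println(q)    // http://localhost:8086/query?q=SHOW+DATABASES
	fmt.Println(base) // http://localhost:8086 -- untouched
}
```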
// node represents a node under test, which is both a broker and data node.
@ -46,7 +47,7 @@ type node struct {
}
// cluster represents a multi-node cluster.
type cluster []node
type cluster []*node
// createBatch returns a JSON string, representing the request body for a batch write. The timestamp
// simply increases and the value is a random integer.
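The body of `createBatch` falls outside this hunk. A minimal sketch of what the comment describes, shaped after the batch-write body used elsewhere in this diff; the exact field names (in particular `"timestamp"`) and the nanosecond timestamp scheme are assumptions, not the repository's code:

```go
package main

import (
	"encoding/json"
	"fmt"
	"math/rand"
	"time"
)

type point struct {
	Name      string                 `json:"name"`
	Tags      map[string]string      `json:"tags"`
	Timestamp int64                  `json:"timestamp"`
	Fields    map[string]interface{} `json:"fields"`
}

type batchWrite struct {
	Database        string  `json:"database"`
	RetentionPolicy string  `json:"retentionPolicy"`
	Points          []point `json:"points"`
}

// createBatch builds a batch-write body of n points whose timestamp simply
// increases and whose value is a random integer.
func createBatch(n int, database, retention, measurement string, tags map[string]string) string {
	batch := batchWrite{Database: database, RetentionPolicy: retention}
	t := time.Now().UnixNano()
	for i := 0; i < n; i++ {
		batch.Points = append(batch.Points, point{
			Name:      measurement,
			Tags:      tags,
			Timestamp: t + int64(i),
			Fields:    map[string]interface{}{"value": rand.Int()},
		})
	}
	b, _ := json.Marshal(batch)
	return string(b)
}

func main() {
	fmt.Println(createBatch(2, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
}
```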
@ -88,7 +89,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i
t.Fatalf("Test %s: asked to create nonsense cluster", testName)
}
nodes := make([]node, 0)
nodes := make([]*node, 0)
tmpDir := os.TempDir()
tmpBrokerDir := filepath.Join(tmpDir, "broker-integration-test")
@ -117,7 +118,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i
if s == nil {
t.Fatalf("Test %s: failed to create leader data node on port %d", testName, basePort)
}
nodes = append(nodes, node{
nodes = append(nodes, &node{
broker: b,
server: s,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(basePort)},
@ -140,7 +141,7 @@ func createCombinedNodeCluster(t *testing.T, testName string, nNodes, basePort i
t.Fatalf("Test %s: failed to create following data node on port %d", testName, basePort)
}
nodes = append(nodes, node{
nodes = append(nodes, &node{
broker: b,
server: s,
url: &url.URL{Scheme: "http", Host: "localhost:" + strconv.Itoa(nextPort)},
@ -155,7 +156,7 @@ func createDatabase(t *testing.T, testName string, nodes cluster, database strin
t.Logf("Test: %s: creating database %s", testName, database)
serverURL := nodes[0].url
u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE foo"}})
u := urlFor(serverURL, "query", url.Values{"q": []string{"CREATE DATABASE " + database}})
resp, err := http.Get(u.String())
if err != nil {
t.Fatalf("Couldn't create database: %s", err)
@ -206,7 +207,7 @@ func createDatabase(t *testing.T, testName string, nodes cluster, database strin
{Series: []influxql.Row{
{
Columns: []string{"name"},
Values: [][]interface{}{{"foo"}},
Values: [][]interface{}{{database}},
},
}},
},
@ -220,7 +221,7 @@ func createDatabase(t *testing.T, testName string, nodes cluster, database strin
func createRetentionPolicy(t *testing.T, testName string, nodes cluster, database, retention string) {
t.Log("Creating retention policy")
serverURL := nodes[0].url
replication := fmt.Sprintf("CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION %d DEFAULT", len(nodes))
replication := fmt.Sprintf("CREATE RETENTION POLICY %s ON %s DURATION 1h REPLICATION %d DEFAULT", retention, database, len(nodes))
u := urlFor(serverURL, "query", url.Values{"q": []string{replication}})
resp, err := http.Get(u.String())
@ -279,7 +280,7 @@ func simpleQuery(t *testing.T, testName string, nodes cluster, query string, exp
// Query the data exists
for _, n := range nodes {
t.Logf("Test name %s: query data on node %s", testName, n.url)
u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
u := urlFor(n.url, "query", url.Values{"q": []string{query}})
resp, err := http.Get(u.String())
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
@ -359,7 +360,7 @@ func simpleCountQuery(t *testing.T, testName string, nodes cluster, query, field
// Query the data exists
for _, n := range nodes {
t.Logf("Test name %s: query data on node %s", testName, n.url)
u := urlFor(n.url, "query", url.Values{"q": []string{query}, "db": []string{"foo"}})
u := urlFor(n.url, "query", url.Values{"q": []string{query}})
resp, err := http.Get(u.String())
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
@ -567,7 +568,7 @@ func Test_Server3NodeLargeBatchIntegration(t *testing.T) {
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
simpleCountQuery(t, "single node large batch", nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
}
func Test_Server5NodeLargeBatchIntegration(t *testing.T) {
@ -583,7 +584,7 @@ func Test_Server5NodeLargeBatchIntegration(t *testing.T) {
createDatabase(t, testName, nodes, "foo")
createRetentionPolicy(t, testName, nodes, "foo", "bar")
write(t, testName, nodes, createBatch(batchSize, "foo", "bar", "cpu", map[string]string{"host": "server01"}))
simpleCountQuery(t, "single node large batch", nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
simpleCountQuery(t, testName, nodes, `select count(value) from "foo"."bar".cpu`, "value", batchSize)
}
func Test_ServerMultiLargeBatchIntegration(t *testing.T) {


@ -1,105 +0,0 @@
// +build !windows,!nacl,!plan9
package main
import (
"errors"
"log/syslog"
"strings"
log "code.google.com/p/log4go"
)
type sysLogWriter chan *log.LogRecord
func (w sysLogWriter) LogWrite(rec *log.LogRecord) {
w <- rec
}
func (w sysLogWriter) Close() {
close(w)
}
func GetSysLogFacility(name string) (syslog.Priority, bool) {
switch strings.ToLower(name) {
case "syslog":
return syslog.LOG_SYSLOG, true
case "local0":
return syslog.LOG_LOCAL0, true
case "local1":
return syslog.LOG_LOCAL1, true
case "local2":
return syslog.LOG_LOCAL2, true
case "local3":
return syslog.LOG_LOCAL3, true
case "local4":
return syslog.LOG_LOCAL4, true
case "local5":
return syslog.LOG_LOCAL5, true
case "local6":
return syslog.LOG_LOCAL6, true
case "local7":
return syslog.LOG_LOCAL7, true
default:
return syslog.LOG_SYSLOG, false
}
}
func getWriter(writer *syslog.Writer, level string) func(string) error {
switch level {
case "DEBG", "TRAC", "FINE", "FNST":
return writer.Debug
case "INFO":
return writer.Info
case "WARN":
return writer.Warning
case "EROR":
return writer.Err
default:
return writer.Crit
}
}
func connectSyslogDaemon(priority syslog.Priority) (writer *syslog.Writer, err error) {
logTypes := []string{"unixgram", "unix"}
logPaths := []string{
"/dev/log", // unix socket for syslog on linux
"/var/run/syslog", // unix socket for syslog on osx
}
var raddr string
for _, network := range logTypes {
for _, path := range logPaths {
raddr = path
writer, err = syslog.Dial(network, raddr, priority, "influxdb")
if err != nil {
continue
} else {
return
}
}
}
if err != nil {
err = errors.New("cannot connect to Syslog Daemon")
}
return
}
func NewSysLogWriter(priority syslog.Priority) (w sysLogWriter, err error) {
writer, err := connectSyslogDaemon(priority)
if err != nil {
return
}
w = sysLogWriter(make(chan *log.LogRecord, log.LogBufferLength))
go func() {
defer func() {
if w != nil {
w.Close()
}
}()
for rec := range w {
m := log.FormatLogRecord("(%S) %M", rec)
getWriter(writer, rec.Level.String())(m)
}
}()
return
}


@ -90,7 +90,6 @@ func (s *Server) serve(conn *net.UDPConn) {
log.Printf("Collectd ReadFromUDP error: %s", err)
continue
}
log.Printf("received %d bytes", n)
if n > 0 {
s.handleMessage(buffer[:n])
}
@ -102,7 +101,6 @@ func (s *Server) serve(conn *net.UDPConn) {
}
func (s *Server) handleMessage(buffer []byte) {
log.Printf("handling message")
packets, err := gollectd.Packets(buffer, s.typesdb)
if err != nil {
log.Printf("Collectd parse error: %s", err)


@ -36,6 +36,7 @@ const (
// Measurement messages
createMeasurementsIfNotExistsMessageType = messaging.MessageType(0x60)
dropMeasurementMessageType = messaging.MessageType(0x61)
// Continuous Query messages
createContinuousQueryMessageType = messaging.MessageType(0x70)
@ -114,6 +115,11 @@ type setDefaultRetentionPolicyCommand struct {
Name string `json:"name"`
}
type dropMeasurementCommand struct {
Database string `json:"database"`
Name string `json:"name"`
}
type createMeasurementSubcommand struct {
Name string `json:"name"`
Tags []map[string]string `json:"tags"`
@ -121,36 +127,39 @@ type createMeasurementSubcommand struct {
}
type createMeasurementsIfNotExistsCommand struct {
Database string `json:"database"`
Measurements map[string]*createMeasurementSubcommand `json:"measurements"`
Database string `json:"database"`
Measurements []*createMeasurementSubcommand `json:"measurements"`
}
func newCreateMeasurementsIfNotExistsCommand(database string) *createMeasurementsIfNotExistsCommand {
return &createMeasurementsIfNotExistsCommand{
Database: database,
Measurements: make(map[string]*createMeasurementSubcommand),
Measurements: make([]*createMeasurementSubcommand, 0),
}
}
// addMeasurementIfNotExists adds the Measurement to the command, but only if not already present
// in the command.
func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) {
_, ok := c.Measurements[name]
if !ok {
c.Measurements[name] = &createMeasurementSubcommand{
Name: name,
Tags: make([]map[string]string, 0),
Fields: make([]*Field, 0),
func (c *createMeasurementsIfNotExistsCommand) addMeasurementIfNotExists(name string) *createMeasurementSubcommand {
for _, m := range c.Measurements {
if m.Name == name {
return m
}
}
m := &createMeasurementSubcommand{
Name: name,
Tags: make([]map[string]string, 0),
Fields: make([]*Field, 0),
}
c.Measurements = append(c.Measurements, m)
return m
}
// addSeriesIfNotExists adds the Series, identified by Measurement name and tag set, to
// the command, but only if not already present in the command.
func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement string, tags map[string]string) {
c.addMeasurementIfNotExists(measurement)
m := c.addMeasurementIfNotExists(measurement)
m := c.Measurements[measurement]
tagset := string(marshalTags(tags))
for _, t := range m.Tags {
if string(marshalTags(t)) == tagset {
@ -167,9 +176,8 @@ func (c *createMeasurementsIfNotExistsCommand) addSeriesIfNotExists(measurement
// addFieldIfNotExists adds the field to the command for the Measurement, but only if it is not already
// present. It will return an error if the field is present in the command, but is of a different type.
func (c *createMeasurementsIfNotExistsCommand) addFieldIfNotExists(measurement, name string, typ influxql.DataType) error {
c.addMeasurementIfNotExists(measurement)
m := c.addMeasurementIfNotExists(measurement)
m := c.Measurements[measurement]
for _, f := range m.Fields {
if f.Name == name {
if f.Type != typ {


@ -221,7 +221,7 @@ func (m *Measurement) addSeries(s *Series) bool {
return true
}
// removeSeries will remove a series from the measurementIndex. Returns true if already removed
// dropSeries will remove a series from the measurementIndex. Returns true if already removed
func (m *Measurement) dropSeries(seriesID uint32) bool {
if _, ok := m.seriesByID[seriesID]; !ok {
return true
@ -930,7 +930,7 @@ func (a seriesIDs) intersect(other seriesIDs) seriesIDs {
var i, j int
ids := make([]uint32, 0, len(l))
for i < len(l) {
for i < len(l) && j < len(r) {
if l[i] == r[j] {
ids = append(ids, l[i])
i++
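The added `j < len(r)` bound is the substance of this fix: once the shorter slice is exhausted, `r[j]` must not be read, presumably the kind of "index out of range" panic noted in the changelog above. A self-contained sketch of the corrected merge-style intersection, assuming both ID slices are sorted ascending (the invariant the new tests later in this diff exercise):

```go
package main

import "fmt"

// intersect returns the IDs present in both sorted slices.
func intersect(l, r []uint32) []uint32 {
	ids := make([]uint32, 0, len(l))
	var i, j int
	for i < len(l) && j < len(r) { // both bounds checked: r[j] cannot run past the end
		switch {
		case l[i] == r[j]:
			ids = append(ids, l[i])
			i++
			j++
		case l[i] < r[j]:
			i++
		default:
			j++
		}
	}
	return ids
}

func main() {
	fmt.Println(intersect([]uint32{1, 3, 4, 5, 6}, []uint32{1, 2, 3, 7})) // [1 3]
}
```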
@ -1049,6 +1049,40 @@ func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup {
return nil
}
// dropMeasurement removes a measurement from the in-memory index and
// deletes its series data from the shards.
func (db *database) dropMeasurement(name string) error {
if _, ok := db.measurements[name]; !ok {
return nil
}
// remove measurement from in memory index
delete(db.measurements, name)
// collect the series ids to remove
var ids []uint32
// remove series from in memory map
for id, series := range db.series {
if series.measurement.Name == name {
ids = append(ids, id)
delete(db.series, id)
}
}
// remove series data from shards
for _, rp := range db.policies {
for _, id := range ids {
if err := rp.dropSeries(id); err != nil {
return err
}
}
}
return nil
}
// dropSeries will delete all data with the seriesID
func (rp *RetentionPolicy) dropSeries(seriesID uint32) error {
for _, g := range rp.shardGroups {
@ -1150,7 +1184,7 @@ func (db *database) dropSeries(seriesByMeasurement map[string][]uint32) error {
// Remove shard data
for _, rp := range db.policies {
if err := rp.dropSeries(id); err != nil {
return err
return fmt.Errorf("database.retentionPolicies.dropSeries: %s", err)
}
}
}
@ -1387,6 +1421,7 @@ func (m *Measurement) tagValuesByKeyAndSeriesID(tagKeys []string, ids seriesIDs)
// Iterate the tag keys we're interested in and collect values
// from this series, if they exist.
for _, tagKey := range tagKeys {
tagKey = strings.Trim(tagKey, `"`)
if tagVal, ok := s.Tags[tagKey]; ok {
if _, ok = tagValues[tagKey]; !ok {
tagValues[tagKey] = newStringSet()
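The trim added above matters because the parser keeps the double quotes on a quoted key: `SHOW TAG VALUES WITH KEY = "host"` produces the literal tag key "host", quotes included (see the parser test expectation later in this diff), while series tags are stored under the bare name. A tiny illustration of the mismatch:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	tags := map[string]string{"host": "server01"}
	key := `"host"` // as the parser delivers it for: WITH KEY = "host"
	_, ok := tags[key]
	fmt.Println(ok) // false: the quoted key misses the map entry
	_, ok = tags[strings.Trim(key, `"`)]
	fmt.Println(ok) // true: trimming the quotes makes it match
}
```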


@ -1652,20 +1652,20 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", ""}),
str2iface([]string{"server01", "uswest"}),
str2iface([]string{"server01", "useast"}),
str2iface([]string{"server02", "useast"}),
[]interface{}{1, "server01", ""},
[]interface{}{2, "server01", "uswest"},
[]interface{}{3, "server01", "useast"},
[]interface{}{4, "server02", "useast"},
},
},
{
Name: "gpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server02", "useast"}),
str2iface([]string{"server03", "caeast"}),
[]interface{}{5, "server02", "useast"},
[]interface{}{6, "server03", "caeast"},
},
},
},
@ -1673,29 +1673,6 @@ func TestHandler_serveShowSeries(t *testing.T) {
},
},
},
// SHOW SERIES ... LIMIT
// {
// q: `SHOW SERIES LIMIT 1`,
// r: &influxdb.Results{
// Results: []*influxdb.Result{
// &influxdb.Result{
// Series: []*influxql.Row{
// &influxql.Row{
// Name: "cpu",
// Columns: []string{"host", "region"},
// Values: [][]interface{}{
// str2iface([]string{"server01", ""}),
// str2iface([]string{"server01", "uswest"}),
// str2iface([]string{"server01", "useast"}),
// str2iface([]string{"server02", "useast"}),
// },
// },
// },
// },
// },
// },
// },
// SHOW SERIES FROM
{
q: `SHOW SERIES FROM cpu`,
r: &influxdb.Results{
@ -1704,12 +1681,12 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", ""}),
str2iface([]string{"server01", "uswest"}),
str2iface([]string{"server01", "useast"}),
str2iface([]string{"server02", "useast"}),
[]interface{}{1, "server01", ""},
[]interface{}{2, "server01", "uswest"},
[]interface{}{3, "server01", "useast"},
[]interface{}{4, "server02", "useast"},
},
},
},
@ -1726,9 +1703,9 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", "uswest"}),
[]interface{}{2, "server01", "uswest"},
},
},
},
@ -1746,9 +1723,9 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "gpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server03", "caeast"}),
[]interface{}{6, "server03", "caeast"},
},
},
},
@ -1766,9 +1743,9 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "gpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server03", "caeast"}),
[]interface{}{6, "server03", "caeast"},
},
},
},
@ -1786,10 +1763,10 @@ func TestHandler_serveShowSeries(t *testing.T) {
Series: []*influxql.Row{
{
Name: "cpu",
Columns: []string{"host", "region"},
Columns: []string{"id", "host", "region"},
Values: [][]interface{}{
str2iface([]string{"server01", "useast"}),
str2iface([]string{"server02", "useast"}),
[]interface{}{3, "server01", "useast"},
[]interface{}{4, "server02", "useast"},
},
},
},
@ -1819,7 +1796,7 @@ func TestHandler_serveShowSeries(t *testing.T) {
if !reflect.DeepEqual(tt.err, errstring(r.Err)) {
t.Logf("query #%d: %s", i, tt.q)
t.Errorf("%d. %s: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.q, tt.err, r.Err)
} else if tt.err == "" && !reflect.DeepEqual(tt.r, r) {
} else if tt.err == "" && mustMarshalJSON(tt.r) != body {
t.Logf("query #%d: %s", i, tt.q)
t.Logf("exp = %s", mustMarshalJSON(tt.r))
t.Logf("got = %s", body)
@ -2066,6 +2043,27 @@ func TestHandler_serveShowTagValues(t *testing.T) {
},
},
},
// SHOW TAG VALUES WITH KEY = "..."
{
q: `SHOW TAG VALUES WITH KEY = "host"`,
r: &influxdb.Results{
Results: []*influxdb.Result{
{
Series: []*influxql.Row{
{
Name: "hostTagValues",
Columns: []string{"host"},
Values: [][]interface{}{
str2iface([]string{"server01"}),
str2iface([]string{"server02"}),
str2iface([]string{"server03"}),
},
},
},
},
},
},
},
// SHOW TAG VALUES FROM ...
{
q: `SHOW TAG VALUES FROM cpu WITH KEY = host`,


@ -610,9 +610,9 @@ unary_expr = "(" expr ")" | var_ref | time_lit | string_lit |
```
decimals = decimal_digit { decimal_digit } .
dimenson = expr .
dimension = expr .
dimensons = dimenson { "," dimenson } .
dimensions = dimension { "," dimension } .
field = expr [ alias ] .


@ -65,6 +65,7 @@ func (*CreateUserStatement) node() {}
func (*DeleteStatement) node() {}
func (*DropContinuousQueryStatement) node() {}
func (*DropDatabaseStatement) node() {}
func (*DropMeasurementStatement) node() {}
func (*DropRetentionPolicyStatement) node() {}
func (*DropSeriesStatement) node() {}
func (*DropUserStatement) node() {}
@ -154,6 +155,7 @@ func (*CreateUserStatement) stmt() {}
func (*DeleteStatement) stmt() {}
func (*DropContinuousQueryStatement) stmt() {}
func (*DropDatabaseStatement) stmt() {}
func (*DropMeasurementStatement) stmt() {}
func (*DropRetentionPolicyStatement) stmt() {}
func (*DropSeriesStatement) stmt() {}
func (*DropUserStatement) stmt() {}
@ -1198,6 +1200,25 @@ func (s *ShowMeasurementsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// DropMeasurmentStatement represents a command to drop a measurement.
type DropMeasurementStatement struct {
// Name of the measurement to be dropped.
Name string
}
// String returns a string representation of the drop measurement statement.
func (s *DropMeasurementStatement) String() string {
var buf bytes.Buffer
_, _ = buf.WriteString("DROP MEASUREMENT ")
_, _ = buf.WriteString(s.Name)
return buf.String()
}
// RequiredPrivileges returns the privilege(s) required to execute a DropMeasurementStatement
func (s *DropMeasurementStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// ShowRetentionPoliciesStatement represents a command for listing retention policies.
type ShowRetentionPoliciesStatement struct {
// Name of the database to list policies for.


@ -69,7 +69,7 @@ func NewPlanner(db DB) *Planner {
// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
now := p.Now()
now := p.Now().UTC()
// Clone the statement to be planned.
// Replace instances of "now()" with the current time.
@ -698,7 +698,9 @@ func MapMean(itr Iterator, e *Emitter, tmin int64) {
out.Count++
out.Sum += v.(float64)
}
e.Emit(Key{tmin, itr.Tags()}, out)
if out.Count > 0 {
e.Emit(Key{tmin, itr.Tags()}, out)
}
}
type meanMapOutput struct {
@ -714,7 +716,9 @@ func ReduceMean(key Key, values []interface{}, e *Emitter) {
out.Count += val.Count
out.Sum += val.Sum
}
e.Emit(key, out.Sum/float64(out.Count))
if out.Count > 0 {
e.Emit(key, out.Sum/float64(out.Count))
}
}
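Both guards address the same failure mode: with no points in an interval, `Count` stays zero and the float64 division `Sum/Count` is `0/0`, which in Go yields NaN rather than a panic, so an unguarded mean would silently emit NaN rows. A two-line demonstration:

```go
package main

import "fmt"

func main() {
	var sum, count float64   // zero values, as when an interval matched no points
	fmt.Println(sum / count) // NaN: why MapMean and ReduceMean now check Count > 0
}
```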
// MapMin collects the values to pass to the reducer


@ -159,6 +159,8 @@ func (p *Parser) parseDropStatement() (Statement, error) {
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == SERIES {
return p.parseDropSeriesStatement()
} else if tok == MEASUREMENT {
return p.parseDropMeasurementStatement()
} else if tok == CONTINUOUS {
return p.parseDropContinuousQueryStatement()
} else if tok == DATABASE {
@ -172,7 +174,7 @@ func (p *Parser) parseDropStatement() (Statement, error) {
return p.parseDropUserStatement()
}
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS"}, pos)
return nil, newParseError(tokstr(tok, lit), []string{"SERIES", "CONTINUOUS", "MEASUREMENT"}, pos)
}
// parseAlterStatement parses a string and returns an alter statement.
@ -871,6 +873,21 @@ func (p *Parser) parseShowFieldKeysStatement() (*ShowFieldKeysStatement, error)
return stmt, nil
}
// parseDropMeasurementStatement parses a string and returns a DropMeasurementStatement.
// This function assumes the "DROP MEASUREMENT" tokens have already been consumed.
func (p *Parser) parseDropMeasurementStatement() (*DropMeasurementStatement, error) {
stmt := &DropMeasurementStatement{}
// Parse the name of the measurement to be dropped.
lit, err := p.parseIdent()
if err != nil {
return nil, err
}
stmt.Name = lit
return stmt, nil
}
// parseDropSeriesStatement parses a string and returns a DropSeriesStatement.
// This function assumes the "DROP SERIES" tokens have already been consumed.
func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {


@ -293,6 +293,19 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// SHOW TAG VALUES WITH KEY = "..."
{
s: `SHOW TAG VALUES WITH KEY = "host" WHERE region = 'uswest'`,
stmt: &influxql.ShowTagValuesStatement{
TagKeys: []string{`"host"`},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "region"},
RHS: &influxql.StringLiteral{Val: "uswest"},
},
},
},
// SHOW USERS
{
s: `SHOW USERS`,
@ -400,6 +413,39 @@ func TestParser_ParseStatement(t *testing.T) {
},
},
// CREATE CONTINUOUS QUERY for non-aggregate SELECT stmts
{
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT value INTO "policy1"."value" FROM myseries END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "value"}}},
Target: &influxql.Target{
Measurement: `"policy1"."value"`,
},
Source: &influxql.Measurement{Name: "myseries"},
},
},
},
// CREATE CONTINUOUS QUERY for non-aggregate SELECT stmts with multiple values
{
s: `CREATE CONTINUOUS QUERY myquery ON testdb BEGIN SELECT transmit_rx, transmit_tx INTO "policy1"."network" FROM myseries END`,
stmt: &influxql.CreateContinuousQueryStatement{
Name: "myquery",
Database: "testdb",
Source: &influxql.SelectStatement{
Fields: []*influxql.Field{{Expr: &influxql.Call{Name: "transmit_rx"}},
{Expr: &influxql.Call{Name: "transmit_tx"}}},
Target: &influxql.Target{
Measurement: `"policy1"."network"`,
},
Source: &influxql.Measurement{Name: "myseries"},
},
},
},
// CREATE DATABASE statement
{
s: `CREATE DATABASE testdb`,
@ -439,6 +485,12 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: &influxql.DropDatabaseStatement{Name: "testdb"},
},
// DROP MEASUREMENT statement
{
s: `DROP MEASUREMENT cpu`,
stmt: &influxql.DropMeasurementStatement{Name: "cpu"},
},
// DROP RETENTION POLICY
{
s: `DROP RETENTION POLICY "1h.cpu" ON mydb`,
@ -628,6 +680,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
{s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`},
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP MEASUREMENT`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `DROP SERIES`, err: `found EOF, expected number at line 1, char 13`},
{s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
@ -637,7 +690,9 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, FIELD, MEASUREMENTS, RETENTION, SERIES, TAG, USERS at line 1, char 6`},
{s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`},
{s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 23`},
{s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS at line 1, char 6`},
{s: `CREATE CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 19`},
{s: `CREATE CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `DROP FOO`, err: `found FOO, expected SERIES, CONTINUOUS, MEASUREMENT at line 1, char 6`},
{s: `DROP DATABASE`, err: `found EOF, expected identifier at line 1, char 15`},
{s: `DROP RETENTION`, err: `found EOF, expected POLICY at line 1, char 16`},
{s: `DROP RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 23`},
@ -677,21 +732,19 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `ALTER`, err: `found EOF, expected RETENTION at line 1, char 7`},
{s: `ALTER RETENTION`, err: `found EOF, expected POLICY at line 1, char 17`},
{s: `ALTER RETENTION POLICY`, err: `found EOF, expected identifier at line 1, char 24`},
{s: `ALTER RETENTION POLICY policy1`, err: `found EOF, expected ON at line 1, char 32`},
{s: `ALTER RETENTION POLICY policy1 ON`, err: `found EOF, expected identifier at line 1, char 35`},
{s: `ALTER RETENTION POLICY policy1`, err: `found EOF, expected ON at line 1, char 32`}, {s: `ALTER RETENTION POLICY policy1 ON`, err: `found EOF, expected identifier at line 1, char 35`},
{s: `ALTER RETENTION POLICY policy1 ON testdb`, err: `found EOF, expected DURATION, RETENTION, DEFAULT at line 1, char 42`},
}
for i, tt := range tests {
stmt, err := influxql.NewParser(strings.NewReader(tt.s)).ParseStatement()
// if it's a CQ, there is a non-exported field that gets memoized during parsing that needs to be set
if _, ok := stmt.(*influxql.CreateContinuousQueryStatement); ok {
tt.stmt.(*influxql.CreateContinuousQueryStatement).Source.GroupByInterval()
}
if !reflect.DeepEqual(tt.err, errstring(err)) {
t.Errorf("%d. %q: error mismatch:\n exp=%s\n got=%s\n\n", i, tt.s, tt.err, err)
} else if st, ok := stmt.(*influxql.CreateContinuousQueryStatement); ok { // if it's a CQ, there is a non-exported field that gets memoized during parsing that needs to be set
if st != nil && st.Source != nil {
tt.stmt.(*influxql.CreateContinuousQueryStatement).Source.GroupByInterval()
}
} else if tt.err == "" && !reflect.DeepEqual(tt.stmt, stmt) {
t.Errorf("%d. %q\n\nstmt mismatch:\n\nexp=%#v\n\ngot=%#v\n\n", i, tt.s, tt.stmt, stmt)
}


@ -137,7 +137,7 @@ func (s *Scanner) scanIdent() (tok Token, pos Pos, lit string) {
} else if ch == '.' {
buf.WriteRune(ch)
} else if ch == '"' {
if tok0, pos0, lit0 := s.scanString(); tok == BADSTRING || tok == BADESCAPE {
if tok0, pos0, lit0 := s.scanString(); tok0 == BADSTRING || tok0 == BADESCAPE {
return tok0, pos0, lit0
} else {
_ = buf.WriteByte('"')


@ -60,6 +60,11 @@ func TestScanner_Scan(t *testing.T) {
{s: `foo`, tok: influxql.IDENT, lit: `foo`},
{s: `Zx12_3U_-`, tok: influxql.IDENT, lit: `Zx12_3U_`},
{s: `"foo".bar`, tok: influxql.IDENT, lit: `"foo".bar`},
{s: `"foo\\bar"`, tok: influxql.IDENT, lit: `"foo\bar"`},
{s: `"foo\bar"`, tok: influxql.BADESCAPE, lit: `\b`, pos: influxql.Pos{Line: 0, Char: 5}},
{s: `"foo\"bar\""`, tok: influxql.IDENT, lit: `"foo"bar""`},
{s: `test"`, tok: influxql.BADSTRING, lit: "", pos: influxql.Pos{Line: 0, Char: 3}},
{s: `"test`, tok: influxql.BADSTRING, lit: `test`},
{s: `true`, tok: influxql.TRUE},
{s: `false`, tok: influxql.FALSE},


@ -121,6 +121,8 @@ var tokens = [...]string{
NUMBER: "NUMBER",
DURATION_VAL: "DURATION_VAL",
STRING: "STRING",
BADSTRING: "BADSTRING",
BADESCAPE: "BADESCAPE",
TRUE: "TRUE",
FALSE: "FALSE",


@ -5,6 +5,7 @@ package influxdb
import (
"reflect"
"testing"
"time"
"github.com/influxdb/influxdb/influxql"
)
@ -117,9 +118,12 @@ func TestCreateMeasurementsCommand(t *testing.T) {
// Add Series, same tags again.
c.addSeriesIfNotExists("bar", tags)
n = len(c.Measurements["bar"].Tags)
if n != 2 {
t.Fatalf("measurement has wrong number of tags, expected 2, got %d", n)
for _, m := range c.Measurements {
if m.Name == "bar" {
if len(m.Tags) != 2 {
t.Fatalf("measurement has wrong number of tags, expected 2, got %d", n)
}
}
}
// Add a field.
@ -140,9 +144,12 @@ func TestCreateMeasurementsCommand(t *testing.T) {
t.Fatal("error re-adding field \"value2\"")
}
n = len(c.Measurements["bar"].Fields)
if n != 2 {
t.Fatalf("wrong number of fields, expected 2, got %d", n)
for _, m := range c.Measurements {
if m.Name == "bar" {
if len(m.Fields) != 2 {
t.Fatalf("measurement has wrong number of fields, expected 2, got %d", n)
}
}
}
}
@ -177,6 +184,132 @@ func TestCreateMeasurementsCommand_Errors(t *testing.T) {
}
}
// Test comparing seriesIDs for equality.
func Test_seriesIDs_equals(t *testing.T) {
ids1 := seriesIDs{1, 2, 3}
ids2 := seriesIDs{1, 2, 3}
ids3 := seriesIDs{4, 5, 6}
if !ids1.equals(ids2) {
t.Fatal("expected ids1 == ids2")
} else if ids1.equals(ids3) {
t.Fatal("expected ids1 != ids3")
}
}
// Test intersecting sets of seriesIDs.
func Test_seriesIDs_intersect(t *testing.T) {
// Test swapping l & r, all branches of if-else, and exiting the loop via 'j < len(r)'
ids1 := seriesIDs{1, 3, 4, 5, 6}
ids2 := seriesIDs{1, 2, 3, 7}
exp := seriesIDs{1, 3}
got := ids1.intersect(ids2)
if !exp.equals(got) {
t.Fatalf("exp=%v, got=%v", exp, got)
}
// Test exit for loop when 'i < len(l)'
ids1 = seriesIDs{1}
ids2 = seriesIDs{1, 2}
exp = seriesIDs{1}
got = ids1.intersect(ids2)
if !exp.equals(got) {
t.Fatalf("exp=%v, got=%v", exp, got)
}
}
// Test union sets of seriesIDs.
func Test_seriesIDs_union(t *testing.T) {
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
ids1 := seriesIDs{1, 2, 3, 7}
ids2 := seriesIDs{1, 3, 4, 5, 6}
exp := seriesIDs{1, 2, 3, 4, 5, 6, 7}
got := ids1.union(ids2)
if !exp.equals(got) {
t.Fatalf("exp=%v, got=%v", exp, got)
}
// Test exit because of 'i < len(l)' and append remainder from right.
ids1 = seriesIDs{1}
ids2 = seriesIDs{1, 2}
exp = seriesIDs{1, 2}
got = ids1.union(ids2)
if !exp.equals(got) {
t.Fatalf("exp=%v, got=%v", exp, got)
}
}
// Test removing one set of seriesIDs from another.
func Test_seriesIDs_reject(t *testing.T) {
// Test all branches of if-else, exit loop because of 'j < len(r)', and append remainder from left.
ids1 := seriesIDs{1, 2, 3, 7}
ids2 := seriesIDs{1, 3, 4, 5, 6}
exp := seriesIDs{2, 7}
got := ids1.reject(ids2)
if !exp.equals(got) {
t.Fatalf("exp=%v, got=%v", exp, got)
}
// Test exit because of 'i < len(l)'.
ids1 = seriesIDs{1}
ids2 = seriesIDs{1, 2}
exp = seriesIDs{}
got = ids1.reject(ids2)
if !exp.equals(got) {
t.Fatalf("exp=%v, got=%v", exp, got)
}
}
// Test shard group selection.
func TestShardGroup_Contains(t *testing.T) {
// Make a shard group 1 hour in duration
g := newShardGroup()
g.StartTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
g.EndTime = g.StartTime.Add(time.Hour)
if !g.Contains(g.StartTime.Add(-time.Minute), g.EndTime) {
t.Fatal("shard group not selected when min before start time")
}
if !g.Contains(g.StartTime, g.EndTime.Add(time.Minute)) {
t.Fatal("shard group not selected when max after end time")
}
if !g.Contains(g.StartTime.Add(-time.Minute), g.EndTime.Add(time.Minute)) {
t.Fatal("shard group not selected when min before start time and when max after end time")
}
if !g.Contains(g.StartTime.Add(time.Minute), g.EndTime.Add(-time.Minute)) {
t.Fatal("shard group not selected when min after start time and when max before end time")
}
if !g.Contains(g.StartTime, g.EndTime) {
t.Fatal("shard group not selected when min at start time and when max at end time")
}
if !g.Contains(g.StartTime, g.StartTime) {
t.Fatal("shard group not selected when min and max set to start time")
}
if !g.Contains(g.EndTime, g.EndTime) {
t.Fatal("shard group not selected when min and max set to end time")
}
if g.Contains(g.StartTime.Add(-10*time.Hour), g.EndTime.Add(-9*time.Hour)) {
t.Fatal("shard group selected when both min and max before shard times")
}
if g.Contains(g.StartTime.Add(24*time.Hour), g.EndTime.Add(25*time.Hour)) {
t.Fatal("shard group selected when both min and max after shard times")
}
}
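Taken together these cases pin `Contains` down as an inclusive interval-overlap test, in line with the shard-group selection fix noted in the changelog above. A minimal predicate that satisfies every case in the test (an assumption for illustration; the method's actual body is not part of this diff):

```go
package main

import (
	"fmt"
	"time"
)

// contains reports whether the query range [min, max] overlaps the shard
// group's [start, end] range, inclusive at both ends.
func contains(start, end, min, max time.Time) bool {
	return !start.After(max) && !end.Before(min)
}

func main() {
	start, _ := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
	end := start.Add(time.Hour)
	fmt.Println(contains(start, end, start.Add(-time.Minute), end))            // true
	fmt.Println(contains(start, end, end.Add(time.Hour), end.Add(2*time.Hour))) // false
}
```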
// MustParseExpr parses an expression string and returns its AST representation.
func MustParseExpr(s string) influxql.Expr {
expr, err := influxql.ParseExpr(s)


@ -44,6 +44,8 @@ func TestBroker_Close_ErrClosed(t *testing.T) {
// Ensure the broker can write messages to the appropriate topics.
func TestBroker_Publish(t *testing.T) {
//TODO fix and re-enable test. currently racy
t.Skip()
b := NewBroker(nil)
defer b.Close()


@ -209,7 +209,13 @@ func (tx *metatx) dropDatabase(name string) error {
return tx.Bucket([]byte("Databases")).DeleteBucket([]byte(name))
}
// dropMeasurement removes measurement from the metastore.
func (tx *metatx) dropMeasurement(database, measurement string) error {
return tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).DeleteBucket([]byte(measurement))
}
// saveMeasurement persists a measurement to the metastore.
func (tx *metatx) saveMeasurement(database string, m *Measurement) error {
b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Measurements"))
return b.Put([]byte(m.Name), mustMarshalJSON(m))


@ -49,6 +49,11 @@ MAINTAINER=support@influxdb.com
VENDOR=Influxdb
DESCRIPTION="Distributed time-series database"
BINS=(
influxd
influx
)
###########################################################################
# Helper functions.
@ -134,7 +139,9 @@ do_build() {
cleanup_exit 1
fi
rm $GOPATH/bin/*
for b in ${BINS[*]}; do
rm -f $GOPATH/bin/$b
done
go get -u -f ./...
if [ $? -ne 0 ]; then
echo "WARNING: failed to 'go get' packages."
@ -207,12 +214,14 @@ make_dir_tree $TMP_WORK_DIR $VERSION
###########################################################################
# Copy the assets to the installation directories.
cp $GOPATH/bin/* $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION
if [ $? -ne 0 ]; then
echo "Failed to copy binaries to packaging directory -- aborting."
cleanup_exit 1
fi
echo "Binaries in $GOPATH/bin copied to $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION"
for b in ${BINS[*]}; do
cp $GOPATH/bin/$b $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION
if [ $? -ne 0 ]; then
echo "Failed to copy binaries to packaging directory -- aborting."
cleanup_exit 1
fi
done
echo "${BINS[*]} copied to $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION"
cp $INITD_SCRIPT $TMP_WORK_DIR/$INSTALL_ROOT_DIR/versions/$VERSION/scripts
if [ $? -ne 0 ]; then

server.go

@ -302,7 +302,7 @@ func (s *Server) EnforceRetentionPolicies() {
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
if g.EndTime.Add(rp.Duration).Before(time.Now()) {
if g.EndTime.Add(rp.Duration).Before(time.Now().UTC()) {
log.Printf("shard group %d, retention policy %s, database %s due for deletion",
g.ID, rp.Name, db.name)
if err := s.DeleteShardGroup(db.name, rp.Name, g.ID); err != nil {
@ -1340,7 +1340,7 @@ func (s *Server) applyDropSeries(m *messaging.Message) error {
// Delete series from the database.
if err := database.dropSeries(c.SeriesByMeasurement); err != nil {
return fmt.Errorf("failed to remove series from index")
return fmt.Errorf("failed to remove series from index: %s", err)
}
return nil
})
@ -1606,6 +1606,45 @@ func (s *Server) applyCreateMeasurementsIfNotExists(m *messaging.Message) error
return nil
}
func (s *Server) DropMeasurement(database, name string) error {
c := &dropMeasurementCommand{Database: database, Name: name}
_, err := s.broadcast(dropMeasurementMessageType, c)
return err
}
func (s *Server) applyDropMeasurement(m *messaging.Message) error {
var c dropMeasurementCommand
mustUnmarshalJSON(m.Data, &c)
database := s.databases[c.Database]
if database == nil {
return ErrDatabaseNotFound
}
measurement := database.measurements[c.Name]
if measurement == nil {
return ErrMeasurementNotFound
}
err := s.meta.mustUpdate(m.Index, func(tx *metatx) error {
// Drop metastore data
if err := tx.dropMeasurement(c.Database, c.Name); err != nil {
return err
}
// Drop measurement from the database.
if err := database.dropMeasurement(c.Name); err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}
// createShardGroupsIfNotExist walks the "points" and ensures that all required shards exist on the cluster.
func (s *Server) createShardGroupsIfNotExists(database, retentionPolicy string, points []Point) error {
for _, p := range points {
@ -1736,6 +1775,8 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
res = s.executeDropSeriesStatement(stmt, database, user)
case *influxql.ShowSeriesStatement:
res = s.executeShowSeriesStatement(stmt, database, user)
case *influxql.DropMeasurementStatement:
res = s.executeDropMeasurementStatement(stmt, database, user)
case *influxql.ShowMeasurementsStatement:
res = s.executeShowMeasurementsStatement(stmt, database, user)
case *influxql.ShowTagKeysStatement:
@ -1885,6 +1926,10 @@ func (s *Server) executeDropUserStatement(q *influxql.DropUserStatement, user *U
return &Result{Err: s.DeleteUser(q.Name)}
}
func (s *Server) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string, user *User) *Result {
return &Result{Err: s.DropMeasurement(database, stmt.Name)}
}
func (s *Server) executeDropSeriesStatement(stmt *influxql.DropSeriesStatement, database string, user *User) *Result {
s.mu.RLock()
@ -1955,11 +2000,6 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
return &Result{Err: err}
}
// // If OFFSET is past the end of the array, return empty results.
// if stmt.Offset > len(measurements)-1 {
// return &Result{}
// }
// Create result struct that will be populated and returned.
result := &Result{
Series: make(influxql.Rows, 0, len(measurements)),
@ -1994,7 +2034,8 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
// Loop through series IDs getting matching tag sets.
for _, id := range ids {
if s, ok := m.seriesByID[id]; ok {
values := make([]interface{}, 0, len(r.Columns))
values := make([]interface{}, 0, len(r.Columns)+1)
values = append(values, id)
for _, column := range r.Columns {
values = append(values, s.Tags[column])
}
@ -2003,14 +2044,49 @@ func (s *Server) executeShowSeriesStatement(stmt *influxql.ShowSeriesStatement,
r.Values = append(r.Values, values)
}
}
// make the id the first column
r.Columns = append([]string{"id"}, r.Columns...)
// Append the row to the result.
result.Series = append(result.Series, r)
}
if stmt.Limit > 0 || stmt.Offset > 0 {
result.Series = s.filterShowSeriesResult(stmt.Limit, stmt.Offset, result.Series)
}
return result
}
// filterShowSeriesResult will limit the number of series returned based on the limit and the offset.
// Unlike limit and offset on SELECT statements, the limit and offset don't apply to the number of Rows, but
// to the number of total Values returned, since each Value represents a unique series.
func (s *Server) filterShowSeriesResult(limit, offset int, rows influxql.Rows) influxql.Rows {
var filteredSeries influxql.Rows
seriesCount := 0
for _, r := range rows {
var currentSeries [][]interface{}
// filter the values
for _, v := range r.Values {
if seriesCount >= offset && seriesCount-offset < limit {
currentSeries = append(currentSeries, v)
}
seriesCount++
}
// only add the row back in if there are some values in it
if len(currentSeries) > 0 {
r.Values = currentSeries
filteredSeries = append(filteredSeries, r)
if seriesCount > limit+offset {
return filteredSeries
}
}
}
return filteredSeries
}
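Because the counter walks individual values across rows (each value is one series), a limit can start inside one measurement's row and finish in the next. A simplified, runnable mirror of the semantics, using plain ints in place of influxql rows:

```go
package main

import "fmt"

// filterValues applies offset/limit to the flat sequence of values across
// rows, keeping a row only if any of its values survive.
func filterValues(rows [][]int, limit, offset int) [][]int {
	var out [][]int
	n := 0
	for _, row := range rows {
		var kept []int
		for _, v := range row {
			if n >= offset && n-offset < limit {
				kept = append(kept, v)
			}
			n++
		}
		if len(kept) > 0 {
			out = append(out, kept)
		}
	}
	return out
}

func main() {
	rows := [][]int{{1, 2, 3, 4}, {5, 6}} // e.g. cpu series IDs, gpu series IDs
	fmt.Println(filterValues(rows, 3, 2)) // [[3 4] [5]]: spans both rows
}
```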
func (s *Server) executeShowMeasurementsStatement(stmt *influxql.ShowMeasurementsStatement, database string, user *User) *Result {
s.mu.RLock()
defer s.mu.RUnlock()
@ -2311,8 +2387,12 @@ func measurementsFromSourceOrDB(stmt influxql.Source, db *database) (Measurement
return nil, errors.New("identifiers in FROM clause must be measurement names")
}
} else {
// No measurements specified in FROM clause so get all measurements.
measurements = db.Measurements()
// No measurements specified in FROM clause so get all measurements that have series.
for _, m := range db.Measurements() {
if len(m.seriesIDs) > 0 {
measurements = append(measurements, m)
}
}
}
sort.Sort(measurements)
@ -2349,7 +2429,14 @@ func (s *Server) executeCreateRetentionPolicyStatement(q *influxql.CreateRetenti
func (s *Server) executeAlterRetentionPolicyStatement(stmt *influxql.AlterRetentionPolicyStatement, user *User) *Result {
rpu := &RetentionPolicyUpdate{
Duration: stmt.Duration,
ReplicaN: func() *uint32 { n := uint32(*stmt.Replication); return &n }(),
ReplicaN: func() *uint32 {
if stmt.Replication == nil {
return nil
} else {
n := uint32(*stmt.Replication)
return &n
}
}(),
}
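The closure now returns nil when the statement has no REPLICATION clause; previously `*stmt.Replication` was dereferenced unconditionally, so `ALTER RETENTION POLICY ... DURATION 1h` panicked on a nil pointer. This matches the rc7 changelog entry at the top of this diff about modifying retention policies without specifying a replication factor. A minimal illustration of the nil-guard pattern, assuming the `*int` the closure implies:

```go
package main

import "fmt"

// replicaN converts an optional replication factor, leaving it unset when the
// clause was omitted instead of dereferencing a nil pointer.
func replicaN(replication *int) *uint32 {
	if replication == nil {
		return nil
	}
	n := uint32(*replication)
	return &n
}

func main() {
	fmt.Println(replicaN(nil)) // <nil>, not a panic
	three := 3
	fmt.Println(*replicaN(&three)) // 3
}
```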
// Update the retention policy.
@ -2619,6 +2706,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
err = s.applySetDefaultRetentionPolicy(m)
case createMeasurementsIfNotExistsMessageType:
err = s.applyCreateMeasurementsIfNotExists(m)
case dropMeasurementMessageType:
err = s.applyDropMeasurement(m)
case setPrivilegeMessageType:
err = s.applySetPrivilege(m)
case createContinuousQueryMessageType:


@ -625,6 +625,24 @@ func TestServer_AlterRetentionPolicy(t *testing.T) {
} else if o.ReplicaN != *rp2.ReplicaN {
t.Fatalf("retention policy mismatch:\n\texp ReplicaN = %d\n\tgot ReplicaN = %d\n", rp2.ReplicaN, o.ReplicaN)
}
// Test update duration only.
duration = time.Hour
results := s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1h`), "foo", nil)
if results.Error() != nil {
t.Fatalf("unexpected error: %s", results.Error())
}
// Verify results
if o, err := s.RetentionPolicy("foo", "bar"); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if o == nil {
t.Fatalf("retention policy not found")
} else if o.Duration != duration {
t.Fatalf("retention policy mismatch:\n\texp Duration = %s\n\tgot Duration = %s\n", duration, o.Duration)
} else if o.ReplicaN != *rp2.ReplicaN {
t.Fatalf("retention policy mismatch:\n\texp ReplicaN = %d\n\tgot ReplicaN = %d\n", rp2.ReplicaN, o.ReplicaN)
}
}
// Ensure the server can delete an existing retention policy.
@ -830,6 +848,227 @@ func TestServer_WriteSeries(t *testing.T) {
}
}
// Ensure the server can drop a measurement.
func TestServer_DropMeasurement(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
s.CreateUser("susy", "pass", false)
// Write series with one point to the database.
tags := map[string]string{"host": "serverA", "region": "uswest"}
index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(23.2)}}})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
// Ensure measurement exists
results := s.ExecuteQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
// Ensure series exists
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
// Drop measurement
results = s.ExecuteQuery(MustParseQuery(`DROP MEASUREMENT cpu`), "foo", nil)
if results.Error() != nil {
t.Fatalf("unexpected error: %s", results.Error())
}
results = s.ExecuteQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 0 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{}` {
t.Fatalf("unexpected row(0): %s", s)
}
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 0 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
// Ensure the server handles dropping a measurement that does not exist.
func TestServer_DropMeasurementNoneExists(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
s.CreateUser("susy", "pass", false)
// Drop measurement
results := s.ExecuteQuery(MustParseQuery(`DROP MEASUREMENT bar`), "foo", nil)
if results.Error().Error() != `measurement not found` {
t.Fatalf("unexpected error: %s", results.Error())
}
// Write series with one point to the database.
tags := map[string]string{"host": "serverA", "region": "uswest"}
index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(23.2)}}})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
// Drop measurement after writing data to ensure we still get the same error
results = s.ExecuteQuery(MustParseQuery(`DROP MEASUREMENT bar`), "foo", nil)
if results.Error().Error() != `measurement not found` {
t.Fatalf("unexpected error: %s", results.Error())
}
}
// Ensure Drop measurement can:
// write to measurement cpu with tags region=uswest host=serverA
// write to measurement memory with tags region=uswest host=serverB
// drop one of those measurements
// ensure that the dropped measurement is gone
// ensure that we can still query: show measurements
// ensure that we can still make various queries:
// select * from memory where region=uswest and host=serverB
// select * from memory where host=serverB
// select * from memory where region=uswest
func TestServer_DropMeasurementSeriesTagsPreserved(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
s.CreateUser("susy", "pass", false)
// Write series with one point to the database.
tags := map[string]string{"host": "serverA", "region": "uswest"}
index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(23.2)}}})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
tags = map[string]string{"host": "serverB", "region": "uswest"}
index, err = s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "memory", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:01Z"), Fields: map[string]interface{}{"value": float64(33.2)}}})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
// Ensure measurement exists
results := s.ExecuteQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"],["memory"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]},{"name":"memory","columns":["id","host","region"],"values":[[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
// Ensure we can query for memory with both tags
results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM memory where region='uswest' and host='serverB'`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
// Drop measurement
results = s.ExecuteQuery(MustParseQuery(`DROP MEASUREMENT cpu`), "foo", nil)
if results.Error() != nil {
t.Fatalf("unexpected error: %s", results.Error())
}
// Ensure measurement exists
results = s.ExecuteQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"measurements","columns":["name"],"values":[["memory"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["id","host","region"],"values":[[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu`), "foo", nil)
if res := results.Results[0]; res.Err.Error() != `measurement "foo"."raw"."cpu" does not exist.` {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 0 {
t.Fatalf("unexpected row count: %d", len(res.Series))
}
results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM memory where host='serverB'`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM memory where region='uswest'`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM memory where region='uswest' and host='serverB'`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["time","value"],"values":[["2000-01-01T00:00:01Z",33.2]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
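These assertions all go through a `mustMarshalJSON` helper that this hunk doesn't define. A minimal sketch consistent with how it is called (the panic-on-error behavior and the `encoding/json` import are assumptions):

```
// mustMarshalJSON marshals v to a JSON string, panicking on failure.
// Sketch only; the real helper is defined elsewhere in the test file.
func mustMarshalJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic("marshal json: " + err.Error())
	}
	return string(b)
}
```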
// Ensure the server can drop a series.
func TestServer_DropSeries(t *testing.T) {
c := NewMessagingClient()
@ -849,13 +1088,13 @@ func TestServer_DropSeries(t *testing.T) {
t.Fatalf("sync error: %s", err)
}
// Ensure series exists
results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
@ -868,12 +1107,84 @@ func TestServer_DropSeries(t *testing.T) {
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 0 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
// Ensure the server can drop a series from a measurement when more than one shard exists.
func TestServer_DropSeriesFromMeasurement(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
s.CreateUser("susy", "pass", false)
// Write series with one point to the database.
tags := map[string]string{"host": "serverA", "region": "uswest"}
index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(23.2)}}})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
tags = map[string]string{"host": "serverb", "region": "useast"}
index, err = s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "memory", Tags: tags, Timestamp: mustParseTime("2000-01-02T00:00:00Z"), Fields: map[string]interface{}{"value": float64(23465432423)}}})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
// Drop series
results := s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM memory`), "foo", nil)
if results.Error() != nil {
t.Fatalf("unexpected error: %s", results.Error())
}
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
// Ensure that when many series with unequal numbers of points are merged in a
// group-by-interval query, the results are correct. Each series i writes points at
// seconds 1 through 4+i%3, so the per-second counts should taper off as 10,10,10,10,7,3.
func TestServer_MergeManySeries(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
for i := 1; i < 11; i++ {
for j := 1; j < 5+i%3; j++ {
tags := map[string]string{"host": fmt.Sprintf("server_%d", i)}
if index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: time.Unix(int64(j), int64(0)), Fields: map[string]interface{}{"value": float64(22)}}}); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
}
}
results := s.ExecuteQuery(MustParseQuery(`select count(value) from cpu group by time(1s)`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:01Z",10],["1970-01-01T00:00:02Z",10],["1970-01-01T00:00:03Z",10],["1970-01-01T00:00:04Z",10],["1970-01-01T00:00:05Z",7],["1970-01-01T00:00:06Z",3]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
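The expected counts fall directly out of the write loop above: series i has points at seconds 1 through 4+i%3. A quick standalone sanity check of that arithmetic:

```
package main

import "fmt"

func main() {
	// Mirror the test's write loop: series i gets points at seconds 1..(4+i%3).
	counts := map[int]int{}
	for i := 1; i < 11; i++ {
		for j := 1; j < 5+i%3; j++ {
			counts[j]++
		}
	}
	for sec := 1; sec <= 6; sec++ {
		fmt.Printf("second %d: %d points\n", sec, counts[sec])
	}
	// Prints 10, 10, 10, 10, 7, 3, matching the expected group-by result.
}
```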
// Ensure Drop Series can:
@ -913,7 +1224,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","uswest"],[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
@ -927,7 +1238,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[2,"serverB","uswest"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
@ -1018,11 +1329,74 @@ func TestServer_ExecuteQuery(t *testing.T) {
t.Fatalf("unexpected row(0) during SUM AND: %s", s)
}
// TODO re-enable. The following code is racy
//results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu WHERE region='uk' AND host='serverZ'`), "foo", nil)
//if res := results.Results[0]; res.Err != nil {
//t.Fatalf("unexpected error during SUM: %s", res.Err)
//} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","value","otherVal"],"values":[["2000-01-01T00:00:03Z",30,0],["2000-01-01T00:00:03Z",20,0]]}]}` {
//t.Fatalf("unexpected row(0) during SUM AND: %s", s)
//}
// Select that should return an empty result.
results = s.ExecuteQuery(MustParseQuery(`SELECT value FROM cpu WHERE time >= '3000-01-01 00:00:05'`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error during simple SELECT: %s", res.Err)
} else if s := mustMarshalJSON(res); s != `{}` {
t.Fatalf("unexpected row(0) during simple SELECT: %s", s)
}
}
// Ensure the server respects LIMIT and OFFSET in SHOW SERIES queries
func TestServer_ShowSeriesLimitOffset(t *testing.T) {
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")
// Write series with one point to the database.
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(20)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-east", "host": "serverB"}, Timestamp: mustParseTime("2000-01-01T00:00:10Z"), Fields: map[string]interface{}{"value": float64(30)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: map[string]string{"region": "us-west", "host": "serverC"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "memory", Tags: map[string]string{"region": "us-west", "host": "serverB"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}})
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "memory", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}})
	// SHOW SERIES with a LIMIT/OFFSET window that spans both measurements.
results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 3 OFFSET 1`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[2,"serverB","us-east"],[3,"serverC","us-west"]]},{"name":"memory","columns":["id","host","region"],"values":[[4,"serverB","us-west"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
	// A window that falls entirely within the second measurement.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 4`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"memory","columns":["id","host","region"],"values":[[5,"serverA","us-east"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
	// An OFFSET past the end of the series list should return no rows.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 20`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 0 {
t.Fatalf("unexpected row count: %d", len(res.Series))
}
	// A LIMIT larger than the total series count should return everything.
results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 20`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 2 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["id","host","region"],"values":[[1,"serverA","us-east"],[2,"serverB","us-east"],[3,"serverC","us-west"]]},{"name":"memory","columns":["id","host","region"],"values":[[4,"serverB","us-west"],[5,"serverA","us-east"]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}
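Unlike the earlier tests, which pair `WriteSeries` with an explicit `Sync`, this test uses a `MustWriteSeries` helper that isn't shown here. A plausible sketch, assuming it simply wraps that same pattern and fails hard on error (the exact signature is a guess):

```
// MustWriteSeries writes points and waits for the write to be applied,
// panicking on any error. Sketch only; the signature is an assumption.
func (s *Server) MustWriteSeries(database, retentionPolicy string, points []influxdb.Point) uint64 {
	index, err := s.WriteSeries(database, retentionPolicy, points)
	if err != nil {
		panic(err.Error())
	} else if err = s.Sync(index); err != nil {
		panic("sync error: " + err.Error())
	}
	return index
}
```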

View File

@ -45,6 +45,13 @@ func newShardGroup() *ShardGroup { return &ShardGroup{} }
// Duration returns the duration between the shard group's start and end time.
func (g *ShardGroup) Duration() time.Duration { return g.EndTime.Sub(g.StartTime) }
// Contains returns whether the shard group contains data for the time range between min and max
func (g *ShardGroup) Contains(min, max time.Time) bool {
return timeBetweenInclusive(g.StartTime, min, max) ||
timeBetweenInclusive(g.EndTime, min, max) ||
(g.StartTime.Before(min) && g.EndTime.After(max))
}
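Contains covers the three overlap cases: the group starts inside [min, max], ends inside it, or fully encloses it. It leans on a `timeBetweenInclusive` helper defined elsewhere in the package; presumably something like:

```
// timeBetweenInclusive reports whether t falls within [min, max], inclusive
// of both endpoints. Sketch of the assumed helper.
func timeBetweenInclusive(t, min, max time.Time) bool {
	return (t.Equal(min) || t.After(min)) && (t.Equal(max) || t.Before(max))
}
```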
// dropSeries deletes all data for the given series ID
func (g *ShardGroup) dropSeries(seriesID uint32) error {
for _, s := range g.Shards {
@ -161,7 +168,11 @@ func (s *Shard) dropSeries(seriesID uint32) error {
return nil
}
return s.store.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket(u32tob(seriesID))
if err != bolt.ErrBucketNotFound {
return err
}
return nil
})
}
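Two notes on this hunk. First, tolerating `bolt.ErrBucketNotFound` makes the delete idempotent: dropping a series whose bucket was never created is now a no-op instead of an error. Second, the bucket key comes from a `u32tob` helper outside this hunk; presumably a fixed-width big-endian encoding of the series ID, along the lines of:

```
// u32tob converts a uint32 to a 4-byte big-endian slice, suitable for use
// as a Bolt bucket key. Sketch of the assumed helper; needs encoding/binary.
func u32tob(v uint32) []byte {
	b := make([]byte, 4)
	binary.BigEndian.PutUint32(b, v)
	return b
}
```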

View File

@ -8,6 +8,7 @@ echo "inserting data"
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-26T22:01:11.703Z","fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-27T22:01:11.703Z","fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2015-01-28T22:01:11.703Z","fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
curl -d '{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "mem", "tags": {"host": "server01"},"timestamp": "2015-01-29T22:01:11.703Z","fields": {"value": 100}}]}' -H "Content-Type: application/json" http://localhost:8086/write
echo "querying data"
curl -G http://localhost:8086/query --data-urlencode "db=foo" --data-urlencode "q=SELECT sum(value) FROM \"foo\".\"bar\".cpu GROUP BY time(1h)"

View File

@ -1,4 +1,4 @@
#!/bin/bash -e
# Calculate start time as unix timestamp.
@ -23,8 +23,8 @@ _timestamp_to_time_string () {
fi
}
# Starting the test at "Jan 01 00:00:00 GMT 2000"
TIME=`_date_to_unix_timestamp "Jan 01 00:00:00 GMT 2000"`
# Set defaults.
@ -86,7 +86,7 @@ do
do
# Format the timestamp to ISO 8601.
let CURRTIME=TIME+j
TIMESTAMP=`_timestamp_to_time_string $CURRTIME`
# Add comma separator.
if [ "$j" -ne "0" ]

27
tx.go
View File

@ -2,6 +2,7 @@ package influxdb
import (
"fmt"
"math"
"sort"
"sync"
"time"
@ -100,10 +101,20 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
return nil, ErrRetentionPolicyNotFound
}
// Find measurement.
m, err := tx.server.measurement(database, measurement)
if err != nil {
return nil, err
}
if m == nil {
return nil, ErrMeasurementNotFound
}
tx.measurement = m
// Find shard groups within time range.
var shardGroups []*ShardGroup
for _, group := range rp.shardGroups {
if group.Contains(tmin, tmax) {
shardGroups = append(shardGroups, group)
}
}
@ -117,16 +128,6 @@ func (tx *tx) CreateIterators(stmt *influxql.SelectStatement) ([]influxql.Iterat
return nil, err
}
// Find field.
fieldName := stmt.Fields[0].Expr.(*influxql.VarRef).Val
f := m.FieldByName(fieldName)
@ -309,10 +310,12 @@ func (i *shardIterator) Tags() string { return i.tags }
func (i *shardIterator) Next() (key int64, data []byte, value interface{}) {
min := -1
minKey := int64(math.MaxInt64)
for ind, kv := range i.keyValues {
if kv.key != 0 && kv.key < i.tmax && kv.key < minKey {
min = ind
minKey = kv.key
}
}
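The added `kv.key < minKey` guard turns the loop into a true minimum-selection over the shard cursors; before, `min` ended up pointing at whichever in-range cursor came last, so merged points could come back out of order. The same pattern in isolation, with made-up keys:

```
package main

import (
	"fmt"
	"math"
)

func main() {
	// Current keys from three shard cursors; 0 means the cursor is exhausted.
	keys := []int64{40, 10, 30}
	tmax := int64(100)

	min, minKey := -1, int64(math.MaxInt64)
	for ind, key := range keys {
		// Without the `key < minKey` check, min would finish at the last
		// in-range index (2, key 30) rather than the smallest key (index 1).
		if key != 0 && key < tmax && key < minKey {
			min = ind
			minKey = key
		}
	}
	fmt.Println(min, minKey) // prints: 1 10
}
```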

96
util.go
View File

@ -1,96 +0,0 @@
package influxdb
import (
"fmt"
"code.google.com/p/log4go"
)
// TimePrecision represents a level of time precision.
type TimePrecision int
const (
// MicrosecondPrecision is 1/1,000,000 th of a second.
MicrosecondPrecision TimePrecision = iota
// MillisecondPrecision is 1/1,000 th of a second.
MillisecondPrecision
// SecondPrecision is 1 second precision.
SecondPrecision
)
func parseTimePrecision(s string) (TimePrecision, error) {
switch s {
case "u":
return MicrosecondPrecision, nil
case "m":
log4go.Warn("time_precision=m will be disabled in future release, use time_precision=ms instead")
fallthrough
case "ms":
return MillisecondPrecision, nil
case "s":
return SecondPrecision, nil
case "":
return MillisecondPrecision, nil
}
return 0, fmt.Errorf("Unknown time precision %s", s)
}
func hasDuplicates(ss []string) bool {
m := make(map[string]struct{}, len(ss))
for _, s := range ss {
if _, ok := m[s]; ok {
return true
}
m[s] = struct{}{}
}
return false
}
func removeField(fields []string, name string) []string {
index := -1
for idx, field := range fields {
if field == name {
index = idx
break
}
}
if index == -1 {
return fields
}
return append(fields[:index], fields[index+1:]...)
}
func removeTimestampFieldDefinition(fields []string) []string {
fields = removeField(fields, "time")
return removeField(fields, "sequence_number")
}
func mapKeyList(m interface{}) []string {
switch m.(type) {
case map[string]string:
return mapStrStrKeyList(m.(map[string]string))
case map[string]uint32:
return mapStrUint32KeyList(m.(map[string]uint32))
}
return nil
}
func mapStrStrKeyList(m map[string]string) []string {
l := make([]string, 0, len(m))
for k := range m {
l = append(l, k)
}
return l
}
func mapStrUint32KeyList(m map[string]uint32) []string {
l := make([]string, 0, len(m))
for k := range m {
l = append(l, k)
}
return l
}