address PR feedback

pull/5994/head
Cory LaNou 2016-03-14 10:51:12 -05:00 committed by Edd Robinson
parent ea37ed98e5
commit 1d2c1faa94
5 changed files with 163 additions and 43 deletions

View File

@ -2,14 +2,12 @@ package run
import (
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"strings"
"time"
"github.com/influxdata/influxdb"
@ -118,21 +116,11 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
}
}
node, err := influxdb.LoadNode(c.Meta.Dir)
_, err := influxdb.LoadNode(c.Meta.Dir)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
node = influxdb.NewNode(c.Meta.Dir)
}
// In 0.11 we removed MetaServers from node.json. To avoid confusion for
// existing users, force a re-save of the node.json file to remove that property
// if it happens to exist.
nodeContents, err := ioutil.ReadFile(filepath.Join(c.Meta.Dir, "node.json"))
if err == nil && strings.Contains(string(nodeContents), "MetaServers") {
node.Save()
}
// In 0.10.0 bind-address got moved to the top level. Check

View File

@ -495,6 +495,39 @@ func TestServer_Write_LineProtocol_Integer(t *testing.T) {
}
}
// Ensure the server returns a partial write response when some points fail to parse. Also validate that
// the successfully parsed points can be queried.
func TestServer_Write_LineProtocol_Partial(t *testing.T) {
t.Parallel()
s := OpenServer(NewConfig())
defer s.Close()
if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 1*time.Hour)); err != nil {
t.Fatal(err)
}
now := now()
points := []string{
"cpu,host=server01 value=100 " + strconv.FormatInt(now.UnixNano(), 10),
"cpu,host=server01 value=NaN " + strconv.FormatInt(now.UnixNano(), 20),
"cpu,host=server01 value=NaN " + strconv.FormatInt(now.UnixNano(), 30),
}
if res, err := s.Write("db0", "rp0", strings.Join(points, "\n"), nil); err == nil {
t.Fatal("expected error. got nil", err)
} else if exp := ``; exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
} else if exp := "partial write"; !strings.Contains(err.Error(), exp) {
t.Fatalf("unexpected error: exp\nexp: %v\ngot: %v", exp, err)
}
// Verify the data was written.
if res, err := s.Query(`SELECT * FROM db0.rp0.cpu GROUP BY *`); err != nil {
t.Fatal(err)
} else if exp := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","tags":{"host":"server01"},"columns":["time","value"],"values":[["%s",100]]}]}]}`, now.Format(time.RFC3339Nano)); exp != res {
t.Fatalf("unexpected results\nexp: %s\ngot: %s\n", exp, res)
}
}
// Ensure the server can query with default databases (via param) and default retention policy
func TestServer_Query_DefaultDBAndRP(t *testing.T) {
t.Parallel()

View File

@ -118,6 +118,14 @@ func NewHandler(requireAuthentication, loggingEnabled, writeTrace, JSONWriteEnab
"ping-head",
"HEAD", "/ping", true, true, h.servePing,
},
route{ // Ping w/ status
"status",
"GET", "/status", true, true, h.serveStatus,
},
route{ // Ping w/ status
"status-head",
"HEAD", "/status", true, true, h.serveStatus,
},
route{ // Tell data node to run CQs that should be run
"process_continuous_queries",
"POST", "/data/process_continuous_queries", false, false, h.serveProcessContinuousQueries,
@ -561,6 +569,13 @@ func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// serveStatus has been depricated
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
h.Logger.Printf("WARNING: /status has been depricated. Use /ping instead.")
h.statMap.Add(statStatusRequest, 1)
w.WriteHeader(http.StatusNoContent)
}
// convertToEpoch converts result timestamps from time.Time to the specified epoch.
func convertToEpoch(r *influxql.Result, epoch string) {
divisor := int64(1)

View File

@ -340,6 +340,20 @@ func TestHandler_Version(t *testing.T) {
}
}
// Ensure the handler handles status requests correctly.
func TestHandler_Status(t *testing.T) {
h := NewHandler(false)
w := httptest.NewRecorder()
h.ServeHTTP(w, MustNewRequest("GET", "/status", nil))
if w.Code != http.StatusNoContent {
t.Fatalf("unexpected status: %d", w.Code)
}
h.ServeHTTP(w, MustNewRequest("HEAD", "/status", nil))
if w.Code != http.StatusNoContent {
t.Fatalf("unexpected status: %d", w.Code)
}
}
// Ensure write endpoint can handle bad requests
func TestHandler_HandleBadRequestBody(t *testing.T) {
b := bytes.NewReader(make([]byte, 10))

View File

@ -97,7 +97,7 @@ func (c *Client) Open() error {
// If this is a brand new instance, persist to disk immediatly.
if c.cacheData.Index == 1 {
if err := c.Snapshot(); err != nil {
if err := snapshot(c.path, c.cacheData); err != nil {
return err
}
}
@ -207,7 +207,10 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
db := data.Database(name)
c.commit(data)
if err := c.commit(data); err != nil {
return nil, err
}
return db, nil
}
@ -247,7 +250,10 @@ func (c *Client) CreateDatabaseWithRetentionPolicy(name string, rpi *RetentionPo
db := data.Database(name)
c.commit(data)
if err := c.commit(data); err != nil {
return nil, err
}
return db, nil
}
@ -262,7 +268,10 @@ func (c *Client) DropDatabase(name string) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -290,7 +299,10 @@ func (c *Client) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo
return nil, err
}
c.commit(data)
if err := c.commit(data); err != nil {
return nil, err
}
return rp, nil
}
@ -319,7 +331,10 @@ func (c *Client) DropRetentionPolicy(database, name string) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -334,7 +349,10 @@ func (c *Client) SetDefaultRetentionPolicy(database, name string) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -349,7 +367,10 @@ func (c *Client) UpdateRetentionPolicy(database, name string, rpu *RetentionPoli
return err
}
defer c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -420,7 +441,10 @@ func (c *Client) CreateUser(name, password string, admin bool) (*UserInfo, error
u := data.User(name)
c.commit(data)
if err := c.commit(data); err != nil {
return nil, err
}
return u, nil
}
@ -442,7 +466,10 @@ func (c *Client) UpdateUser(name, password string) error {
delete(c.authCache, name)
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -456,7 +483,10 @@ func (c *Client) DropUser(name string) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -470,7 +500,10 @@ func (c *Client) SetPrivilege(username, database string, p influxql.Privilege) e
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -484,7 +517,10 @@ func (c *Client) SetAdminPrivilege(username string, admin bool) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -651,7 +687,10 @@ func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time)
return nil, err
}
c.commit(data)
if err := c.commit(data); err != nil {
return nil, err
}
return sgi, nil
}
@ -686,7 +725,10 @@ func (c *Client) DeleteShardGroup(database, policy string, id uint64) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -722,7 +764,10 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error {
}
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -763,7 +808,10 @@ func (c *Client) CreateContinuousQuery(database, name, query string) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -777,7 +825,10 @@ func (c *Client) DropContinuousQuery(database, name string) error {
return nil
}
defer c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -791,7 +842,10 @@ func (c *Client) CreateSubscription(database, rp, name, mode string, destination
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -805,7 +859,10 @@ func (c *Client) DropSubscription(database, rp, name string) error {
return err
}
c.commit(data)
if err := c.commit(data); err != nil {
return err
}
return nil
}
@ -818,7 +875,10 @@ func (c *Client) SetData(data *Data) error {
// increment the index to force the changed channel to fire
d := data.Clone()
d.Index++
c.commit(d)
if err := c.commit(d); err != nil {
return err
}
c.mu.Unlock()
@ -834,12 +894,22 @@ func (c *Client) WaitForDataChanged() chan struct{} {
}
// commit assumes it is under a full lock
func (c *Client) commit(data *Data) {
func (c *Client) commit(data *Data) error {
data.Index++
// try to write to disk before updating in memory
if err := snapshot(c.path, data); err != nil {
return err
}
// update in memory
c.cacheData = data
c.Snapshot()
// close channels to signal changes
close(c.changed)
c.changed = make(chan struct{})
return nil
}
func (c *Client) MarshalBinary() ([]byte, error) {
@ -869,9 +939,9 @@ func (c *Client) updateAuthCache() {
c.authCache = newCache
}
// Snapshot will save the current meta data to disk
func (c *Client) Snapshot() error {
file := filepath.Join(c.path, metaFile)
// snapshot will save the current meta data to disk
func snapshot(path string, data *Data) error {
file := filepath.Join(path, metaFile)
tmpFile := file + "tmp"
f, err := os.Create(tmpFile)
@ -880,14 +950,14 @@ func (c *Client) Snapshot() error {
}
defer f.Close()
var data []byte
if b, err := c.cacheData.MarshalBinary(); err != nil {
var d []byte
if b, err := data.MarshalBinary(); err != nil {
return err
} else {
data = b
d = b
}
if _, err := f.Write(data); err != nil {
if _, err := f.Write(d); err != nil {
return err
}