single server integration test is working

pull/1490/head
Cory LaNou 2015-02-02 17:20:34 -07:00
parent 0e818f46d4
commit 72fb148660
2 changed files with 89 additions and 59 deletions

View File

@ -62,7 +62,9 @@ func (c *Client) Query(q Query) (*Results, error) {
defer resp.Body.Close()
var results Results
err = json.NewDecoder(resp.Body).Decode(&results)
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&results)
if err != nil {
return nil, err
}
@ -92,7 +94,10 @@ func (c *Client) Write(writes ...Write) (*Results, error) {
defer resp.Body.Close()
var results Results
err = json.NewDecoder(resp.Body).Decode(&results)
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
err = dec.Decode(&results)
if err != nil {
return nil, err
}
@ -115,7 +120,7 @@ func (c *Client) Ping() (time.Duration, string, error) {
// Result represents a resultset returned from a single statement.
type Result struct {
Rows []*influxql.Row
Rows []influxql.Row
Err error
}
@ -123,8 +128,8 @@ type Result struct {
func (r *Result) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Rows []*influxql.Row `json:"rows,omitempty"`
Err string `json:"error,omitempty"`
Rows []influxql.Row `json:"rows,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
@ -139,11 +144,13 @@ func (r *Result) MarshalJSON() ([]byte, error) {
// UnmarshalJSON decodes the data into the Result struct
func (r *Result) UnmarshalJSON(b []byte) error {
var o struct {
Rows []*influxql.Row `json:"rows,omitempty"`
Err string `json:"error,omitempty"`
Rows []influxql.Row `json:"rows,omitempty"`
Err string `json:"error,omitempty"`
}
err := json.Unmarshal(b, &o)
dec := json.NewDecoder(bytes.NewBuffer(b))
dec.UseNumber()
err := dec.Decode(&o)
if err != nil {
return err
}
@ -156,15 +163,15 @@ func (r *Result) UnmarshalJSON(b []byte) error {
// Results represents a list of statement results.
type Results struct {
Results []*Result
Results []Result
Err error
}
func (r Results) MarshalJSON() ([]byte, error) {
// Define a struct that outputs "error" as a string.
var o struct {
Results []*Result `json:"results,omitempty"`
Err string `json:"error,omitempty"`
Results []Result `json:"results,omitempty"`
Err string `json:"error,omitempty"`
}
// Copy fields to output struct.
@ -179,11 +186,13 @@ func (r Results) MarshalJSON() ([]byte, error) {
// UnmarshalJSON decodes the data into the Results struct
func (r *Results) UnmarshalJSON(b []byte) error {
var o struct {
Results []*Result `json:"results,omitempty"`
Err string `json:"error,omitempty"`
Results []Result `json:"results,omitempty"`
Err string `json:"error,omitempty"`
}
err := json.Unmarshal(b, &o)
dec := json.NewDecoder(bytes.NewBuffer(b))
dec.UseNumber()
err := dec.Decode(&o)
if err != nil {
return err
}
@ -248,7 +257,9 @@ func (p *Point) UnmarshalJSON(b []byte) error {
if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
dec := json.NewDecoder(bytes.NewBuffer(b))
dec.UseNumber()
if err = dec.Decode(&epoch); err != nil {
return err
}
// Convert from epoch to time.Time, but only if Timestamp
@ -264,13 +275,15 @@ func (p *Point) UnmarshalJSON(b []byte) error {
p.Tags = epoch.Tags
p.Timestamp = Timestamp(ts)
p.Precision = epoch.Precision
p.Values = epoch.Values
p.Values = normalizeValues(epoch.Values)
return nil
}(); err == nil {
return nil
}
if err := json.Unmarshal(b, &normal); err != nil {
dec := json.NewDecoder(bytes.NewBuffer(b))
dec.UseNumber()
if err := dec.Decode(&normal); err != nil {
return err
}
normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision)
@ -278,11 +291,30 @@ func (p *Point) UnmarshalJSON(b []byte) error {
p.Tags = normal.Tags
p.Timestamp = Timestamp(normal.Timestamp)
p.Precision = normal.Precision
p.Values = normal.Values
p.Values = normalizeValues(normal.Values)
return nil
}
// Remove any notion of json.Number
func normalizeValues(values map[string]interface{}) map[string]interface{} {
newValues := map[string]interface{}{}
for k, v := range values {
switch v := v.(type) {
case json.Number:
jv, e := v.Float64()
if e != nil {
panic(fmt.Sprintf("unable to convert json.Number to float64: %s", e))
}
newValues[k] = jv
default:
newValues[k] = v
}
}
return newValues
}
// utility functions
func (c *Client) Addr() string {

View File

@ -10,19 +10,17 @@ import (
"os"
"path/filepath"
"reflect"
"strconv"
"testing"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql"
main "github.com/influxdb/influxdb/cmd/influxd"
)
func TestNewServer(t *testing.T) {
// Uncomment this to see the test fail when running for a second time in a row
//t.Skip()
var (
join = ""
version = "x.x"
@ -93,15 +91,15 @@ func TestNewServer(t *testing.T) {
t.Log("Creating database")
u := urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"CREATE DATABASE foo"}})
client := http.Client{Timeout: 100 * time.Millisecond}
httpClient := http.Client{Timeout: 100 * time.Millisecond}
resp, err := client.Get(u.String())
resp, err := httpClient.Get(u.String())
if err != nil {
t.Fatalf("Couldn't create database: %s", err)
}
defer resp.Body.Close()
var results influxdb.Results
var results client.Results
err = json.NewDecoder(resp.Body).Decode(&results)
if err != nil {
t.Fatalf("Couldn't decode results: %v", err)
@ -122,7 +120,7 @@ func TestNewServer(t *testing.T) {
// Query the database exists
u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"SHOW DATABASES"}})
resp, err = client.Get(u.String())
resp, err = httpClient.Get(u.String())
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
}
@ -141,21 +139,18 @@ func TestNewServer(t *testing.T) {
t.Fatalf("show databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
if len(results.Results) != 1 {
t.Fatalf("show databases failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
expectedResults := client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
influxql.Row{
Columns: []string{"name"},
Values: [][]interface{}{{"foo"}},
},
}},
},
}
rows := results.Results[0].Rows
if len(rows) != 1 {
t.Fatalf("show databases failed. Unexpected rows length. expected: %d, actual %d", 1, len(rows))
}
row := rows[0]
expectedRow := &influxql.Row{
Columns: []string{"name"},
Values: [][]interface{}{{"foo"}},
}
if !reflect.DeepEqual(row, expectedRow) {
t.Fatalf("show databases failed. Unexpected row. expected: %+v, actual %+v", expectedRow, row)
if !reflect.DeepEqual(results, expectedResults) {
t.Fatalf("show databases failed. Unexpected results. expected: %+v, actual %+v", expectedResults, results)
}
// Create a retention policy
@ -163,7 +158,7 @@ func TestNewServer(t *testing.T) {
u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 1 DEFAULT"}})
resp, err = client.Get(u.String())
resp, err = httpClient.Get(u.String())
if err != nil {
t.Fatalf("Couldn't create retention policy: %s", err)
}
@ -196,7 +191,7 @@ func TestNewServer(t *testing.T) {
buf := []byte(fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano()))
t.Logf("Writing raw data: %s", string(buf))
resp, err = client.Post(u.String(), "application/json", bytes.NewReader(buf))
resp, err = httpClient.Post(u.String(), "application/json", bytes.NewReader(buf))
if err != nil {
t.Fatalf("Couldn't write data: %s", err)
}
@ -205,13 +200,14 @@ func TestNewServer(t *testing.T) {
}
// Need some time for server to get consensus and write data
// TODO corylanou query the status endpoint for the server and wait for the index to update to know the write was applied
time.Sleep(100 * time.Millisecond)
// Query the data exists
t.Log("Query data")
u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}})
resp, err = client.Get(u.String())
resp, err = httpClient.Get(u.String())
if err != nil {
t.Fatalf("Couldn't query databases: %s", err)
}
@ -238,24 +234,26 @@ func TestNewServer(t *testing.T) {
t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode)
}
if len(results.Results) != 1 {
t.Fatalf("query databases failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results))
}
rows = results.Results[0].Rows
if len(rows) != 1 {
t.Fatalf("query databases failed. Unexpected rows length. expected: %d, actual %d", 1, len(rows))
}
row = rows[0]
expectedRow = &influxql.Row{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
[]interface{}{now.UnixNano(), 100},
strNow := strconv.FormatInt(now.UnixNano(), 10)
expectedResults = client.Results{
Results: []client.Result{
{Rows: []influxql.Row{
{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
[]interface{}{json.Number(strNow), json.Number("100")},
},
}}},
},
}
if !reflect.DeepEqual(row, expectedRow) {
t.Fatalf("query databases failed. Unexpected row. expected: %+v, actual %+v", expectedRow, row)
if !reflect.DeepEqual(results, expectedResults) {
t.Logf("Expected:\n")
t.Logf("%#v\n", expectedResults)
t.Logf("Actual:\n")
t.Logf("%#v\n", results)
t.Fatalf("query databases failed. Unexpected results.")
}
}