From 72fb1486603ba9e71202414a8bdb041167432963 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Mon, 2 Feb 2015 17:20:34 -0700 Subject: [PATCH] single server integration test is working --- client/influxdb.go | 68 +++++++++++----- cmd/influxd/server_single_integration_test.go | 80 +++++++++---------- 2 files changed, 89 insertions(+), 59 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index dc0c76f8e9..0ce20e5956 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -62,7 +62,9 @@ func (c *Client) Query(q Query) (*Results, error) { defer resp.Body.Close() var results Results - err = json.NewDecoder(resp.Body).Decode(&results) + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + err = dec.Decode(&results) if err != nil { return nil, err } @@ -92,7 +94,10 @@ func (c *Client) Write(writes ...Write) (*Results, error) { defer resp.Body.Close() var results Results - err = json.NewDecoder(resp.Body).Decode(&results) + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + err = dec.Decode(&results) + if err != nil { return nil, err } @@ -115,7 +120,7 @@ func (c *Client) Ping() (time.Duration, string, error) { // Result represents a resultset returned from a single statement. type Result struct { - Rows []*influxql.Row + Rows []influxql.Row Err error } @@ -123,8 +128,8 @@ type Result struct { func (r *Result) MarshalJSON() ([]byte, error) { // Define a struct that outputs "error" as a string. var o struct { - Rows []*influxql.Row `json:"rows,omitempty"` - Err string `json:"error,omitempty"` + Rows []influxql.Row `json:"rows,omitempty"` + Err string `json:"error,omitempty"` } // Copy fields to output struct. @@ -139,11 +144,13 @@ func (r *Result) MarshalJSON() ([]byte, error) { // UnmarshalJSON decodes the data into the Result struct func (r *Result) UnmarshalJSON(b []byte) error { var o struct { - Rows []*influxql.Row `json:"rows,omitempty"` - Err string `json:"error,omitempty"` + Rows []influxql.Row `json:"rows,omitempty"` + Err string `json:"error,omitempty"` } - err := json.Unmarshal(b, &o) + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + err := dec.Decode(&o) if err != nil { return err } @@ -156,15 +163,15 @@ func (r *Result) UnmarshalJSON(b []byte) error { // Results represents a list of statement results. type Results struct { - Results []*Result + Results []Result Err error } func (r Results) MarshalJSON() ([]byte, error) { // Define a struct that outputs "error" as a string. var o struct { - Results []*Result `json:"results,omitempty"` - Err string `json:"error,omitempty"` + Results []Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` } // Copy fields to output struct. @@ -179,11 +186,13 @@ func (r Results) MarshalJSON() ([]byte, error) { // UnmarshalJSON decodes the data into the Results struct func (r *Results) UnmarshalJSON(b []byte) error { var o struct { - Results []*Result `json:"results,omitempty"` - Err string `json:"error,omitempty"` + Results []Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` } - err := json.Unmarshal(b, &o) + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + err := dec.Decode(&o) if err != nil { return err } @@ -248,7 +257,9 @@ func (p *Point) UnmarshalJSON(b []byte) error { if err := func() error { var err error - if err = json.Unmarshal(b, &epoch); err != nil { + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + if err = dec.Decode(&epoch); err != nil { return err } // Convert from epoch to time.Time, but only if Timestamp @@ -264,13 +275,15 @@ func (p *Point) UnmarshalJSON(b []byte) error { p.Tags = epoch.Tags p.Timestamp = Timestamp(ts) p.Precision = epoch.Precision - p.Values = epoch.Values + p.Values = normalizeValues(epoch.Values) return nil }(); err == nil { return nil } - if err := json.Unmarshal(b, &normal); err != nil { + dec := json.NewDecoder(bytes.NewBuffer(b)) + dec.UseNumber() + if err := dec.Decode(&normal); err != nil { return err } normal.Timestamp = SetPrecision(normal.Timestamp, normal.Precision) @@ -278,11 +291,30 @@ func (p *Point) UnmarshalJSON(b []byte) error { p.Tags = normal.Tags p.Timestamp = Timestamp(normal.Timestamp) p.Precision = normal.Precision - p.Values = normal.Values + p.Values = normalizeValues(normal.Values) return nil } +// Remove any notion of json.Number +func normalizeValues(values map[string]interface{}) map[string]interface{} { + newValues := map[string]interface{}{} + + for k, v := range values { + switch v := v.(type) { + case json.Number: + jv, e := v.Float64() + if e != nil { + panic(fmt.Sprintf("unable to convert json.Number to float64: %s", e)) + } + newValues[k] = jv + default: + newValues[k] = v + } + } + return newValues +} + // utility functions func (c *Client) Addr() string { diff --git a/cmd/influxd/server_single_integration_test.go b/cmd/influxd/server_single_integration_test.go index 3dd8e8ad16..40c2299223 100644 --- a/cmd/influxd/server_single_integration_test.go +++ b/cmd/influxd/server_single_integration_test.go @@ -10,19 +10,17 @@ import ( "os" "path/filepath" "reflect" + "strconv" "testing" "time" - "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" main "github.com/influxdb/influxdb/cmd/influxd" ) func TestNewServer(t *testing.T) { - // Uncomment this to see the test fail when running for a second time in a row - //t.Skip() - var ( join = "" version = "x.x" @@ -93,15 +91,15 @@ func TestNewServer(t *testing.T) { t.Log("Creating database") u := urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"CREATE DATABASE foo"}}) - client := http.Client{Timeout: 100 * time.Millisecond} + httpClient := http.Client{Timeout: 100 * time.Millisecond} - resp, err := client.Get(u.String()) + resp, err := httpClient.Get(u.String()) if err != nil { t.Fatalf("Couldn't create database: %s", err) } defer resp.Body.Close() - var results influxdb.Results + var results client.Results err = json.NewDecoder(resp.Body).Decode(&results) if err != nil { t.Fatalf("Couldn't decode results: %v", err) @@ -122,7 +120,7 @@ func TestNewServer(t *testing.T) { // Query the database exists u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"SHOW DATABASES"}}) - resp, err = client.Get(u.String()) + resp, err = httpClient.Get(u.String()) if err != nil { t.Fatalf("Couldn't query databases: %s", err) } @@ -141,21 +139,18 @@ func TestNewServer(t *testing.T) { t.Fatalf("show databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) } - if len(results.Results) != 1 { - t.Fatalf("show databases failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results)) + expectedResults := client.Results{ + Results: []client.Result{ + {Rows: []influxql.Row{ + influxql.Row{ + Columns: []string{"name"}, + Values: [][]interface{}{{"foo"}}, + }, + }}, + }, } - - rows := results.Results[0].Rows - if len(rows) != 1 { - t.Fatalf("show databases failed. Unexpected rows length. expected: %d, actual %d", 1, len(rows)) - } - row := rows[0] - expectedRow := &influxql.Row{ - Columns: []string{"name"}, - Values: [][]interface{}{{"foo"}}, - } - if !reflect.DeepEqual(row, expectedRow) { - t.Fatalf("show databases failed. Unexpected row. expected: %+v, actual %+v", expectedRow, row) + if !reflect.DeepEqual(results, expectedResults) { + t.Fatalf("show databases failed. Unexpected results. expected: %+v, actual %+v", expectedResults, results) } // Create a retention policy @@ -163,7 +158,7 @@ func TestNewServer(t *testing.T) { u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{"CREATE RETENTION POLICY bar ON foo DURATION 1h REPLICATION 1 DEFAULT"}}) - resp, err = client.Get(u.String()) + resp, err = httpClient.Get(u.String()) if err != nil { t.Fatalf("Couldn't create retention policy: %s", err) } @@ -196,7 +191,7 @@ func TestNewServer(t *testing.T) { buf := []byte(fmt.Sprintf(`{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": %d, "precision":"n","values": {"value": 100}}]}`, now.UnixNano())) t.Logf("Writing raw data: %s", string(buf)) - resp, err = client.Post(u.String(), "application/json", bytes.NewReader(buf)) + resp, err = httpClient.Post(u.String(), "application/json", bytes.NewReader(buf)) if err != nil { t.Fatalf("Couldn't write data: %s", err) } @@ -205,13 +200,14 @@ func TestNewServer(t *testing.T) { } // Need some time for server to get consensus and write data + // TODO corylanou query the status endpoint for the server and wait for the index to update to know the write was applied time.Sleep(100 * time.Millisecond) // Query the data exists t.Log("Query data") u = urlFor(c.BrokerURL(), "query", url.Values{"q": []string{`select value from "foo"."bar".cpu`}, "db": []string{"foo"}}) - resp, err = client.Get(u.String()) + resp, err = httpClient.Get(u.String()) if err != nil { t.Fatalf("Couldn't query databases: %s", err) } @@ -238,24 +234,26 @@ func TestNewServer(t *testing.T) { t.Fatalf("query databases failed. Unexpected status code. expected: %d, actual %d", http.StatusOK, resp.StatusCode) } - if len(results.Results) != 1 { - t.Fatalf("query databases failed. Unexpected results length. expected: %d, actual %d", 1, len(results.Results)) - } - - rows = results.Results[0].Rows - if len(rows) != 1 { - t.Fatalf("query databases failed. Unexpected rows length. expected: %d, actual %d", 1, len(rows)) - } - row = rows[0] - expectedRow = &influxql.Row{ - Name: "cpu", - Columns: []string{"time", "value"}, - Values: [][]interface{}{ - []interface{}{now.UnixNano(), 100}, + strNow := strconv.FormatInt(now.UnixNano(), 10) + expectedResults = client.Results{ + Results: []client.Result{ + {Rows: []influxql.Row{ + { + Name: "cpu", + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + []interface{}{json.Number(strNow), json.Number("100")}, + }, + }}}, }, } - if !reflect.DeepEqual(row, expectedRow) { - t.Fatalf("query databases failed. Unexpected row. expected: %+v, actual %+v", expectedRow, row) + + if !reflect.DeepEqual(results, expectedResults) { + t.Logf("Expected:\n") + t.Logf("%#v\n", expectedResults) + t.Logf("Actual:\n") + t.Logf("%#v\n", results) + t.Fatalf("query databases failed. Unexpected results.") } }