implement the write api.
parent
77819383f4
commit
9254aa8b54
109
src/hapi/api.go
109
src/hapi/api.go
|
@ -2,9 +2,11 @@ package hapi
|
|||
|
||||
import (
|
||||
log "code.google.com/p/log4go"
|
||||
"coordinator"
|
||||
"encoding/json"
|
||||
"engine"
|
||||
"github.com/bmizerany/pat"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"protocol"
|
||||
|
@ -12,16 +14,18 @@ import (
|
|||
)
|
||||
|
||||
type HttpServer struct {
|
||||
conn net.Listener
|
||||
config *Configuration
|
||||
engine engine.EngineI
|
||||
shutdown chan bool
|
||||
conn net.Listener
|
||||
config *Configuration
|
||||
engine engine.EngineI
|
||||
coordinator coordinator.Coordinator
|
||||
shutdown chan bool
|
||||
}
|
||||
|
||||
func NewHttpServer(config *Configuration, theEngine engine.EngineI) *HttpServer {
|
||||
func NewHttpServer(config *Configuration, theEngine engine.EngineI, theCoordinator coordinator.Coordinator) *HttpServer {
|
||||
self := &HttpServer{}
|
||||
self.config = config
|
||||
self.engine = theEngine
|
||||
self.coordinator = theCoordinator
|
||||
self.shutdown = make(chan bool)
|
||||
return self
|
||||
}
|
||||
|
@ -121,6 +125,7 @@ func (self *HttpServer) query(w http.ResponseWriter, r *http.Request) {
|
|||
if err != nil {
|
||||
w.Write([]byte(err.Error()))
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
writer.done()
|
||||
|
@ -135,6 +140,100 @@ func (self *HttpServer) query(w http.ResponseWriter, r *http.Request) {
|
|||
// {"name": "seriesname", "columns": ["time", "email"], "points": [[], []]}
|
||||
// ]
|
||||
func (self *HttpServer) writePoints(w http.ResponseWriter, r *http.Request) {
|
||||
db := r.URL.Query().Get(":db")
|
||||
|
||||
series, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
serializedSeries := []*SerializedSeries{}
|
||||
err = json.Unmarshal(series, &serializedSeries)
|
||||
if err != nil {
|
||||
w.Write([]byte(err.Error()))
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
// convert the wire format to the internal representation of the time series
|
||||
for _, s := range serializedSeries {
|
||||
if len(s.Points) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
fields := []*protocol.FieldDefinition{}
|
||||
for idx, column := range s.Columns {
|
||||
var fieldType protocol.FieldDefinition_Type
|
||||
switch s.Points[0][idx].(type) {
|
||||
case int:
|
||||
fieldType = protocol.FieldDefinition_INT64
|
||||
case float64:
|
||||
fieldType = protocol.FieldDefinition_DOUBLE
|
||||
case string:
|
||||
fieldType = protocol.FieldDefinition_STRING
|
||||
case bool:
|
||||
fieldType = protocol.FieldDefinition_BOOL
|
||||
}
|
||||
|
||||
_column := column
|
||||
fields = append(fields, &protocol.FieldDefinition{
|
||||
Name: &_column,
|
||||
Type: &fieldType,
|
||||
})
|
||||
}
|
||||
|
||||
points := []*protocol.Point{}
|
||||
for _, point := range s.Points {
|
||||
values := []*protocol.FieldValue{}
|
||||
var timestamp *int64
|
||||
for idx, field := range fields {
|
||||
if s.Columns[idx] == "time" {
|
||||
_timestamp := point[idx].(int64)
|
||||
timestamp = &_timestamp
|
||||
}
|
||||
|
||||
switch *field.Type {
|
||||
case protocol.FieldDefinition_STRING:
|
||||
if str, ok := point[idx].(string); ok {
|
||||
values = append(values, &protocol.FieldValue{StringValue: &str})
|
||||
continue
|
||||
}
|
||||
case protocol.FieldDefinition_INT64:
|
||||
if integer, ok := point[idx].(int); ok {
|
||||
temp := int64(integer)
|
||||
values = append(values, &protocol.FieldValue{Int64Value: &temp})
|
||||
continue
|
||||
}
|
||||
case protocol.FieldDefinition_DOUBLE:
|
||||
if double, ok := point[idx].(float64); ok {
|
||||
values = append(values, &protocol.FieldValue{DoubleValue: &double})
|
||||
continue
|
||||
}
|
||||
case protocol.FieldDefinition_BOOL:
|
||||
if boolean, ok := point[idx].(bool); ok {
|
||||
values = append(values, &protocol.FieldValue{BoolValue: &boolean})
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// if we reached this line then the dynamic type didn't match
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
points = append(points, &protocol.Point{
|
||||
Values: values,
|
||||
Timestamp: timestamp,
|
||||
})
|
||||
}
|
||||
|
||||
series := &protocol.Series{
|
||||
Name: &s.Name,
|
||||
Fields: fields,
|
||||
Points: points,
|
||||
}
|
||||
|
||||
self.coordinator.WriteSeriesData(db, series)
|
||||
}
|
||||
}
|
||||
|
||||
type Point struct {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package hapi
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"common"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
@ -9,6 +10,7 @@ import (
|
|||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"parser"
|
||||
"protocol"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -20,8 +22,9 @@ func Test(t *testing.T) {
|
|||
}
|
||||
|
||||
type ApiSuite struct {
|
||||
listener net.Listener
|
||||
server *HttpServer
|
||||
listener net.Listener
|
||||
server *HttpServer
|
||||
coordinator *MockCoordinator
|
||||
}
|
||||
|
||||
var _ = Suite(&ApiSuite{})
|
||||
|
@ -82,8 +85,21 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se
|
|||
return yield(series[1])
|
||||
}
|
||||
|
||||
type MockCoordinator struct {
|
||||
series []*protocol.Series
|
||||
}
|
||||
|
||||
func (self *MockCoordinator) DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error {
|
||||
return nil
|
||||
}
|
||||
func (self *MockCoordinator) WriteSeriesData(db string, series *protocol.Series) error {
|
||||
self.series = append(self.series, series)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *ApiSuite) SetUpSuite(c *C) {
|
||||
self.server = NewHttpServer(nil, &MockEngine{})
|
||||
self.coordinator = &MockCoordinator{}
|
||||
self.server = NewHttpServer(nil, &MockEngine{}, self.coordinator)
|
||||
var err error
|
||||
self.listener, err = net.Listen("tcp4", ":")
|
||||
c.Assert(err, IsNil)
|
||||
|
@ -141,3 +157,49 @@ func (self *ApiSuite) TestChunkedQuery(c *C) {
|
|||
c.Assert(series.Points, HasLen, 2)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestWriteData(c *C) {
|
||||
data := `
|
||||
[
|
||||
{
|
||||
"points": [
|
||||
["1", 1, 1.0, true],
|
||||
["2", 2, 2.0, false],
|
||||
["3", 3, 3.0, true]
|
||||
],
|
||||
"name": "foo",
|
||||
"columns": ["column_one", "column_two", "column_three", "column_four"]
|
||||
}
|
||||
]
|
||||
`
|
||||
|
||||
port := self.listener.Addr().(*net.TCPAddr).Port
|
||||
addr := fmt.Sprintf("http://localhost:%d/api/db/foo/series", port)
|
||||
resp, err := http.Post(addr, "application/json", bytes.NewBufferString(data))
|
||||
c.Assert(err, IsNil)
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
fmt.Printf("body: %s\n", string(body))
|
||||
c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
c.Assert(self.coordinator.series, HasLen, 1)
|
||||
series := self.coordinator.series[0]
|
||||
c.Assert(series.Fields, HasLen, 4)
|
||||
|
||||
// check the types
|
||||
c.Assert(*series.Fields[0].Name, Equals, "column_one")
|
||||
c.Assert(*series.Fields[0].Type, Equals, protocol.FieldDefinition_STRING)
|
||||
// TODO: cannot get an int64 from a json object
|
||||
c.Assert(*series.Fields[1].Name, Equals, "column_two")
|
||||
c.Assert(*series.Fields[1].Type, Equals, protocol.FieldDefinition_DOUBLE)
|
||||
c.Assert(*series.Fields[2].Name, Equals, "column_three")
|
||||
c.Assert(*series.Fields[2].Type, Equals, protocol.FieldDefinition_DOUBLE)
|
||||
c.Assert(*series.Fields[3].Name, Equals, "column_four")
|
||||
c.Assert(*series.Fields[3].Type, Equals, protocol.FieldDefinition_BOOL)
|
||||
|
||||
// check the values
|
||||
c.Assert(series.Points, HasLen, 3)
|
||||
c.Assert(*series.Points[0].Values[0].StringValue, Equals, "1")
|
||||
c.Assert(*series.Points[0].Values[1].DoubleValue, Equals, 1.0)
|
||||
c.Assert(*series.Points[0].Values[2].DoubleValue, Equals, 1.0)
|
||||
c.Assert(*series.Points[0].Values[3].BoolValue, Equals, true)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue