draft implementation of the http query api.

pull/17/head
John Shahid 2013-10-17 13:21:50 -04:00
parent ab00b6155e
commit ea41a98727
5 changed files with 364 additions and 0 deletions

161
src/hapi/api.go Normal file
View File

@ -0,0 +1,161 @@
package hapi
import (
log "code.google.com/p/log4go"
"encoding/json"
"engine"
"github.com/bmizerany/pat"
"github.com/fitstar/falcore"
"github.com/fitstar/falcore/filter"
"net"
"net/http"
"protocol"
"strings"
)
type HttpServer struct {
conn net.Listener
Server *falcore.Server
config *Configuration
engine engine.EngineI
shutdown chan bool
}
func NewHttpServer(config *Configuration, theEngine engine.EngineI) *HttpServer {
self := &HttpServer{}
self.config = config
self.engine = theEngine
self.shutdown = make(chan bool)
return self
}
func (self *HttpServer) ListenAndServe() {
conn, err := net.Listen("tcp", self.config.HttpAddr)
if err != nil {
log.Error("Listen: ", err)
}
self.Serve(conn)
}
func (self *HttpServer) Serve(listener net.Listener) {
self.conn = listener
p := pat.New()
// Run the given query and return an array of series or a chunked response
// with each batch of points we get back
p.Get("/api/db/:db/series", CorsAndCompressionHeaderHandler(self.query))
// Write points to the given database
p.Post("/api/db/:db/series", CorsHeaderHandler(self.writePoints))
pipeline := falcore.NewPipeline()
pipeline.Upstream.PushBack(filter.NewHandlerFilter(p))
self.Server = falcore.NewServer(-1, pipeline)
file, err := listener.(*net.TCPListener).File()
if err != nil {
panic(err)
}
if err := self.Server.FdListen(int(file.Fd())); err != nil {
panic(err)
}
if err := self.Server.ListenAndServe(); err != nil && !strings.Contains(err.Error(), "closed network") {
panic(err)
}
self.shutdown <- true
}
func (self *HttpServer) Close() {
log.Info("Closing http server")
self.Server.StopAccepting()
log.Info("Waiting for all requests to finish before killing the process")
<-self.shutdown
}
func allPointsYield(w http.ResponseWriter) (map[string]*protocol.Series, func(*protocol.Series) error) {
memSeries := map[string]*protocol.Series{}
return memSeries, func(series *protocol.Series) error {
oldSeries := memSeries[*series.Name]
if oldSeries == nil {
memSeries[*series.Name] = series
return nil
}
oldSeries.Points = append(oldSeries.Points, series.Points...)
return nil
}
}
func (self *HttpServer) query(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query().Get("q")
db := r.URL.Query().Get(":db")
memSeeries, yield := allPointsYield(w)
err := self.engine.RunQuery(db, query, yield)
if err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
}
data, err := serializeSeries(memSeeries)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
}
w.Write(data)
w.WriteHeader(http.StatusOK)
}
func (self *HttpServer) writePoints(w http.ResponseWriter, r *http.Request) {
}
type Point struct {
Timestamp int64 `json:"timestamp"`
SequenceNumber uint32 `json:"sequenceNumber"`
Values []interface{} `json:"values"`
}
type SerializedSeries struct {
Name string `json:"name"`
Columns []string `json:"columns"`
Points [][]interface{} `json:"points"`
}
func serializeSeries(memSeries map[string]*protocol.Series) ([]byte, error) {
serializedSeries := []*SerializedSeries{}
for _, series := range memSeries {
columns := []string{"time", "sequence_number"}
for _, field := range series.Fields {
columns = append(columns, *field.Name)
}
points := [][]interface{}{}
for _, row := range series.Points {
rowValues := []interface{}{*row.Timestamp, *row.SequenceNumber}
for idx, value := range row.Values {
switch *series.Fields[idx].Type {
case protocol.FieldDefinition_STRING:
rowValues = append(rowValues, *value.StringValue)
case protocol.FieldDefinition_INT32:
rowValues = append(rowValues, *value.IntValue)
case protocol.FieldDefinition_INT64:
rowValues = append(rowValues, *value.Int64Value)
case protocol.FieldDefinition_DOUBLE:
rowValues = append(rowValues, *value.DoubleValue)
case protocol.FieldDefinition_BOOL:
rowValues = append(rowValues, *value.BoolValue)
}
}
points = append(points, rowValues)
}
serializedSeries = append(serializedSeries, &SerializedSeries{
Name: *series.Name,
Columns: columns,
Points: points,
})
}
return json.Marshal(serializedSeries)
}

115
src/hapi/api_test.go Normal file
View File

@ -0,0 +1,115 @@
package hapi
import (
"common"
"encoding/json"
"fmt"
"io/ioutil"
. "launchpad.net/gocheck"
"net"
"net/http"
"net/url"
"protocol"
"testing"
"time"
)
// Hook up gocheck into the gotest runner.
func Test(t *testing.T) {
TestingT(t)
}
type ApiSuite struct {
listener net.Listener
server *HttpServer
}
var _ = Suite(&ApiSuite{})
type MockEngine struct{}
func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Series) error) error {
series, err := common.StringToSeriesArray(`
[
{
"points": [
{
"values": [
{
"string_value": "some_value"
},
{
"int_value": 1
}
],
"timestamp": 1381346631,
"sequence_number": 1
},
{
"values": [
{
"string_value": "some_value"
},
{
"int_value": 2
}
],
"timestamp": 1381346631,
"sequence_number": 2
}
],
"name": "foo",
"fields": [
{
"type": "STRING",
"name": "column_one"
},
{
"type": "INT32",
"name": "column_two"
}
]
}
]
`)
if err != nil {
return err
}
return yield(series[0])
}
func (self *ApiSuite) SetUpSuite(c *C) {
self.server = NewHttpServer(nil, &MockEngine{})
var err error
self.listener, err = net.Listen("tcp4", ":")
c.Assert(err, IsNil)
go func() {
self.server.Serve(self.listener)
}()
time.Sleep(1 * time.Second)
}
func (self *ApiSuite) TearDownSuite(c *C) {
self.server.Close()
}
func (self *ApiSuite) TestQuerying(c *C) {
port := self.listener.Addr().(*net.TCPAddr).Port
query := "select * from foo where column_one == 'some_value';"
query = url.QueryEscape(query)
addr := fmt.Sprintf("http://localhost:%d/api/db/foo/series?q=%s", port, query)
resp, err := http.Get(addr)
c.Assert(err, IsNil)
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
series := []SerializedSeries{}
err = json.Unmarshal(data, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Name, Equals, "foo")
// time, seq, column_one, column_two
c.Assert(series[0].Columns, HasLen, 4)
c.Assert(series[0].Points, HasLen, 2)
}

5
src/hapi/config.go Normal file
View File

@ -0,0 +1,5 @@
package hapi
type Configuration struct {
HttpAddr string
}

19
src/hapi/cors.go Normal file
View File

@ -0,0 +1,19 @@
package hapi
import (
"net/http"
)
func CorsHeaderHandler(handler http.HandlerFunc) http.HandlerFunc {
return func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Add("Access-Control-Allow-Origin", "*")
rw.Header().Add("Access-Control-Max-Age", "2592000")
rw.Header().Add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
rw.Header().Add("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept")
handler(rw, req)
}
}
func CorsAndCompressionHeaderHandler(handler http.HandlerFunc) http.HandlerFunc {
return CorsHeaderHandler(CompressionHandler(true, handler))
}

View File

@ -0,0 +1,64 @@
package hapi
import (
"compress/gzip"
"compress/zlib"
"io"
"net/http"
"strings"
)
type CompressedResponseWriter struct {
responseWriter http.ResponseWriter
writer io.Writer
}
func NewCompressionResponseWriter(useCompression bool, rw http.ResponseWriter, req *http.Request) *CompressedResponseWriter {
var writer io.Writer = rw
if req.Header.Get("Accept-Encoding") != "" {
encodings := strings.Split(req.Header.Get("Accept-Encoding"), ",")
for _, val := range encodings {
if val == "gzip" {
rw.Header().Set("Content-Encoding", "gzip")
writer, _ = gzip.NewWriterLevel(writer, gzip.BestSpeed)
break
} else if val == "deflate" {
rw.Header().Set("Content-Encoding", "deflate")
writer, _ = zlib.NewWriterLevel(writer, zlib.BestSpeed)
break
}
}
}
return &CompressedResponseWriter{rw, writer}
}
func (self *CompressedResponseWriter) Header() http.Header {
return self.responseWriter.Header()
}
func (self *CompressedResponseWriter) Write(bs []byte) (int, error) {
return self.writer.Write(bs)
}
func (self *CompressedResponseWriter) WriteHeader(responseCode int) {
self.responseWriter.WriteHeader(responseCode)
}
func CompressionHandler(enableCompression bool, handler http.HandlerFunc) http.HandlerFunc {
if !enableCompression {
return handler
}
return func(rw http.ResponseWriter, req *http.Request) {
crw := NewCompressionResponseWriter(true, rw, req)
handler(crw, req)
switch x := crw.writer.(type) {
case *gzip.Writer:
x.Close()
case *zlib.Writer:
x.Close()
}
}
}