Add simple influx backend through cli option.

pull/96/head
Chris Goller 2016-09-21 15:03:07 -07:00
parent f055a0f325
commit 113fa01323
6 changed files with 75 additions and 95 deletions

47
handlers/proxy.go Normal file
View File

@ -0,0 +1,47 @@
package handlers
import (
"github.com/go-openapi/runtime/middleware"
"github.com/influxdata/mrfusion"
"github.com/influxdata/mrfusion/models"
"golang.org/x/net/context"
op "github.com/influxdata/mrfusion/restapi/operations"
)
type InfluxProxy struct {
TimeSeries mrfusion.TimeSeries
}
func (h *InfluxProxy) Proxy(ctx context.Context, params op.PostSourcesIDProxyParams) middleware.Responder {
// TODO: Add support for multiple TimeSeries with lookup based on params.ID
query := mrfusion.Query{
Command: *params.Query.Query,
Database: params.Query.Database,
RP: params.Query.Rp,
}
response, err := h.TimeSeries.Query(ctx, query)
if err != nil {
if err == mrfusion.ErrUpstreamTimeout {
e := &models.Error{
Code: 408,
Message: "Timeout waiting for Influx response",
}
return op.NewPostSourcesIDProxyRequestTimeout().WithPayload(e)
}
// TODO: Here I want to return the error code from influx.
e := &models.Error{
Code: 400,
Message: err.Error(),
}
return op.NewPostSourcesIDProxyBadRequest().WithPayload(e)
}
res := &models.ProxyResponse{
Results: response,
}
return op.NewPostSourcesIDProxyOK().WithPayload(res)
}

View File

@ -4,7 +4,6 @@ import (
"encoding/json"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/mrfusion"
)
type response struct {
@ -15,7 +14,3 @@ type response struct {
func (r response) MarshalJSON() ([]byte, error) {
return json.Marshal(r.Results)
}
func (r response) Results() ([]mrfusion.Result, error) {
return []mrfusion.Result{}, nil
}

View File

@ -67,13 +67,8 @@ func (m *Handler) Proxy(ctx context.Context, params op.PostSourcesIDProxyParams)
return op.NewPostSourcesIDProxyDefault(500)
}
results, err := response.Results()
if err != nil {
return op.NewPostSourcesIDProxyDefault(500)
}
res := &models.ProxyResponse{
Results: results,
Results: response,
}
return op.NewPostSourcesIDProxyOK().WithPayload(res)
}

View File

@ -1,7 +1,6 @@
package mock
import (
"encoding/json"
"fmt"
"time"
@ -70,71 +69,11 @@ func (m *ExplorationStore) Update(ctx context.Context, e mrfusion.Exploration) e
return nil
}
type Row struct {
name string `json:"name,omitempty"`
tags map[string]string `json:"tags,omitempty"`
columns []string `json:"columns,omitempty"`
values [][]interface{} `json:"values,omitempty"`
}
func NewRow(row string) mrfusion.Row {
r := Row{}
json.Unmarshal([]byte(row), &r)
return &r
}
var SampleRow mrfusion.Row = NewRow(`{"name":"cpu","columns":["time","value"],"values":[["2000-01-01T01:00:00Z",1]]}`)
func (r *Row) Name() string {
return r.name
}
func (r *Row) Tags() map[string]string {
return r.tags
}
func (r *Row) Columns() []string {
return r.columns
}
func (r *Row) Values() [][]interface{} {
return r.values
}
type Result struct {
rows []mrfusion.Row
}
func NewResult(row mrfusion.Row) mrfusion.Result {
return &Result{
rows: []mrfusion.Row{row},
}
}
var SampleResult mrfusion.Result = NewResult(SampleRow)
func (r *Result) Series() ([]mrfusion.Row, error) {
return r.rows, nil
}
type Response struct {
results []mrfusion.Result
}
func (r *Response) MarshalJSON() ([]byte, error) {
return []byte(`{}`), nil
}
var SampleResponse mrfusion.Response = NewResponse(SampleResult)
func NewResponse(result mrfusion.Result) mrfusion.Response {
return &Response{
results: []mrfusion.Result{result},
}
}
func (r *Response) Results() ([]mrfusion.Result, error) {
return r.results, nil
return []byte(`[{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:00Z",1]]}]}]`), nil
}
type TimeSeries struct {
@ -149,7 +88,7 @@ func NewTimeSeries(hosts []string, response mrfusion.Response) mrfusion.TimeSeri
}
}
var DefaultTimeSeries mrfusion.TimeSeries = NewTimeSeries([]string{"hydrogen", "helium", "hadron", "howdy"}, SampleResponse)
var DefaultTimeSeries mrfusion.TimeSeries = NewTimeSeries([]string{"hydrogen", "helium", "hadron", "howdy"}, &Response{})
func (t *TimeSeries) Query(context.Context, mrfusion.Query) (mrfusion.Response, error) {
return t.Response, nil

View File

@ -14,6 +14,8 @@ import (
"github.com/influxdata/mrfusion"
"github.com/influxdata/mrfusion/dist"
"github.com/influxdata/mrfusion/handlers"
"github.com/influxdata/mrfusion/influx"
"github.com/influxdata/mrfusion/mock"
"github.com/influxdata/mrfusion/restapi/operations"
)
@ -26,6 +28,10 @@ var devFlags = struct {
Develop bool `short:"d" long:"develop" description:"Run server in develop mode."`
}{}
var influxFlags = struct {
Server string `short:"s" long:"server" description:"Full URL of InfluxDB server (http://localhost:8086)"`
}{}
func configureFlags(api *operations.MrFusionAPI) {
api.CommandLineOptionsGroups = []swag.CommandLineOptionsGroup{
swag.CommandLineOptionsGroup{
@ -33,6 +39,11 @@ func configureFlags(api *operations.MrFusionAPI) {
LongDescription: "Server will use the ui/build directory directly.",
Options: &devFlags,
},
swag.CommandLineOptionsGroup{
ShortDescription: "Default Time Series Backend",
LongDescription: "Specify the url of an InfxluDB server",
Options: &influxFlags,
},
}
}
@ -132,7 +143,18 @@ func configureAPI(api *operations.MrFusionAPI) http.Handler {
return middleware.NotImplemented("operation .PostSources has not yet been implemented")
})
api.PostSourcesIDProxyHandler = operations.PostSourcesIDProxyHandlerFunc(mockHandler.Proxy)
if len(influxFlags.Server) > 0 {
c, err := influx.NewClient(influxFlags.Server)
if err != nil {
panic(err)
}
h := handlers.InfluxProxy{
TimeSeries: c,
}
api.PostSourcesIDProxyHandler = operations.PostSourcesIDProxyHandlerFunc(h.Proxy)
} else {
api.PostSourcesIDProxyHandler = operations.PostSourcesIDProxyHandlerFunc(mockHandler.Proxy)
}
api.PostSourcesIDRolesHandler = operations.PostSourcesIDRolesHandlerFunc(func(ctx context.Context, params operations.PostSourcesIDRolesParams) middleware.Responder {
return middleware.NotImplemented("operation .PostSourcesIDRoles has not yet been implemented")

View File

@ -1,10 +1,6 @@
package mrfusion
import (
"encoding/json"
"golang.org/x/net/context"
)
import "golang.org/x/net/context"
// Query retrieves a Response from a TimeSeries.
type Query struct {
@ -13,23 +9,9 @@ type Query struct {
RP string // RP is a retention policy and optional; if empty will not be used.
}
// Row represents a single row returned from the execution of a single statement in a `Query`.
type Row interface {
Name() string
Tags() map[string]string
Columns() []string
Values() [][]interface{}
}
// Result represents a resultset returned from a single statement in a `Query`.
type Result interface {
Series() ([]Row, error)
}
// Response is the result of a query against a TimeSeries
type Response interface {
Results() ([]Result, error)
json.Marshaler
MarshalJSON() ([]byte, error)
}
// MonitoredService is a service sending monitoring data to a `TimeSeries`