Merge pull request #940 from influxdata/js-influxql-service

feat: add an influxql service that communicates with influxdb 1.x
pull/10616/head
Jonathan A. Sternberg 2018-10-03 12:00:40 -05:00 committed by GitHub
commit 33c1e95f7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 147 additions and 0 deletions

82
query/influxql/service.go Normal file
View File

@ -0,0 +1,82 @@
package influxql
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"github.com/influxdata/flux"
"github.com/influxdata/platform/query"
)
// Endpoint contains the necessary information to connect to a specific cluster.
type Endpoint struct {
URL string `json:"url"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}
// Service is a client for the influxdb 1.x endpoint that implements the QueryService
// for the influxql compiler type.
type Service struct {
// Endpoints maps a cluster name to the influxdb 1.x endpoint.
Endpoints map[string]Endpoint
}
// Query will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint.
func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
// Verify that this is an influxql query in the compiler.
compiler, ok := req.Compiler.(*Compiler)
if !ok {
return nil, fmt.Errorf("influxql query service does not support the '%s' compiler type", req.Compiler.CompilerType())
}
// Lookup the endpoint information for the cluster.
endpoint, ok := s.Endpoints[compiler.Cluster]
if !ok {
return nil, fmt.Errorf("no endpoint found for cluster %s", compiler.Cluster)
}
// Prepare the HTTP request.
u, err := url.Parse(endpoint.URL)
if err != nil {
return nil, err
}
u.Path += "/query"
params := url.Values{}
params.Set("q", compiler.Query)
if compiler.DB != "" {
params.Set("db", compiler.DB)
}
if compiler.RP != "" {
params.Set("rp", compiler.RP)
}
u.RawQuery = params.Encode()
hreq, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
hreq.WithContext(ctx)
hreq.SetBasicAuth(endpoint.Username, endpoint.Password)
// Perform the request and look at the status code.
resp, err := http.DefaultClient.Do(hreq)
if err != nil {
return nil, err
} else if resp.StatusCode/100 != 2 {
return nil, fmt.Errorf("unexpected http status: %s", resp.Status)
}
// Decode the response into the JSON structure.
var results Response
if err := json.NewDecoder(resp.Body).Decode(&results); err != nil {
return nil, err
}
// Return a result iterator using the response.
return NewResponseIterator(&results), nil
}

View File

@ -0,0 +1,65 @@
package influxql_test
import (
"context"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
)
func TestService(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Verify the parameters were passed correctly.
if want, got := "POST", r.Method; want != got {
t.Errorf("unexpected method -want/+got\n\t- %q\n\t+ %q", want, got)
}
if want, got := "SHOW DATABASES", r.FormValue("q"); want != got {
t.Errorf("unexpected query -want/+got\n\t- %q\n\t+ %q", want, got)
}
if want, got := "db0", r.FormValue("db"); want != got {
t.Errorf("unexpected database -want/+got\n\t- %q\n\t+ %q", want, got)
}
if want, got := "rp0", r.FormValue("rp"); want != got {
t.Errorf("unexpected retention policy -want/+got\n\t- %q\n\t+ %q", want, got)
}
user, pass, ok := r.BasicAuth()
if !ok {
w.WriteHeader(http.StatusUnauthorized)
return
}
if want, got := "me", user; want != got {
t.Errorf("unexpected username -want/+got\n\t- %q\n\t+ %q", want, got)
}
if want, got := "secretpassword", pass; want != got {
t.Errorf("unexpected password -want/+got\n\t- %q\n\t+ %q", want, got)
}
io.WriteString(w, `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`)
}))
defer server.Close()
service := &influxql.Service{
Endpoints: map[string]influxql.Endpoint{
"myserver": {
URL: server.URL,
Username: "me",
Password: "secretpassword",
},
},
}
results, err := service.Query(context.Background(), &query.Request{
Compiler: &influxql.Compiler{
Cluster: "myserver",
DB: "db0",
RP: "rp0",
Query: "SHOW DATABASES",
},
})
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
defer results.Cancel()
}