feat: add an influxql service that communicates with influxdb 1.x
It creates a simple client that follows the QueryService interface and uses the `influxql.Compiler` type to determine where the query should be routed to and to return the query as a `flux.ResultIterator`. This will be useful for replaying transpiler queries against influxdb 1.x servers to verify correctness.pull/10616/head
parent
8cf045d7cc
commit
37ffbfc687
|
@ -0,0 +1,82 @@
|
|||
package influxql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/platform/query"
|
||||
)
|
||||
|
||||
// Endpoint contains the necessary information to connect to a specific cluster.
|
||||
type Endpoint struct {
|
||||
URL string `json:"url"`
|
||||
Username string `json:"username,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
}
|
||||
|
||||
// Service is a client for the influxdb 1.x endpoint that implements the QueryService
|
||||
// for the influxql compiler type.
|
||||
type Service struct {
|
||||
// Endpoints maps a cluster name to the influxdb 1.x endpoint.
|
||||
Endpoints map[string]Endpoint
|
||||
}
|
||||
|
||||
// Query will execute a query for the influxql.Compiler type against an influxdb 1.x endpoint.
|
||||
func (s *Service) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
|
||||
// Verify that this is an influxql query in the compiler.
|
||||
compiler, ok := req.Compiler.(*Compiler)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("influxql query service does not support the '%s' compiler type", req.Compiler.CompilerType())
|
||||
}
|
||||
|
||||
// Lookup the endpoint information for the cluster.
|
||||
endpoint, ok := s.Endpoints[compiler.Cluster]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("no endpoint found for cluster %s", compiler.Cluster)
|
||||
}
|
||||
|
||||
// Prepare the HTTP request.
|
||||
u, err := url.Parse(endpoint.URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.Path += "/query"
|
||||
|
||||
params := url.Values{}
|
||||
params.Set("q", compiler.Query)
|
||||
if compiler.DB != "" {
|
||||
params.Set("db", compiler.DB)
|
||||
}
|
||||
if compiler.RP != "" {
|
||||
params.Set("rp", compiler.RP)
|
||||
}
|
||||
u.RawQuery = params.Encode()
|
||||
|
||||
hreq, err := http.NewRequest("POST", u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hreq.WithContext(ctx)
|
||||
hreq.SetBasicAuth(endpoint.Username, endpoint.Password)
|
||||
|
||||
// Perform the request and look at the status code.
|
||||
resp, err := http.DefaultClient.Do(hreq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if resp.StatusCode/100 != 2 {
|
||||
return nil, fmt.Errorf("unexpected http status: %s", resp.Status)
|
||||
}
|
||||
|
||||
// Decode the response into the JSON structure.
|
||||
var results Response
|
||||
if err := json.NewDecoder(resp.Body).Decode(&results); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Return a result iterator using the response.
|
||||
return NewResponseIterator(&results), nil
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package influxql_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/platform/query"
|
||||
"github.com/influxdata/platform/query/influxql"
|
||||
)
|
||||
|
||||
func TestService(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Verify the parameters were passed correctly.
|
||||
if want, got := "POST", r.Method; want != got {
|
||||
t.Errorf("unexpected method -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
if want, got := "SHOW DATABASES", r.FormValue("q"); want != got {
|
||||
t.Errorf("unexpected query -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
if want, got := "db0", r.FormValue("db"); want != got {
|
||||
t.Errorf("unexpected database -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
if want, got := "rp0", r.FormValue("rp"); want != got {
|
||||
t.Errorf("unexpected retention policy -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
user, pass, ok := r.BasicAuth()
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
if want, got := "me", user; want != got {
|
||||
t.Errorf("unexpected username -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
if want, got := "secretpassword", pass; want != got {
|
||||
t.Errorf("unexpected password -want/+got\n\t- %q\n\t+ %q", want, got)
|
||||
}
|
||||
io.WriteString(w, `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["db0"]]}]}]}`)
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
service := &influxql.Service{
|
||||
Endpoints: map[string]influxql.Endpoint{
|
||||
"myserver": {
|
||||
URL: server.URL,
|
||||
Username: "me",
|
||||
Password: "secretpassword",
|
||||
},
|
||||
},
|
||||
}
|
||||
results, err := service.Query(context.Background(), &query.Request{
|
||||
Compiler: &influxql.Compiler{
|
||||
Cluster: "myserver",
|
||||
DB: "db0",
|
||||
RP: "rp0",
|
||||
Query: "SHOW DATABASES",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
defer results.Cancel()
|
||||
}
|
Loading…
Reference in New Issue