fix(server): timeout and fetch version concurrently
fixes an issue where version was not being fetched concurrently and not timing out for each source. Co-authored-by: Kelvin Wang <sherkrainwang@gmail.com>pull/5198/head
parent
d18a1aca36
commit
100148c087
|
@ -62,8 +62,9 @@ func (c *Client) FluxEnabled() (bool, error) {
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
hc := &http.Client{}
|
||||
hc := &http.Client{
|
||||
Timeout: c.Timeout,
|
||||
}
|
||||
if c.InsecureSkipVerify {
|
||||
hc.Transport = skipVerifyTransport
|
||||
} else {
|
||||
|
|
|
@ -2,12 +2,12 @@ package server
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
"crypto/tls"
|
||||
|
||||
"github.com/influxdata/chronograf/enterprise"
|
||||
"github.com/influxdata/chronograf/flux"
|
||||
|
@ -31,7 +31,7 @@ type sourceLinks struct {
|
|||
Databases string `json:"databases"` // URL for the databases contained within this source
|
||||
Annotations string `json:"annotations"` // URL for the annotations of this source
|
||||
Health string `json:"health"` // URL for source health
|
||||
Flux string `json:"flux,omitempty"` // URL for flux if it exists
|
||||
Flux string `json:"flux,omitempty"` // URL for flux if it exists
|
||||
}
|
||||
|
||||
type sourceResponse struct {
|
||||
|
@ -76,8 +76,7 @@ var (
|
|||
skipVerifyTransport = &http.Transport{
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
}
|
||||
defaultTransport = &http.Transport{
|
||||
}
|
||||
defaultTransport = &http.Transport{}
|
||||
)
|
||||
|
||||
func hasFlux(ctx context.Context, src chronograf.Source) (bool, error) {
|
||||
|
@ -87,8 +86,8 @@ func hasFlux(ctx context.Context, src chronograf.Source) (bool, error) {
|
|||
}
|
||||
|
||||
cli := &flux.Client{
|
||||
URL: url,
|
||||
Timeout: 500 *time.Millisecond,
|
||||
URL: url,
|
||||
Timeout: 500 * time.Millisecond,
|
||||
}
|
||||
|
||||
return cli.FluxEnabled()
|
||||
|
@ -215,6 +214,10 @@ func (s *Service) tsdbType(ctx context.Context, src *chronograf.Source) (string,
|
|||
if err := cli.Connect(ctx, src); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||
defer cancel()
|
||||
|
||||
return cli.Type(ctx)
|
||||
}
|
||||
|
||||
|
@ -225,29 +228,32 @@ type getSourcesResponse struct {
|
|||
// Sources returns all sources from the store.
|
||||
func (s *Service) Sources(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
res := getSourcesResponse{
|
||||
Sources: make([]sourceResponse, 0),
|
||||
}
|
||||
|
||||
srcs, err := s.Store.Sources(ctx).All(ctx)
|
||||
if err != nil {
|
||||
Error(w, http.StatusInternalServerError, "Error loading sources", s.Logger)
|
||||
return
|
||||
}
|
||||
|
||||
res := getSourcesResponse{
|
||||
Sources: make([]sourceResponse, len(srcs)),
|
||||
}
|
||||
|
||||
sources := make([]sourceResponse, 0)
|
||||
sourceCh := make(chan sourceResponse, len(srcs))
|
||||
for _, src := range srcs {
|
||||
dbVersion, err := s.tsdbVersion(ctx, &src)
|
||||
if err != nil {
|
||||
dbVersion = "Unknown"
|
||||
s.Logger.WithField("error", err.Error()).Info("Failed to retrieve database version")
|
||||
}
|
||||
src.Version = dbVersion
|
||||
|
||||
sources = append(sources, newSourceResponse(ctx, src))
|
||||
go func(src chronograf.Source) {
|
||||
dbVersion, err := s.tsdbVersion(ctx, &src)
|
||||
if err != nil {
|
||||
dbVersion = "Unknown"
|
||||
s.Logger.WithField("error", err.Error()).Info("Failed to retrieve database version")
|
||||
}
|
||||
src.Version = dbVersion
|
||||
sourceCh <- newSourceResponse(ctx, src)
|
||||
}(src)
|
||||
}
|
||||
for i := 0; i < len(srcs); i++ {
|
||||
res.Sources = append(res.Sources, <-sourceCh)
|
||||
}
|
||||
|
||||
res.Sources = sources
|
||||
|
||||
encodeJSON(w, http.StatusOK, res, s.Logger)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue