test: expose tcpaddr for enterprise tests (#22172)
* docs: update comment for series updates * fix: expose TCP address for Enterprise test harness * refactor: remove dead RemoteServer codepull/22195/head
parent
f71ab14888
commit
fd81373937
|
@ -43,10 +43,6 @@ func TestServer_BackupAndRestore(t *testing.T) {
|
|||
s := OpenServer(config)
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Cannot modify remote server config")
|
||||
}
|
||||
|
||||
if err := s.CreateDatabaseAndRetentionPolicy(db, NewRetentionPolicySpec(rp, 1, 0), true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -22,10 +22,6 @@ func TestConcurrentServer_WriteValues(t *testing.T) {
|
|||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Not implemented on remote server")
|
||||
}
|
||||
|
||||
// The first %%d becomes a %d once fmt is done, so we can then inject new
|
||||
// measurement names later on.
|
||||
write := strings.Join([]string{
|
||||
|
@ -54,10 +50,6 @@ func TestConcurrentServer_TagValues(t *testing.T) {
|
|||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Not implemented on remote server")
|
||||
}
|
||||
|
||||
write := strings.Join([]string{
|
||||
fmt.Sprintf(`a,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`a,host=serverA,region=useast val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
|
@ -113,10 +105,6 @@ func TestConcurrentServer_ShowMeasurements(t *testing.T) {
|
|||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Not implemented on remote server")
|
||||
}
|
||||
|
||||
write := strings.Join([]string{
|
||||
fmt.Sprintf(`a,host=serverA,region=uswest val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
fmt.Sprintf(`a,host=serverA,region=useast val=23.2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
|
||||
|
|
|
@ -131,10 +131,6 @@ func TestServer_DELETE_DROP_SERIES_DROP_MEASUREMENT(t *testing.T) {
|
|||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Not implemented on remote server")
|
||||
}
|
||||
|
||||
localServer := s.(*LocalServer)
|
||||
|
||||
// First initialize some writes such that we end up with 10 shards.
|
||||
|
|
|
@ -32,6 +32,7 @@ var seed int64
|
|||
// Server represents a test wrapper for run.Server.
|
||||
type Server interface {
|
||||
URL() string
|
||||
TcpAddr() string
|
||||
Open() error
|
||||
SetLogOutput(w io.Writer)
|
||||
Close()
|
||||
|
@ -51,111 +52,6 @@ type Server interface {
|
|||
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
|
||||
}
|
||||
|
||||
// RemoteServer is a Server that is accessed remotely via the HTTP API
|
||||
type RemoteServer struct {
|
||||
*client
|
||||
url string
|
||||
}
|
||||
|
||||
func (s *RemoteServer) URL() string {
|
||||
return s.url
|
||||
}
|
||||
|
||||
func (s *RemoteServer) Open() error {
|
||||
resp, err := http.Get(s.URL() + "/ping")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body := strings.TrimSpace(string(MustReadAll(resp.Body)))
|
||||
if resp.StatusCode != http.StatusNoContent {
|
||||
return fmt.Errorf("unexpected status code: code=%d, body=%s", resp.StatusCode, body)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *RemoteServer) Close() {
|
||||
// ignore, we can't shutdown a remote server
|
||||
}
|
||||
|
||||
func (s *RemoteServer) SetLogOutput(w io.Writer) {
|
||||
// ignore, we can't change the logging of a remote server
|
||||
}
|
||||
|
||||
func (s *RemoteServer) Closed() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *RemoteServer) CreateDatabase(db string) (*meta.DatabaseInfo, error) {
|
||||
stmt := fmt.Sprintf("CREATE+DATABASE+%s", db)
|
||||
|
||||
_, err := s.HTTPPost(s.URL()+"/query?q="+stmt, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &meta.DatabaseInfo{}, nil
|
||||
}
|
||||
|
||||
func (s *RemoteServer) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec, makeDefault bool) error {
|
||||
if _, err := s.CreateDatabase(db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stmt := fmt.Sprintf("CREATE+RETENTION+POLICY+%s+ON+\"%s\"+DURATION+%s+REPLICATION+%v+SHARD+DURATION+%s",
|
||||
rp.Name, db, rp.Duration, *rp.ReplicaN, rp.ShardGroupDuration)
|
||||
if makeDefault {
|
||||
stmt += "+DEFAULT"
|
||||
}
|
||||
|
||||
_, err := s.HTTPPost(s.URL()+"/query?q="+stmt, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *RemoteServer) CreateSubscription(database, rp, name, mode string, destinations []string) error {
|
||||
dests := make([]string, 0, len(destinations))
|
||||
for _, d := range destinations {
|
||||
dests = append(dests, "'"+d+"'")
|
||||
}
|
||||
|
||||
stmt := fmt.Sprintf("CREATE+SUBSCRIPTION+%s+ON+\"%s\".\"%s\"+DESTINATIONS+%v+%s",
|
||||
name, database, rp, mode, strings.Join(dests, ","))
|
||||
|
||||
_, err := s.HTTPPost(s.URL()+"/query?q="+stmt, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *RemoteServer) DropDatabase(db string) error {
|
||||
stmt := fmt.Sprintf("DROP+DATABASE+%s", db)
|
||||
|
||||
_, err := s.HTTPPost(s.URL()+"/query?q="+stmt, nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// Reset attempts to remove all database state by dropping everything
|
||||
func (s *RemoteServer) Reset() error {
|
||||
stmt := "SHOW+DATABASES"
|
||||
results, err := s.HTTPPost(s.URL()+"/query?q="+stmt, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp := &httpd.Response{}
|
||||
if resp.UnmarshalJSON([]byte(results)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, db := range resp.Results[0].Series[0].Values {
|
||||
if err := s.DropDatabase(fmt.Sprintf("%s", db[0])); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func (s *RemoteServer) WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error {
|
||||
panic("WritePoints not implemented")
|
||||
}
|
||||
|
||||
// NewServer returns a new instance of Server.
|
||||
func NewServer(c *Config) Server {
|
||||
buildInfo := &run.BuildInfo{
|
||||
|
@ -164,22 +60,6 @@ func NewServer(c *Config) Server {
|
|||
Branch: "testBranch",
|
||||
}
|
||||
|
||||
// If URL exists, create a server that will run against a remote endpoint
|
||||
if url := os.Getenv("URL"); url != "" {
|
||||
s := &RemoteServer{
|
||||
url: url,
|
||||
client: &client{
|
||||
URLFn: func() string {
|
||||
return url
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := s.Reset(); err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// Otherwise create a local server
|
||||
srv, _ := run.NewServer(c.Config, buildInfo)
|
||||
s := LocalServer{
|
||||
|
@ -295,6 +175,12 @@ func (s *LocalServer) URL() string {
|
|||
panic("httpd server not found in services")
|
||||
}
|
||||
|
||||
func (s *LocalServer) TcpAddr() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return "tcp://" + s.Listener.Addr().String()
|
||||
}
|
||||
|
||||
func (s *LocalServer) CreateDatabase(db string) (*meta.DatabaseInfo, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
|
|
@ -333,10 +333,6 @@ func TestServer_RetentionPolicyCommands(t *testing.T) {
|
|||
s := OpenServer(c)
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Cannot alter auto create rp remotely")
|
||||
}
|
||||
|
||||
test := tests.load(t, "retention_policy_commands")
|
||||
|
||||
// Create a database.
|
||||
|
@ -426,10 +422,6 @@ func TestServer_ShowDatabases_WithAuth(t *testing.T) {
|
|||
s := OpenServer(c)
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Cannot enable auth on remote server")
|
||||
}
|
||||
|
||||
adminParams := map[string][]string{"u": []string{"admin"}, "p": []string{"admin"}}
|
||||
readerParams := map[string][]string{"u": []string{"reader"}, "p": []string{"r"}}
|
||||
writerParams := map[string][]string{"u": []string{"writer"}, "p": []string{"w"}}
|
||||
|
@ -1280,10 +1272,6 @@ func TestServer_Query_MaxSelectSeriesN(t *testing.T) {
|
|||
s := OpenServer(config)
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Cannot modify MaxSelectSeriesN remotely")
|
||||
}
|
||||
|
||||
test := NewTest("db0", "rp0")
|
||||
test.writes = Writes{
|
||||
&Write{data: `cpu,host=server01 value=1.0 0`},
|
||||
|
@ -9324,10 +9312,6 @@ func TestServer_Query_LargeTimestamp(t *testing.T) {
|
|||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Cannot restart remote server")
|
||||
}
|
||||
|
||||
writes := []string{
|
||||
fmt.Sprintf(`cpu value=100 %d`, models.MaxNanoTime),
|
||||
}
|
||||
|
@ -9424,9 +9408,6 @@ func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
|
|||
s := OpenDefaultServer(NewConfig())
|
||||
defer s.Close()
|
||||
|
||||
if _, ok := s.(*RemoteServer); ok {
|
||||
t.Skip("Skipping. Cannot access PointsWriter remotely")
|
||||
}
|
||||
// goroutine to write points
|
||||
done := make(chan struct{})
|
||||
var wg sync.WaitGroup
|
||||
|
|
|
@ -342,6 +342,8 @@ func (s *Shard) Open() error {
|
|||
}
|
||||
|
||||
// Load metadata index for the inmem index only.
|
||||
// If the shard already has data, this is what loads the series ids into the series file, even
|
||||
// if we are on TSI indexing.
|
||||
if err := e.LoadMetadataIndex(s.id, s.index); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue