Add /cluster/servers endpoint to get server information.
parent
8947bb7673
commit
95d7831860
|
@ -1,6 +1,7 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"cluster"
|
||||
log "code.google.com/p/log4go"
|
||||
. "common"
|
||||
"coordinator"
|
||||
|
@ -29,15 +30,17 @@ type HttpServer struct {
|
|||
coordinator coordinator.Coordinator
|
||||
userManager coordinator.UserManager
|
||||
shutdown chan bool
|
||||
clusterConfig *cluster.ClusterConfiguration
|
||||
}
|
||||
|
||||
func NewHttpServer(httpPort string, adminAssetsDir string, theCoordinator coordinator.Coordinator, userManager coordinator.UserManager) *HttpServer {
|
||||
func NewHttpServer(httpPort string, adminAssetsDir string, theCoordinator coordinator.Coordinator, userManager coordinator.UserManager, clusterConfig *cluster.ClusterConfiguration) *HttpServer {
|
||||
self := &HttpServer{}
|
||||
self.httpPort = httpPort
|
||||
self.adminAssetsDir = adminAssetsDir
|
||||
self.coordinator = theCoordinator
|
||||
self.userManager = userManager
|
||||
self.shutdown = make(chan bool, 2)
|
||||
self.clusterConfig = clusterConfig
|
||||
return self
|
||||
}
|
||||
|
||||
|
@ -122,6 +125,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
|
|||
// fetch current list of available interfaces
|
||||
self.registerEndpoint(p, "get", "/interfaces", self.listInterfaces)
|
||||
|
||||
// cluster config endpoints
|
||||
self.registerEndpoint(p, "get", "/cluster/servers", self.listServers)
|
||||
|
||||
go self.startSsl(p)
|
||||
|
||||
if listener == nil {
|
||||
|
@ -856,3 +862,14 @@ func (self *HttpServer) deleteDbContinuousQueries(w libhttp.ResponseWriter, r *l
|
|||
return libhttp.StatusOK, nil
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) listServers(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) {
|
||||
servers := self.clusterConfig.Servers()
|
||||
serverMaps := make([]map[string]interface{}, len(servers), len(servers))
|
||||
for i, s := range servers {
|
||||
serverMaps[i] = map[string]interface{}{"id": s.Id(), "protobufConnectString": s.ProtobufConnectionString}
|
||||
}
|
||||
return libhttp.StatusOK, serverMaps
|
||||
})
|
||||
}
|
||||
|
|
|
@ -180,7 +180,7 @@ func (self *ApiSuite) SetUpSuite(c *C) {
|
|||
dbUsers: map[string][]string{"db1": []string{"db_user1"}},
|
||||
}
|
||||
dir := c.MkDir()
|
||||
self.server = NewHttpServer("", dir, self.coordinator, self.manager)
|
||||
self.server = NewHttpServer("", dir, self.coordinator, self.manager, nil)
|
||||
var err error
|
||||
self.listener, err = net.Listen("tcp4", ":8081")
|
||||
c.Assert(err, IsNil)
|
||||
|
|
|
@ -147,6 +147,20 @@ func (self *ServerProcess) Post(url, data string, c *C) *http.Response {
|
|||
return self.Request("POST", url, data, c)
|
||||
}
|
||||
|
||||
func (self *ServerProcess) PostGetBody(url, data string, c *C) []byte {
|
||||
resp := self.Request("POST", url, data, c)
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
return body
|
||||
}
|
||||
|
||||
func (self *ServerProcess) Get(url string, c *C) []byte {
|
||||
resp := self.Request("GET", url, "", c)
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
return body
|
||||
}
|
||||
|
||||
func (self *ServerProcess) Request(method, url, data string, c *C) *http.Response {
|
||||
fullUrl := fmt.Sprintf("http://localhost:%d%s", self.apiPort, url)
|
||||
req, err := http.NewRequest(method, fullUrl, bytes.NewBufferString(data))
|
||||
|
@ -894,3 +908,16 @@ func (self *ServerSuite) TestContinuousQueryGroupByOperations(c *C) {
|
|||
self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 1;", false, c)
|
||||
self.serverProcesses[0].QueryAsRoot("test_cq", "drop continuous query 2;", false, c)
|
||||
}
|
||||
|
||||
func (self *ServerSuite) TestGetServers(c *C) {
|
||||
body := self.serverProcesses[0].Get("/cluster/servers?u=root&p=root", c)
|
||||
|
||||
res := make([]interface{}, 0)
|
||||
err := json.Unmarshal(body, &res)
|
||||
c.Assert(err, IsNil)
|
||||
for _, js := range res {
|
||||
server := js.(map[string]interface{})
|
||||
c.Assert(server["id"], NotNil)
|
||||
c.Assert(server["protobufConnectString"], NotNil)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
|
|||
protobufServer := coordinator.NewProtobufServer(config.ProtobufPortString(), requestHandler)
|
||||
|
||||
raftServer.AssignCoordinator(coord)
|
||||
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, coord, coord)
|
||||
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.AdminAssetsDir, coord, coord, clusterConfig)
|
||||
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
|
||||
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())
|
||||
|
||||
|
|
Loading…
Reference in New Issue