write the http wrapper for the user manager.
parent
5b1c5d3a60
commit
21750b7c55
|
@ -19,14 +19,16 @@ type HttpServer struct {
|
|||
httpAddr string
|
||||
engine engine.EngineI
|
||||
coordinator coordinator.Coordinator
|
||||
userManager coordinator.UserManager
|
||||
shutdown chan bool
|
||||
}
|
||||
|
||||
func NewHttpServer(httpAddr string, theEngine engine.EngineI, theCoordinator coordinator.Coordinator) *HttpServer {
|
||||
func NewHttpServer(httpAddr string, theEngine engine.EngineI, theCoordinator coordinator.Coordinator, userManager coordinator.UserManager) *HttpServer {
|
||||
self := &HttpServer{}
|
||||
self.httpAddr = httpAddr
|
||||
self.engine = theEngine
|
||||
self.coordinator = theCoordinator
|
||||
self.userManager = userManager
|
||||
self.shutdown = make(chan bool)
|
||||
return self
|
||||
}
|
||||
|
@ -51,18 +53,18 @@ func (self *HttpServer) Serve(listener net.Listener) {
|
|||
p.Post("/db/:db/series", CorsHeaderHandler(self.writePoints))
|
||||
p.Post("/db", CorsHeaderHandler(self.createDatabase))
|
||||
|
||||
// user management interface
|
||||
// cluster admins management interface
|
||||
|
||||
p.Post("/admin/users/:user", CorsHeaderHandler(self.createUser))
|
||||
p.Del("/admin/users/:user", CorsHeaderHandler(self.removeUser))
|
||||
// {"password": "password"}
|
||||
p.Post("/admin/users/:user/password", CorsHeaderHandler(self.changePassword))
|
||||
// no body, adds/remove the given user to the cluster admins
|
||||
p.Post("/admin/cluster_admins/:user", CorsHeaderHandler(self.addToClusterAdmins))
|
||||
p.Del("/admin/clutser_admins/:user", CorsHeaderHandler(self.deleteFromClusterAdmins))
|
||||
// no body, adss/remove the given user as a db admin
|
||||
p.Post("/admin/db/:db/admins/:user", CorsHeaderHandler(self.addToDbAdmins))
|
||||
p.Del("/admin/db/:db/admins/:user", CorsHeaderHandler(self.deleteFromDbAdmins))
|
||||
p.Post("/cluster_admins/:user", CorsHeaderHandler(self.createClusterAdmin))
|
||||
p.Post("/cluster_admins/:user/password/:password", CorsHeaderHandler(self.changeClusterAdminPassword))
|
||||
p.Del("/cluster_admins/:user", CorsHeaderHandler(self.deleteClusterAdmin))
|
||||
|
||||
// // db users management interface
|
||||
|
||||
p.Post("/db/:db/users/:user", CorsHeaderHandler(self.createDbUser))
|
||||
p.Del("/db/:db/users/:user", CorsHeaderHandler(self.deleteDbUser))
|
||||
p.Post("/db/:db/users/:user/password/:password", CorsHeaderHandler(self.changeDbUserPassword))
|
||||
p.Post("/db/:db/admins/:user", CorsHeaderHandler(self.setDbAdmin))
|
||||
|
||||
if err := libhttp.Serve(listener, p); err != nil && !strings.Contains(err.Error(), "closed network") {
|
||||
panic(err)
|
||||
|
@ -411,166 +413,136 @@ func serializeSeries(memSeries map[string]*protocol.Series, precision TimePrecis
|
|||
return serializedSeries
|
||||
}
|
||||
|
||||
// user management
|
||||
// // cluster admins management interface
|
||||
|
||||
func (self *HttpServer) getUserRequestingOperation(r *libhttp.Request) (*coordinator.User, error) {
|
||||
func (self *HttpServer) tryAsClusterAdmin(w libhttp.ResponseWriter, r *libhttp.Request, yield func(coordinator.User) (int, string)) {
|
||||
username := r.URL.Query().Get("username")
|
||||
password := r.URL.Query().Get("password")
|
||||
|
||||
if username == "" {
|
||||
return nil, fmt.Errorf("Empty username")
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
w.Write([]byte("Invalid username/password"))
|
||||
}
|
||||
|
||||
return self.coordinator.GetUser(username, password)
|
||||
}
|
||||
|
||||
func (self *HttpServer) createUser(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
user, err := self.getUserRequestingOperation(r)
|
||||
user, err := self.userManager.AuthenticateClusterAdmin(username, password)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
w.Write([]byte(err.Error()))
|
||||
}
|
||||
|
||||
username := r.URL.Query().Get(":user")
|
||||
newUser, err := user.CreateUser(username)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
statusCode, errStr := yield(user)
|
||||
w.WriteHeader(statusCode)
|
||||
if statusCode != libhttp.StatusOK {
|
||||
w.Write([]byte(errStr))
|
||||
}
|
||||
|
||||
err = self.coordinator.SaveUser(newUser)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(libhttp.StatusOK)
|
||||
}
|
||||
|
||||
func (self *HttpServer) removeUser(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
user, err := self.getUserRequestingOperation(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
func (self *HttpServer) createClusterAdmin(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
newUser := r.URL.Query().Get(":user")
|
||||
|
||||
username := r.URL.Query().Get(":user")
|
||||
newUser := self.coordinator.GetUserWithoutPassword(username)
|
||||
if newUser == nil {
|
||||
w.WriteHeader(libhttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
err = user.DeleteUser(newUser)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
err = self.coordinator.SaveUser(newUser)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(libhttp.StatusOK)
|
||||
self.tryAsClusterAdmin(w, r, func(u coordinator.User) (int, string) {
|
||||
if err := self.userManager.CreateClusterAdminUser(u, newUser); err != nil {
|
||||
return libhttp.StatusUnauthorized, err.Error()
|
||||
}
|
||||
return libhttp.StatusOK, ""
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) changePassword(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
user, err := self.getUserRequestingOperation(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
func (self *HttpServer) deleteClusterAdmin(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
newUser := r.URL.Query().Get(":user")
|
||||
|
||||
username := r.URL.Query().Get(":user")
|
||||
password := r.URL.Query().Get(":password")
|
||||
|
||||
newUser := self.coordinator.GetUserWithoutPassword(username)
|
||||
if newUser == nil {
|
||||
w.WriteHeader(libhttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
err = user.ChangePassword(newUser, password)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(libhttp.StatusOK)
|
||||
self.tryAsClusterAdmin(w, r, func(u coordinator.User) (int, string) {
|
||||
if err := self.userManager.DeleteClusterAdminUser(u, newUser); err != nil {
|
||||
return libhttp.StatusUnauthorized, err.Error()
|
||||
}
|
||||
return libhttp.StatusOK, ""
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) deleteFromClusterAdmins(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.addDeleteToClusterAdmins(w, r, false)
|
||||
func (self *HttpServer) changeClusterAdminPassword(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
newUser := r.URL.Query().Get(":user")
|
||||
newPassword := r.URL.Query().Get(":password")
|
||||
|
||||
self.tryAsClusterAdmin(w, r, func(u coordinator.User) (int, string) {
|
||||
if err := self.userManager.ChangeClusterAdminPassword(u, newUser, newPassword); err != nil {
|
||||
return libhttp.StatusUnauthorized, err.Error()
|
||||
}
|
||||
return libhttp.StatusOK, ""
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) addToClusterAdmins(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.addDeleteToClusterAdmins(w, r, true)
|
||||
}
|
||||
// // db users management interface
|
||||
|
||||
func (self *HttpServer) addDeleteToClusterAdmins(w libhttp.ResponseWriter, r *libhttp.Request, add bool) {
|
||||
user, err := self.getUserRequestingOperation(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
username := r.URL.Query().Get(":user")
|
||||
otherUser := self.coordinator.GetUserWithoutPassword(username)
|
||||
if otherUser == nil {
|
||||
w.WriteHeader(libhttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
err = user.SetClusterAdmin(otherUser, add)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
err = self.coordinator.SaveUser(user)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(libhttp.StatusOK)
|
||||
}
|
||||
|
||||
func (self *HttpServer) deleteFromDbAdmins(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.addDeleteToDbAdmins(w, r, false)
|
||||
}
|
||||
|
||||
func (self *HttpServer) addToDbAdmins(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
self.addDeleteToDbAdmins(w, r, true)
|
||||
}
|
||||
|
||||
func (self *HttpServer) addDeleteToDbAdmins(w libhttp.ResponseWriter, r *libhttp.Request, add bool) {
|
||||
user, err := self.getUserRequestingOperation(r)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
username := r.URL.Query().Get(":user")
|
||||
func (self *HttpServer) tryAsDbUserAndClusterAdmin(w libhttp.ResponseWriter, r *libhttp.Request, yield func(coordinator.User) (int, string)) {
|
||||
username := r.URL.Query().Get("username")
|
||||
password := r.URL.Query().Get("password")
|
||||
db := r.URL.Query().Get(":db")
|
||||
otherUser := self.coordinator.GetUserWithoutPassword(username)
|
||||
if otherUser == nil {
|
||||
w.WriteHeader(libhttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if add {
|
||||
err = user.SetDbAdmin(otherUser, db)
|
||||
} else {
|
||||
err = user.RemoveDbAdmin(otherUser, db)
|
||||
}
|
||||
if err != nil {
|
||||
|
||||
if username == "" {
|
||||
w.WriteHeader(libhttp.StatusUnauthorized)
|
||||
return
|
||||
w.Write([]byte("Invalid username/password"))
|
||||
}
|
||||
err = self.coordinator.SaveUser(user)
|
||||
|
||||
user, err := self.userManager.AuthenticateDbUser(db, username, password)
|
||||
if err != nil {
|
||||
w.WriteHeader(libhttp.StatusInternalServerError)
|
||||
self.tryAsClusterAdmin(w, r, yield)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(libhttp.StatusOK)
|
||||
|
||||
statusCode, _ := yield(user)
|
||||
if statusCode != libhttp.StatusOK {
|
||||
self.tryAsClusterAdmin(w, r, yield)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(statusCode)
|
||||
}
|
||||
|
||||
// // no body, adss/remove the given user as a db admin
|
||||
// p.Post("/users/:user/db_admin/:db", CorsHeaderHandler(self.deleteToDbAdmins))
|
||||
// p.Del("/users/:user/db_admin/:db", CorsHeaderHandler(self.deleteFromDbAdmins))
|
||||
func (self *HttpServer) createDbUser(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
newUser := r.URL.Query().Get(":user")
|
||||
db := r.URL.Query().Get(":db")
|
||||
|
||||
self.tryAsDbUserAndClusterAdmin(w, r, func(u coordinator.User) (int, string) {
|
||||
if err := self.userManager.CreateDbUser(u, db, newUser); err != nil {
|
||||
return libhttp.StatusUnauthorized, err.Error()
|
||||
}
|
||||
return libhttp.StatusOK, ""
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) deleteDbUser(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
newUser := r.URL.Query().Get(":user")
|
||||
db := r.URL.Query().Get(":db")
|
||||
|
||||
self.tryAsDbUserAndClusterAdmin(w, r, func(u coordinator.User) (int, string) {
|
||||
if err := self.userManager.DeleteDbUser(u, db, newUser); err != nil {
|
||||
return libhttp.StatusUnauthorized, err.Error()
|
||||
}
|
||||
return libhttp.StatusOK, ""
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) changeDbUserPassword(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
newUser := r.URL.Query().Get(":user")
|
||||
newPassword := r.URL.Query().Get(":password")
|
||||
db := r.URL.Query().Get(":db")
|
||||
|
||||
self.tryAsDbUserAndClusterAdmin(w, r, func(u coordinator.User) (int, string) {
|
||||
if err := self.userManager.ChangeDbUserPassword(u, db, newUser, newPassword); err != nil {
|
||||
return libhttp.StatusUnauthorized, err.Error()
|
||||
}
|
||||
return libhttp.StatusOK, ""
|
||||
})
|
||||
}
|
||||
|
||||
func (self *HttpServer) setDbAdmin(w libhttp.ResponseWriter, r *libhttp.Request) {
|
||||
newUser := r.URL.Query().Get(":user")
|
||||
db := r.URL.Query().Get(":db")
|
||||
|
||||
self.tryAsDbUserAndClusterAdmin(w, r, func(u coordinator.User) (int, string) {
|
||||
if err := self.userManager.SetDbAdmin(u, db, newUser); err != nil {
|
||||
return libhttp.StatusUnauthorized, err.Error()
|
||||
}
|
||||
return libhttp.StatusOK, ""
|
||||
})
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ type ApiSuite struct {
|
|||
listener net.Listener
|
||||
server *HttpServer
|
||||
coordinator *MockCoordinator
|
||||
manager *MockUserManager
|
||||
}
|
||||
|
||||
var _ = Suite(&ApiSuite{})
|
||||
|
@ -108,35 +109,6 @@ func (self *MockCoordinator) CreateDatabase(db, initialApiKey, requestingApiKey
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *MockCoordinator) GetUser(username, password string) (*coordinator.User, error) {
|
||||
user := self.users[username]
|
||||
if user != nil {
|
||||
// assume that username and password are the same
|
||||
if username != password {
|
||||
return nil, fmt.Errorf("invlaid password")
|
||||
}
|
||||
|
||||
return user, nil
|
||||
}
|
||||
|
||||
if username == "root" {
|
||||
user = coordinator.CreateTestUser("root", true)
|
||||
}
|
||||
user = coordinator.CreateTestUser(username, false)
|
||||
self.users[username] = user
|
||||
return user, nil
|
||||
}
|
||||
|
||||
func (self *MockCoordinator) GetUserWithoutPassword(username string) *coordinator.User {
|
||||
user, _ := self.GetUser(username, username)
|
||||
return user
|
||||
}
|
||||
|
||||
func (self *MockCoordinator) SaveUser(user *coordinator.User) error {
|
||||
self.users[user.GetName()] = user
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *ApiSuite) formatUrl(path string, args ...interface{}) string {
|
||||
path = fmt.Sprintf(path, args...)
|
||||
port := self.listener.Addr().(*net.TCPAddr).Port
|
||||
|
@ -147,7 +119,8 @@ func (self *ApiSuite) SetUpSuite(c *C) {
|
|||
self.coordinator = &MockCoordinator{
|
||||
users: map[string]*coordinator.User{},
|
||||
}
|
||||
self.server = NewHttpServer("", &MockEngine{}, self.coordinator)
|
||||
self.manager = &MockUserManager{}
|
||||
self.server = NewHttpServer("", &MockEngine{}, self.coordinator, self.manager)
|
||||
var err error
|
||||
self.listener, err = net.Listen("tcp4", ":8081")
|
||||
c.Assert(err, IsNil)
|
||||
|
@ -163,6 +136,9 @@ func (self *ApiSuite) TearDownSuite(c *C) {
|
|||
|
||||
func (self *ApiSuite) SetUpTest(c *C) {
|
||||
self.coordinator.series = nil
|
||||
self.manager.operation = ""
|
||||
self.manager.username = ""
|
||||
self.manager.password = ""
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestQueryWithSecondsPrecision(c *C) {
|
||||
|
@ -355,10 +331,58 @@ func (self *ApiSuite) TestCreateDatabase(c *C) {
|
|||
c.Assert(self.coordinator.requestingApiKey, Equals, "asdf")
|
||||
}
|
||||
|
||||
// func (self *ApiSuite) TestClusterAdminOperations(c *C) {
|
||||
// url := self.formatUrl("/admin/users/new_user?username=root&password=root")
|
||||
// resp, err := http.Post(url, "", nil)
|
||||
// c.Assert(err, IsNil)
|
||||
// defer resp.Body.Close()
|
||||
// c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
// }
|
||||
func (self *ApiSuite) TestClusterAdminOperations(c *C) {
|
||||
url := self.formatUrl("/cluster_admins/new_user?username=root&password=root")
|
||||
resp, err := libhttp.Post(url, "", nil)
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
|
||||
c.Assert(self.manager.operation, Equals, "cluster_admin_add")
|
||||
c.Assert(self.manager.username, Equals, "new_user")
|
||||
|
||||
url = self.formatUrl("/cluster_admins/new_user/password/new_pass?username=root&password=root")
|
||||
resp, err = libhttp.Post(url, "", nil)
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
|
||||
c.Assert(self.manager.operation, Equals, "cluster_admin_passwd")
|
||||
c.Assert(self.manager.username, Equals, "new_user")
|
||||
c.Assert(self.manager.password, Equals, "new_pass")
|
||||
|
||||
url = self.formatUrl("/cluster_admins/new_user?username=root&password=root")
|
||||
req, _ := libhttp.NewRequest("DELETE", url, nil)
|
||||
resp, err = libhttp.DefaultClient.Do(req)
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
|
||||
c.Assert(self.manager.operation, Equals, "cluster_admin_del")
|
||||
c.Assert(self.manager.username, Equals, "new_user")
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestDbUSerOperations(c *C) {
|
||||
url := self.formatUrl("/db/db1/users/new_user?username=root&password=root")
|
||||
resp, err := libhttp.Post(url, "", nil)
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
|
||||
c.Assert(self.manager.operation, Equals, "db_user_add")
|
||||
c.Assert(self.manager.username, Equals, "new_user")
|
||||
|
||||
url = self.formatUrl("/db/db1/users/new_user/password/new_pass?username=root&password=root")
|
||||
resp, err = libhttp.Post(url, "", nil)
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
|
||||
c.Assert(self.manager.operation, Equals, "db_user_passwd")
|
||||
c.Assert(self.manager.username, Equals, "new_user")
|
||||
c.Assert(self.manager.password, Equals, "new_pass")
|
||||
|
||||
url = self.formatUrl("/db/db1/users/new_user?username=root&password=root")
|
||||
req, _ := libhttp.NewRequest("DELETE", url, nil)
|
||||
resp, err = libhttp.DefaultClient.Do(req)
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
|
||||
c.Assert(self.manager.operation, Equals, "db_user_del")
|
||||
c.Assert(self.manager.username, Equals, "new_user")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue