diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 9d6775812d..a9123af0b7 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -64,8 +64,11 @@ type Handler struct { Ping(checkAllMetaServers bool) error } - QueryAuthorizer *meta.QueryAuthorizer - QueryExecutor influxql.QueryExecutor + QueryAuthorizer interface { + AuthorizeQuery(u *meta.UserInfo, query *influxql.Query, database string) error + } + + QueryExecutor influxql.QueryExecutor PointsWriter interface { WritePoints(p *cluster.WritePointsRequest) error diff --git a/services/meta/data.go b/services/meta/data.go index 45a6b6e460..6a6c57e7b2 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -1,8 +1,10 @@ package meta import ( + "errors" "fmt" "sort" + "sync" "time" "github.com/gogo/protobuf/proto" @@ -1524,6 +1526,50 @@ func (ui *UserInfo) unmarshal(pb *internal.UserInfo) { } } +type Lease struct { + Name string `json:"name"` + Expiration time.Time `json:"expiration"` + Owner uint64 `json:"owner"` +} + +type Leases struct { + mu sync.Mutex + m map[string]*Lease + d time.Duration +} + +func NewLeases(d time.Duration) *Leases { + return &Leases{ + m: make(map[string]*Lease), + d: d, + } +} + +func (leases *Leases) Acquire(name string, nodeID uint64) (*Lease, error) { + leases.mu.Lock() + defer leases.mu.Unlock() + + l, ok := leases.m[name] + if ok { + if time.Now().After(l.Expiration) || l.Owner == nodeID { + l.Expiration = time.Now().Add(leases.d) + l.Owner = nodeID + return l, nil + } + return l, errors.New("another node has the lease") + } + + l = &Lease{ + Name: name, + Expiration: time.Now().Add(leases.d), + Owner: nodeID, + } + + leases.m[name] = l + + return l, nil +} + // MarshalTime converts t to nanoseconds since epoch. A zero time returns 0. func MarshalTime(t time.Time) int64 { if t.IsZero() { diff --git a/services/meta/handler.go b/services/meta/handler.go index 9698ef9b4f..82d10f47c6 100644 --- a/services/meta/handler.go +++ b/services/meta/handler.go @@ -479,47 +479,3 @@ func (h *handler) httpError(err error, w http.ResponseWriter, status int) { } http.Error(w, "", status) } - -type Lease struct { - Name string `json:"name"` - Expiration time.Time `json:"expiration"` - Owner uint64 `json:"owner"` -} - -type Leases struct { - mu sync.Mutex - m map[string]*Lease - d time.Duration -} - -func NewLeases(d time.Duration) *Leases { - return &Leases{ - m: make(map[string]*Lease), - d: d, - } -} - -func (leases *Leases) Acquire(name string, nodeID uint64) (*Lease, error) { - leases.mu.Lock() - defer leases.mu.Unlock() - - l, ok := leases.m[name] - if ok { - if time.Now().After(l.Expiration) || l.Owner == nodeID { - l.Expiration = time.Now().Add(leases.d) - l.Owner = nodeID - return l, nil - } - return l, errors.New("another node has the lease") - } - - l = &Lease{ - Name: name, - Expiration: time.Now().Add(leases.d), - Owner: nodeID, - } - - leases.m[name] = l - - return l, nil -}