Wire up drop series query.

* Added ability to have a query spec execute against all servers in a shard.
* Ensured that both deletes and drops run against all servers.
pull/269/head
Paul Dix 2014-02-17 15:55:39 -05:00
parent a3ebd57316
commit 9f51e96cbc
10 changed files with 147 additions and 158 deletions

View File

@ -508,7 +508,10 @@ func (self *HttpServer) dropSeries(w libhttp.ResponseWriter, r *libhttp.Request)
series := r.URL.Query().Get(":series")
self.tryAsDbUserAndClusterAdmin(w, r, func(user common.User) (int, interface{}) {
err := self.coordinator.DropSeries(user, db, series)
f := func(s *protocol.Series) error {
return nil
}
err := self.coordinator.RunQuery(user, db, fmt.Sprintf("drop series %s", series), f)
if err != nil {
return errorToStatusCode(err), err.Error()
}

View File

@ -43,6 +43,10 @@ type ShardCreator interface {
CreateShards(shards []*NewShardData) ([]*ShardData, error)
}
const (
FIRST_LOWER_CASE_CHARACTER = 97
)
/*
This struct stores all the metadata confiugration information about a running cluster. This includes
the servers in the cluster and their state, databases, users, and which continuous queries are running.
@ -714,7 +718,7 @@ func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series st
shardType := SHORT_TERM
firstChar := series[0]
if firstChar < 97 {
if firstChar < FIRST_LOWER_CASE_CHARACTER {
shardType = LONG_TERM
shards = self.longTermShards
// split = self.config.LongTermShard.Split
@ -813,7 +817,15 @@ func (self *ClusterConfiguration) getStartAndEndBasedOnDuration(microsecondsEpoc
return &startTime, &endTime
}
func (self *ClusterConfiguration) GetShards(querySpec QuerySpec) []*ShardData {
func (self *ClusterConfiguration) GetShards(querySpec *parser.QuerySpec) []*ShardData {
if querySpec.IsDropSeriesQuery() {
seriesName := querySpec.Query().DropSeriesQuery.GetTableName()
if seriesName[0] < FIRST_LOWER_CASE_CHARACTER {
return self.longTermShards
}
return self.shortTermShards
}
shouldQueryShortTerm, shouldQueryLongTerm := querySpec.ShouldQueryShortTermAndLongTerm()
if shouldQueryLongTerm && shouldQueryShortTerm {

View File

@ -178,65 +178,79 @@ func (self *ShardData) WriteLocalOnly(request *protocol.Request) error {
}
func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protocol.Response) error {
// This is only for queries that are deletes or drops. They need to be sent everywhere as opposed to just the local or one of the remote shards.
// But this boolean should only be set to true on the server that receives the initial query.
if querySpec.RunAgainstAllServersInShard {
if querySpec.IsDeleteFromSeriesQuery() {
return self.logAndHandleDeleteQuery(querySpec, response)
} else if querySpec.IsDropSeriesQuery() {
return self.logAndHandleDropSeriesQuery(querySpec, response)
}
}
if self.localShard != nil {
var processor QueryProcessor
if querySpec.IsListSeriesQuery() {
processor = engine.NewListSeriesEngine(response)
} else if querySpec.IsDeleteFromSeriesQuery() {
} else if querySpec.IsDeleteFromSeriesQuery() || querySpec.IsDropSeriesQuery() {
processor = engine.NewPassthroughEngine(response)
} else {
processor = engine.NewQueryEngine(querySpec.SelectQuery(), response)
}
fmt.Println("SHARD query local: ", self.id)
err := self.localShard.Query(querySpec, processor)
processor.Close()
return err
}
if querySpec.IsDeleteFromSeriesQuery() {
return self.logAndHandleDeleteQuery(querySpec, response)
}
randServerIndex := int(time.Now().UnixNano() % int64(len(self.clusterServers)))
server := self.clusterServers[randServerIndex]
queryString := querySpec.GetQueryString()
user := querySpec.User()
userName := user.GetName()
database := querySpec.Database()
isDbUser := !user.IsClusterAdmin()
request := &protocol.Request{Type: &queryRequest, ShardId: &self.id, Query: &queryString, UserName: &userName, Database: &database, IsDbUser: &isDbUser}
request := self.createRequest(querySpec)
return server.MakeRequest(request, response)
}
func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *protocol.Response) error {
user := querySpec.User()
userName := user.GetName()
database := querySpec.Database()
isDbUser := !user.IsClusterAdmin()
queryString := querySpec.GetQueryStringWithTimeCondition()
request := &protocol.Request{
Type: &queryRequest,
ShardId: &self.id,
Query: &queryString,
UserName: &userName,
Database: &database,
IsDbUser: &isDbUser,
}
request := self.createRequest(querySpec)
request.Query = &queryString
return self.logAndHandleDestructiveQuery(querySpec, request, response)
}
func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec, response chan *protocol.Response) error {
return self.logAndHandleDestructiveQuery(querySpec, self.createRequest(querySpec), response)
}
func (self *ShardData) logAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *protocol.Request, response chan *protocol.Response) error {
requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers)
if err != nil {
return err
}
responses := make([]chan *protocol.Response, len(self.clusterServers), len(self.clusterServers))
for i, server := range self.clusterServers {
fmt.Println("SHARD: requesting to server: ", server.id)
responseChan := make(chan *protocol.Response, 1)
responses[i] = responseChan
server.MakeRequest(request, responseChan)
}
if self.localShard != nil {
responseChan := make(chan *protocol.Response, 1)
responses = append(responses, responseChan)
processor := engine.NewPassthroughEngine(responseChan)
err := self.localShard.Query(querySpec, processor)
processor.Close()
if err != nil {
return err
}
}
for i, responseChan := range responses {
for {
res := <-responseChan
if *res.Type == endStreamResponse {
self.wal.Commit(requestNumber, self.clusterServers[i])
// don't need to do a commit for the local datastore for now.
if i < len(self.clusterServers) {
self.wal.Commit(requestNumber, self.clusterServers[i])
}
break
}
response <- res
@ -246,6 +260,23 @@ func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, resp
return nil
}
func (self *ShardData) createRequest(querySpec *parser.QuerySpec) *protocol.Request {
queryString := querySpec.GetQueryString()
user := querySpec.User()
userName := user.GetName()
database := querySpec.Database()
isDbUser := !user.IsClusterAdmin()
return &protocol.Request{
Type: &queryRequest,
ShardId: &self.id,
Query: &queryString,
UserName: &userName,
Database: &database,
IsDbUser: &isDbUser,
}
}
// used to serialize shards when sending around in raft or when snapshotting in the log
func (self *ShardData) ToNewShardData() *NewShardData {
return &NewShardData{

View File

@ -50,8 +50,6 @@ var (
proxyWrite = protocol.Request_PROXY_WRITE
proxyDropDatabase = protocol.Request_PROXY_DROP_DATABASE
replicateDropDatabase = protocol.Request_REPLICATION_DROP_DATABASE
proxyDropSeries = protocol.Request_PROXY_DROP_SERIES
replicateDropSeries = protocol.Request_REPLICATION_DROP_SERIES
queryRequest = protocol.Request_QUERY
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
@ -129,7 +127,7 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt
}
if query.DropSeriesQuery != nil {
err := self.DropSeries(user, database, query.DropSeriesQuery.GetTableName())
err := self.runDropSeriesQuery(querySpec, yield)
if err != nil {
return err
}
@ -150,29 +148,7 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt
// This should only get run for SelectQuery types
func (self *CoordinatorImpl) runQuery(query *parser.Query, user common.User, database string, yield seriesYieldFunc) error {
querySpec := parser.NewQuerySpec(user, database, query)
shards := self.clusterConfiguration.GetShards(querySpec)
fmt.Println("COORD: runQuery shards ")
for _, s := range shards {
fmt.Println("shard: ", s)
}
fmt.Println("**************************")
responses := make([]chan *protocol.Response, len(shards), len(shards))
for i, shard := range shards {
responseChan := make(chan *protocol.Response, 1)
go shard.Query(querySpec, responseChan)
responses[i] = responseChan
}
for _, responseChan := range responses {
for {
response := <-responseChan
if *response.Type == endStreamResponse {
break
}
yield(response.Series)
}
}
return nil
return self.runQuerySpec(querySpec, yield)
}
func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, yield seriesYieldFunc) error {
@ -220,11 +196,27 @@ func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, yield s
if !querySpec.User().IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permission to write to %s", db)
}
querySpec.RunAgainstAllServersInShard = true
return self.runQuerySpec(querySpec, yield)
}
func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, yield seriesYieldFunc) error {
user := querySpec.User()
db := querySpec.Database()
series := querySpec.Query().DropSeriesQuery.GetTableName()
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) {
return common.NewAuthorizationError("Insufficient permission to drop series")
}
querySpec.RunAgainstAllServersInShard = true
return self.runQuerySpec(querySpec, yield)
}
func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, yield seriesYieldFunc) error {
shards := self.clusterConfiguration.GetShards(querySpec)
responses := make([]chan *protocol.Response, 0)
for _, shard := range shards {
responseChan := make(chan *protocol.Response, 1)
shard.Query(querySpec, responseChan)
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}
@ -238,6 +230,7 @@ func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, yield s
}
}
return nil
}
func recoverFunc(database, query string) {
@ -902,40 +895,6 @@ func (self *CoordinatorImpl) handleDropDatabase(server *cluster.ClusterServer, d
return self.proxyUntilSuccess(servers, request)
}
func (self *CoordinatorImpl) handleDropSeries(server *cluster.ClusterServer, database, series string) error {
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)
request := self.createRequest(proxyDropSeries, &database)
request.OriginatingServerId = &self.clusterConfiguration.LocalServerId
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
ownId := owner.Id()
request.OwnerServerId = &ownId
request.Series = &protocol.Series{Name: &series}
replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database))
request.ReplicationFactor = &replicationFactor
if server.Id() == self.clusterConfiguration.LocalServerId {
// this is a local delete
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &ownId)
if err != nil {
return self.proxyUntilSuccess(servers, request)
}
self.datastore.DropSeries(database, series)
if err != nil {
log.Error("Couldn't write data to local store: ", err, request)
}
// ignoring the error because we still want to send to replicas
request.Type = &replicateDropSeries
self.sendRequestToReplicas(request, servers)
return nil
}
// otherwise, proxy the request
return self.proxyUntilSuccess(servers, request)
}
func (self *CoordinatorImpl) writeSeriesToLocalStore(db *string, series *protocol.Series) error {
return self.datastore.WriteSeriesData(*db, series)
}
@ -1121,25 +1080,6 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
return nil
}
func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) error {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) {
return common.NewAuthorizationError("Insufficient permission to drop series")
}
if self.clusterConfiguration.IsSingleServer() {
return self.datastore.DropSeries(db, series)
}
servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
for _, server := range servers {
if err := self.handleDropSeries(server.Server, db, series); err != nil {
return err
}
}
return nil
}
func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) {
log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
user, err := self.clusterConfiguration.AuthenticateDbUser(db, username, password)

View File

@ -20,7 +20,6 @@ type Coordinator interface {
DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error
WriteSeriesData(user common.User, db string, series *protocol.Series) error
DropDatabase(user common.User, db string) error
DropSeries(user common.User, db, series string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
ForceCompaction(user common.User) error
ListDatabases(user common.User) ([]*cluster.Database, error)

View File

@ -62,38 +62,6 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
// TODO: add quorum writes?
self.coordinator.ReplicateWrite(request)
return err
} else if *request.Type == protocol.Request_PROXY_DROP_SERIES {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
request.OriginatingServerId = &self.clusterConfig.LocalServerId
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
return err
}
err = self.db.DropSeries(*request.Database, *request.Series.Name)
if err != nil {
return err
}
err = self.WriteResponse(conn, response)
self.coordinator.ReplicateWrite(request)
return err
} else if *request.Type == protocol.Request_REPLICATION_DROP_SERIES {
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
switch err := err.(type) {
case datastore.SequenceMissingRequestsError:
log.Warn("Missing sequence number error: Request SN: %v Last Known SN: %v", request.GetSequenceNumber(), err.LastKnownRequestSequence)
go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence)
return nil
default:
return err
}
}
return self.db.DropSeries(*request.Database, *request.Series.Name)
} else if *request.Type == protocol.Request_PROXY_DROP_DATABASE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}

View File

@ -92,6 +92,8 @@ func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.Q
return self.executeListSeriesQuery(querySpec, processor)
} else if querySpec.IsDeleteFromSeriesQuery() {
return self.executeDeleteQuery(querySpec, processor)
} else if querySpec.IsDropSeriesQuery() {
return self.executeDropSeriesQuery(querySpec, processor)
}
seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns()
@ -301,6 +303,29 @@ func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, proces
return nil
}
func (self *LevelDbShard) executeDropSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
database := querySpec.Database()
series := querySpec.Query().DropSeriesQuery.GetTableName()
fmt.Println("DROP SERIES QUERY: ", database, series)
startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
wb := levigo.NewWriteBatch()
defer wb.Close()
for _, name := range self.getColumnNamesForSeries(database, series) {
if err := self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes); err != nil {
return err
}
indexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(database+"~"+series+"~"+name)...)
wb.Delete(indexKey)
}
// remove the column indeces for this time series
return self.db.Write(self.writeOptions, wb)
}
func (self *LevelDbShard) byteArrayForTimeInt(time int64) []byte {
timeBuffer := bytes.NewBuffer(make([]byte, 0, 8))
binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&time))

View File

@ -598,8 +598,7 @@ func (self *ServerSuite) TestDropSeries(c *C) {
for _, s := range self.serverProcesses {
fmt.Printf("Running query against: %d\n", s.apiPort)
collection := s.Query("drop_series", "select * from cluster_query", true, c)
c.Assert(collection.GetSeries("cluster_query", c).Points, HasLen, 0)
c.Assert(collection.GetSeries("cluster_query", c).Columns, DeepEquals, []string{"time", "sequence_number"})
c.Assert(collection.Members, HasLen, 0)
}
}
}

View File

@ -177,6 +177,7 @@ type DeleteQuery struct {
}
type Query struct {
QueryString string
SelectQuery *SelectQuery
DeleteQuery *DeleteQuery
ListQuery *ListQuery
@ -189,8 +190,10 @@ func (self *Query) GetQueryString() string {
return self.SelectQuery.GetQueryString()
} else if self.ListQuery != nil {
return "list series"
} else if self.DeleteQuery != nil {
return self.DeleteQuery.GetQueryString()
}
return self.DeleteQuery.GetQueryString()
return self.QueryString
}
func (self *Query) IsListQuery() bool {
@ -484,11 +487,11 @@ func ParseQuery(query string) ([]*Query, error) {
}
if q.list_series_query != 0 {
return []*Query{&Query{ListQuery: &ListQuery{Type: Series}}}, nil
return []*Query{&Query{QueryString: query, ListQuery: &ListQuery{Type: Series}}}, nil
}
if q.list_continuous_queries_query != 0 {
return []*Query{&Query{ListQuery: &ListQuery{Type: ContinuousQueries}}}, nil
return []*Query{&Query{QueryString: query, ListQuery: &ListQuery{Type: ContinuousQueries}}}, nil
}
if q.select_query != nil {
@ -497,22 +500,22 @@ func ParseQuery(query string) ([]*Query, error) {
return nil, err
}
return []*Query{&Query{SelectQuery: selectQuery}}, nil
return []*Query{&Query{QueryString: query, SelectQuery: selectQuery}}, nil
} else if q.delete_query != nil {
deleteQuery, err := parseDeleteQuery(query, q.delete_query)
if err != nil {
return nil, err
}
return []*Query{&Query{DeleteQuery: deleteQuery}}, nil
return []*Query{&Query{QueryString: query, DeleteQuery: deleteQuery}}, nil
} else if q.drop_series_query != nil {
dropSeriesQuery, err := parseDropSeriesQuery(query, q.drop_series_query)
if err != nil {
return nil, err
}
return []*Query{&Query{DropSeriesQuery: dropSeriesQuery}}, nil
return []*Query{&Query{QueryString: query, DropSeriesQuery: dropSeriesQuery}}, nil
} else if q.drop_query != nil {
fmt.Println(q.drop_query.id)
return []*Query{&Query{DropQuery: &DropQuery{Id: int(q.drop_query.id)}}}, nil
return []*Query{&Query{QueryString: query, DropQuery: &DropQuery{Id: int(q.drop_query.id)}}}, nil
}
return nil, fmt.Errorf("Unknown query type encountered")
}

View File

@ -6,14 +6,15 @@ import (
)
type QuerySpec struct {
query *Query
database string
isRegex bool
names []string
user common.User
startTime time.Time
endTime time.Time
seriesValuesAndColumns map[*Value][]string
query *Query
database string
isRegex bool
names []string
user common.User
startTime time.Time
endTime time.Time
seriesValuesAndColumns map[*Value][]string
RunAgainstAllServersInShard bool
}
func NewQuerySpec(user common.User, database string, query *Query) *QuerySpec {
@ -144,3 +145,11 @@ func (self *QuerySpec) GetQueryStringWithTimeCondition() string {
}
return self.query.GetQueryString()
}
func (self *QuerySpec) IsDropSeriesQuery() bool {
return self.query.DropSeriesQuery != nil
}
func (self *QuerySpec) Query() *Query {
return self.query
}