From 9f51e96cbca5deb81f92e202b336e576e44374d2 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Mon, 17 Feb 2014 15:55:39 -0500 Subject: [PATCH] Wire up drop series query. * Added ability to have a query spec execute against all servers in a shard. * Ensured that both deletes and drops run against all servers. --- src/api/http/api.go | 5 +- src/cluster/cluster_configuration.go | 16 +++- src/cluster/shard.go | 81 +++++++++++----- src/coordinator/coordinator.go | 100 ++++---------------- src/coordinator/interface.go | 1 - src/coordinator/protobuf_request_handler.go | 32 ------- src/datastore/leveldb_shard.go | 25 +++++ src/integration/server_test.go | 3 +- src/parser/parser.go | 17 ++-- src/parser/query_spec.go | 25 +++-- 10 files changed, 147 insertions(+), 158 deletions(-) diff --git a/src/api/http/api.go b/src/api/http/api.go index f75b2d540a..178c8b3b0e 100644 --- a/src/api/http/api.go +++ b/src/api/http/api.go @@ -508,7 +508,10 @@ func (self *HttpServer) dropSeries(w libhttp.ResponseWriter, r *libhttp.Request) series := r.URL.Query().Get(":series") self.tryAsDbUserAndClusterAdmin(w, r, func(user common.User) (int, interface{}) { - err := self.coordinator.DropSeries(user, db, series) + f := func(s *protocol.Series) error { + return nil + } + err := self.coordinator.RunQuery(user, db, fmt.Sprintf("drop series %s", series), f) if err != nil { return errorToStatusCode(err), err.Error() } diff --git a/src/cluster/cluster_configuration.go b/src/cluster/cluster_configuration.go index e9b555b383..2495855fc9 100644 --- a/src/cluster/cluster_configuration.go +++ b/src/cluster/cluster_configuration.go @@ -43,6 +43,10 @@ type ShardCreator interface { CreateShards(shards []*NewShardData) ([]*ShardData, error) } +const ( + FIRST_LOWER_CASE_CHARACTER = 97 +) + /* This struct stores all the metadata confiugration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running. @@ -714,7 +718,7 @@ func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series st shardType := SHORT_TERM firstChar := series[0] - if firstChar < 97 { + if firstChar < FIRST_LOWER_CASE_CHARACTER { shardType = LONG_TERM shards = self.longTermShards // split = self.config.LongTermShard.Split @@ -813,7 +817,15 @@ func (self *ClusterConfiguration) getStartAndEndBasedOnDuration(microsecondsEpoc return &startTime, &endTime } -func (self *ClusterConfiguration) GetShards(querySpec QuerySpec) []*ShardData { +func (self *ClusterConfiguration) GetShards(querySpec *parser.QuerySpec) []*ShardData { + if querySpec.IsDropSeriesQuery() { + seriesName := querySpec.Query().DropSeriesQuery.GetTableName() + if seriesName[0] < FIRST_LOWER_CASE_CHARACTER { + return self.longTermShards + } + return self.shortTermShards + } + shouldQueryShortTerm, shouldQueryLongTerm := querySpec.ShouldQueryShortTermAndLongTerm() if shouldQueryLongTerm && shouldQueryShortTerm { diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 31f9ed65ab..98442c3535 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -178,65 +178,79 @@ func (self *ShardData) WriteLocalOnly(request *protocol.Request) error { } func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protocol.Response) error { + // This is only for queries that are deletes or drops. They need to be sent everywhere as opposed to just the local or one of the remote shards. + // But this boolean should only be set to true on the server that receives the initial query. + if querySpec.RunAgainstAllServersInShard { + if querySpec.IsDeleteFromSeriesQuery() { + return self.logAndHandleDeleteQuery(querySpec, response) + } else if querySpec.IsDropSeriesQuery() { + return self.logAndHandleDropSeriesQuery(querySpec, response) + } + } + if self.localShard != nil { var processor QueryProcessor if querySpec.IsListSeriesQuery() { processor = engine.NewListSeriesEngine(response) - } else if querySpec.IsDeleteFromSeriesQuery() { + } else if querySpec.IsDeleteFromSeriesQuery() || querySpec.IsDropSeriesQuery() { processor = engine.NewPassthroughEngine(response) } else { processor = engine.NewQueryEngine(querySpec.SelectQuery(), response) } + fmt.Println("SHARD query local: ", self.id) err := self.localShard.Query(querySpec, processor) processor.Close() return err } - if querySpec.IsDeleteFromSeriesQuery() { - return self.logAndHandleDeleteQuery(querySpec, response) - } - randServerIndex := int(time.Now().UnixNano() % int64(len(self.clusterServers))) server := self.clusterServers[randServerIndex] - queryString := querySpec.GetQueryString() - user := querySpec.User() - userName := user.GetName() - database := querySpec.Database() - isDbUser := !user.IsClusterAdmin() - request := &protocol.Request{Type: &queryRequest, ShardId: &self.id, Query: &queryString, UserName: &userName, Database: &database, IsDbUser: &isDbUser} + request := self.createRequest(querySpec) + return server.MakeRequest(request, response) } func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *protocol.Response) error { - user := querySpec.User() - userName := user.GetName() - database := querySpec.Database() - isDbUser := !user.IsClusterAdmin() - queryString := querySpec.GetQueryStringWithTimeCondition() - request := &protocol.Request{ - Type: &queryRequest, - ShardId: &self.id, - Query: &queryString, - UserName: &userName, - Database: &database, - IsDbUser: &isDbUser, - } + request := self.createRequest(querySpec) + request.Query = &queryString + return self.logAndHandleDestructiveQuery(querySpec, request, response) +} + +func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec, response chan *protocol.Response) error { + return self.logAndHandleDestructiveQuery(querySpec, self.createRequest(querySpec), response) +} + +func (self *ShardData) logAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *protocol.Request, response chan *protocol.Response) error { requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers) if err != nil { return err } responses := make([]chan *protocol.Response, len(self.clusterServers), len(self.clusterServers)) for i, server := range self.clusterServers { + fmt.Println("SHARD: requesting to server: ", server.id) responseChan := make(chan *protocol.Response, 1) responses[i] = responseChan server.MakeRequest(request, responseChan) } + if self.localShard != nil { + responseChan := make(chan *protocol.Response, 1) + responses = append(responses, responseChan) + processor := engine.NewPassthroughEngine(responseChan) + err := self.localShard.Query(querySpec, processor) + processor.Close() + if err != nil { + return err + } + } for i, responseChan := range responses { for { res := <-responseChan if *res.Type == endStreamResponse { - self.wal.Commit(requestNumber, self.clusterServers[i]) + // don't need to do a commit for the local datastore for now. + if i < len(self.clusterServers) { + self.wal.Commit(requestNumber, self.clusterServers[i]) + } break } response <- res @@ -246,6 +260,23 @@ func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, resp return nil } +func (self *ShardData) createRequest(querySpec *parser.QuerySpec) *protocol.Request { + queryString := querySpec.GetQueryString() + user := querySpec.User() + userName := user.GetName() + database := querySpec.Database() + isDbUser := !user.IsClusterAdmin() + + return &protocol.Request{ + Type: &queryRequest, + ShardId: &self.id, + Query: &queryString, + UserName: &userName, + Database: &database, + IsDbUser: &isDbUser, + } +} + // used to serialize shards when sending around in raft or when snapshotting in the log func (self *ShardData) ToNewShardData() *NewShardData { return &NewShardData{ diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 3e7a3fe697..911ac7d9f6 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -50,8 +50,6 @@ var ( proxyWrite = protocol.Request_PROXY_WRITE proxyDropDatabase = protocol.Request_PROXY_DROP_DATABASE replicateDropDatabase = protocol.Request_REPLICATION_DROP_DATABASE - proxyDropSeries = protocol.Request_PROXY_DROP_SERIES - replicateDropSeries = protocol.Request_REPLICATION_DROP_SERIES queryRequest = protocol.Request_QUERY endStreamResponse = protocol.Response_END_STREAM queryResponse = protocol.Response_QUERY @@ -129,7 +127,7 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt } if query.DropSeriesQuery != nil { - err := self.DropSeries(user, database, query.DropSeriesQuery.GetTableName()) + err := self.runDropSeriesQuery(querySpec, yield) if err != nil { return err } @@ -150,29 +148,7 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt // This should only get run for SelectQuery types func (self *CoordinatorImpl) runQuery(query *parser.Query, user common.User, database string, yield seriesYieldFunc) error { querySpec := parser.NewQuerySpec(user, database, query) - shards := self.clusterConfiguration.GetShards(querySpec) - fmt.Println("COORD: runQuery shards ") - for _, s := range shards { - fmt.Println("shard: ", s) - } - fmt.Println("**************************") - responses := make([]chan *protocol.Response, len(shards), len(shards)) - for i, shard := range shards { - responseChan := make(chan *protocol.Response, 1) - go shard.Query(querySpec, responseChan) - responses[i] = responseChan - } - - for _, responseChan := range responses { - for { - response := <-responseChan - if *response.Type == endStreamResponse { - break - } - yield(response.Series) - } - } - return nil + return self.runQuerySpec(querySpec, yield) } func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, yield seriesYieldFunc) error { @@ -220,11 +196,27 @@ func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, yield s if !querySpec.User().IsDbAdmin(db) { return common.NewAuthorizationError("Insufficient permission to write to %s", db) } + querySpec.RunAgainstAllServersInShard = true + return self.runQuerySpec(querySpec, yield) +} + +func (self *CoordinatorImpl) runDropSeriesQuery(querySpec *parser.QuerySpec, yield seriesYieldFunc) error { + user := querySpec.User() + db := querySpec.Database() + series := querySpec.Query().DropSeriesQuery.GetTableName() + if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) { + return common.NewAuthorizationError("Insufficient permission to drop series") + } + querySpec.RunAgainstAllServersInShard = true + return self.runQuerySpec(querySpec, yield) +} + +func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, yield seriesYieldFunc) error { shards := self.clusterConfiguration.GetShards(querySpec) responses := make([]chan *protocol.Response, 0) for _, shard := range shards { responseChan := make(chan *protocol.Response, 1) - shard.Query(querySpec, responseChan) + go shard.Query(querySpec, responseChan) responses = append(responses, responseChan) } @@ -238,6 +230,7 @@ func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, yield s } } return nil + } func recoverFunc(database, query string) { @@ -902,40 +895,6 @@ func (self *CoordinatorImpl) handleDropDatabase(server *cluster.ClusterServer, d return self.proxyUntilSuccess(servers, request) } -func (self *CoordinatorImpl) handleDropSeries(server *cluster.ClusterServer, database, series string) error { - owner, servers := self.clusterConfiguration.GetReplicas(server, &database) - - request := self.createRequest(proxyDropSeries, &database) - request.OriginatingServerId = &self.clusterConfiguration.LocalServerId - request.ClusterVersion = &self.clusterConfiguration.ClusterVersion - ownId := owner.Id() - request.OwnerServerId = &ownId - request.Series = &protocol.Series{Name: &series} - replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database)) - request.ReplicationFactor = &replicationFactor - - if server.Id() == self.clusterConfiguration.LocalServerId { - // this is a local delete - replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database) - err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &ownId) - if err != nil { - return self.proxyUntilSuccess(servers, request) - } - self.datastore.DropSeries(database, series) - if err != nil { - log.Error("Couldn't write data to local store: ", err, request) - } - - // ignoring the error because we still want to send to replicas - request.Type = &replicateDropSeries - self.sendRequestToReplicas(request, servers) - return nil - } - - // otherwise, proxy the request - return self.proxyUntilSuccess(servers, request) -} - func (self *CoordinatorImpl) writeSeriesToLocalStore(db *string, series *protocol.Series) error { return self.datastore.WriteSeriesData(*db, series) } @@ -1121,25 +1080,6 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error { return nil } -func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) error { - if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) { - return common.NewAuthorizationError("Insufficient permission to drop series") - } - - if self.clusterConfiguration.IsSingleServer() { - return self.datastore.DropSeries(db, series) - } - - servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db) - for _, server := range servers { - if err := self.handleDropSeries(server.Server, db, series); err != nil { - return err - } - } - - return nil -} - func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) { log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username) user, err := self.clusterConfiguration.AuthenticateDbUser(db, username, password) diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index b46fc4ddb7..16b60e237c 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -20,7 +20,6 @@ type Coordinator interface { DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error WriteSeriesData(user common.User, db string, series *protocol.Series) error DropDatabase(user common.User, db string) error - DropSeries(user common.User, db, series string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error ForceCompaction(user common.User) error ListDatabases(user common.User) ([]*cluster.Database, error) diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index e163122db2..94a3b739d4 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -62,38 +62,6 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con // TODO: add quorum writes? self.coordinator.ReplicateWrite(request) return err - } else if *request.Type == protocol.Request_PROXY_DROP_SERIES { - response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} - - request.OriginatingServerId = &self.clusterConfig.LocalServerId - replicationFactor := uint8(*request.ReplicationFactor) - // TODO: make request logging and datastore write atomic - err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) - if err != nil { - return err - } - err = self.db.DropSeries(*request.Database, *request.Series.Name) - if err != nil { - return err - } - err = self.WriteResponse(conn, response) - self.coordinator.ReplicateWrite(request) - return err - } else if *request.Type == protocol.Request_REPLICATION_DROP_SERIES { - replicationFactor := uint8(*request.ReplicationFactor) - // TODO: make request logging and datastore write atomic - err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) - if err != nil { - switch err := err.(type) { - case datastore.SequenceMissingRequestsError: - log.Warn("Missing sequence number error: Request SN: %v Last Known SN: %v", request.GetSequenceNumber(), err.LastKnownRequestSequence) - go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence) - return nil - default: - return err - } - } - return self.db.DropSeries(*request.Database, *request.Series.Name) } else if *request.Type == protocol.Request_PROXY_DROP_DATABASE { response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} diff --git a/src/datastore/leveldb_shard.go b/src/datastore/leveldb_shard.go index 7410961e9e..88f47f66a3 100644 --- a/src/datastore/leveldb_shard.go +++ b/src/datastore/leveldb_shard.go @@ -92,6 +92,8 @@ func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.Q return self.executeListSeriesQuery(querySpec, processor) } else if querySpec.IsDeleteFromSeriesQuery() { return self.executeDeleteQuery(querySpec, processor) + } else if querySpec.IsDropSeriesQuery() { + return self.executeDropSeriesQuery(querySpec, processor) } seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() @@ -301,6 +303,29 @@ func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, proces return nil } +func (self *LevelDbShard) executeDropSeriesQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { + database := querySpec.Database() + series := querySpec.Query().DropSeriesQuery.GetTableName() + fmt.Println("DROP SERIES QUERY: ", database, series) + startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} + endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF} + + wb := levigo.NewWriteBatch() + defer wb.Close() + + for _, name := range self.getColumnNamesForSeries(database, series) { + if err := self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes); err != nil { + return err + } + + indexKey := append(SERIES_COLUMN_INDEX_PREFIX, []byte(database+"~"+series+"~"+name)...) + wb.Delete(indexKey) + } + + // remove the column indeces for this time series + return self.db.Write(self.writeOptions, wb) +} + func (self *LevelDbShard) byteArrayForTimeInt(time int64) []byte { timeBuffer := bytes.NewBuffer(make([]byte, 0, 8)) binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&time)) diff --git a/src/integration/server_test.go b/src/integration/server_test.go index 582ec590a5..da061fba7a 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -598,8 +598,7 @@ func (self *ServerSuite) TestDropSeries(c *C) { for _, s := range self.serverProcesses { fmt.Printf("Running query against: %d\n", s.apiPort) collection := s.Query("drop_series", "select * from cluster_query", true, c) - c.Assert(collection.GetSeries("cluster_query", c).Points, HasLen, 0) - c.Assert(collection.GetSeries("cluster_query", c).Columns, DeepEquals, []string{"time", "sequence_number"}) + c.Assert(collection.Members, HasLen, 0) } } } diff --git a/src/parser/parser.go b/src/parser/parser.go index 8048d95b28..29f97d1710 100644 --- a/src/parser/parser.go +++ b/src/parser/parser.go @@ -177,6 +177,7 @@ type DeleteQuery struct { } type Query struct { + QueryString string SelectQuery *SelectQuery DeleteQuery *DeleteQuery ListQuery *ListQuery @@ -189,8 +190,10 @@ func (self *Query) GetQueryString() string { return self.SelectQuery.GetQueryString() } else if self.ListQuery != nil { return "list series" + } else if self.DeleteQuery != nil { + return self.DeleteQuery.GetQueryString() } - return self.DeleteQuery.GetQueryString() + return self.QueryString } func (self *Query) IsListQuery() bool { @@ -484,11 +487,11 @@ func ParseQuery(query string) ([]*Query, error) { } if q.list_series_query != 0 { - return []*Query{&Query{ListQuery: &ListQuery{Type: Series}}}, nil + return []*Query{&Query{QueryString: query, ListQuery: &ListQuery{Type: Series}}}, nil } if q.list_continuous_queries_query != 0 { - return []*Query{&Query{ListQuery: &ListQuery{Type: ContinuousQueries}}}, nil + return []*Query{&Query{QueryString: query, ListQuery: &ListQuery{Type: ContinuousQueries}}}, nil } if q.select_query != nil { @@ -497,22 +500,22 @@ func ParseQuery(query string) ([]*Query, error) { return nil, err } - return []*Query{&Query{SelectQuery: selectQuery}}, nil + return []*Query{&Query{QueryString: query, SelectQuery: selectQuery}}, nil } else if q.delete_query != nil { deleteQuery, err := parseDeleteQuery(query, q.delete_query) if err != nil { return nil, err } - return []*Query{&Query{DeleteQuery: deleteQuery}}, nil + return []*Query{&Query{QueryString: query, DeleteQuery: deleteQuery}}, nil } else if q.drop_series_query != nil { dropSeriesQuery, err := parseDropSeriesQuery(query, q.drop_series_query) if err != nil { return nil, err } - return []*Query{&Query{DropSeriesQuery: dropSeriesQuery}}, nil + return []*Query{&Query{QueryString: query, DropSeriesQuery: dropSeriesQuery}}, nil } else if q.drop_query != nil { fmt.Println(q.drop_query.id) - return []*Query{&Query{DropQuery: &DropQuery{Id: int(q.drop_query.id)}}}, nil + return []*Query{&Query{QueryString: query, DropQuery: &DropQuery{Id: int(q.drop_query.id)}}}, nil } return nil, fmt.Errorf("Unknown query type encountered") } diff --git a/src/parser/query_spec.go b/src/parser/query_spec.go index 1a4d99325b..c4f159bc47 100644 --- a/src/parser/query_spec.go +++ b/src/parser/query_spec.go @@ -6,14 +6,15 @@ import ( ) type QuerySpec struct { - query *Query - database string - isRegex bool - names []string - user common.User - startTime time.Time - endTime time.Time - seriesValuesAndColumns map[*Value][]string + query *Query + database string + isRegex bool + names []string + user common.User + startTime time.Time + endTime time.Time + seriesValuesAndColumns map[*Value][]string + RunAgainstAllServersInShard bool } func NewQuerySpec(user common.User, database string, query *Query) *QuerySpec { @@ -144,3 +145,11 @@ func (self *QuerySpec) GetQueryStringWithTimeCondition() string { } return self.query.GetQueryString() } + +func (self *QuerySpec) IsDropSeriesQuery() bool { + return self.query.DropSeriesQuery != nil +} + +func (self *QuerySpec) Query() *Query { + return self.query +}