diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 1cc54f792d..31f9ed65ab 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -182,6 +182,8 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco var processor QueryProcessor if querySpec.IsListSeriesQuery() { processor = engine.NewListSeriesEngine(response) + } else if querySpec.IsDeleteFromSeriesQuery() { + processor = engine.NewPassthroughEngine(response) } else { processor = engine.NewQueryEngine(querySpec.SelectQuery(), response) } @@ -190,6 +192,10 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco return err } + if querySpec.IsDeleteFromSeriesQuery() { + return self.logAndHandleDeleteQuery(querySpec, response) + } + randServerIndex := int(time.Now().UnixNano() % int64(len(self.clusterServers))) server := self.clusterServers[randServerIndex] queryString := querySpec.GetQueryString() @@ -201,6 +207,45 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco return server.MakeRequest(request, response) } +func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *protocol.Response) error { + user := querySpec.User() + userName := user.GetName() + database := querySpec.Database() + isDbUser := !user.IsClusterAdmin() + + queryString := querySpec.GetQueryStringWithTimeCondition() + request := &protocol.Request{ + Type: &queryRequest, + ShardId: &self.id, + Query: &queryString, + UserName: &userName, + Database: &database, + IsDbUser: &isDbUser, + } + requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers) + if err != nil { + return err + } + responses := make([]chan *protocol.Response, len(self.clusterServers), len(self.clusterServers)) + for i, server := range self.clusterServers { + responseChan := make(chan *protocol.Response, 1) + responses[i] = responseChan + server.MakeRequest(request, responseChan) + } + for i, responseChan := range responses { + for { + res := <-responseChan + if *res.Type == endStreamResponse { + self.wal.Commit(requestNumber, self.clusterServers[i]) + break + } + response <- res + } + } + response <- &protocol.Response{Type: &endStreamResponse} + return nil +} + // used to serialize shards when sending around in raft or when snapshotting in the log func (self *ShardData) ToNewShardData() *NewShardData { return &NewShardData{ diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index e3cf0c1575..6f302a1c29 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -48,7 +48,6 @@ var ( // shorter constants for readability var ( proxyWrite = protocol.Request_PROXY_WRITE - proxyDelete = protocol.Request_PROXY_DELETE proxyDropDatabase = protocol.Request_PROXY_DROP_DATABASE replicateDropDatabase = protocol.Request_REPLICATION_DROP_DATABASE proxyDropSeries = protocol.Request_PROXY_DROP_SERIES @@ -97,7 +96,7 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt querySpec := parser.NewQuerySpec(user, database, query) if query.DeleteQuery != nil { - if err := self.DeleteSeriesData(user, database, query.DeleteQuery, false); err != nil { + if err := self.runDeleteQuery(querySpec, yield); err != nil { return err } continue @@ -214,6 +213,31 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, yie return nil } +func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, yield func(*protocol.Series) error) error { + db := querySpec.Database() + if !querySpec.User().IsDbAdmin(db) { + return common.NewAuthorizationError("Insufficient permission to write to %s", db) + } + shards := self.clusterConfiguration.GetShards(querySpec) + responses := make([]chan *protocol.Response, 0) + for _, shard := range shards { + responseChan := make(chan *protocol.Response, 1) + shard.Query(querySpec, responseChan) + responses = append(responses, responseChan) + } + + for _, responseChan := range responses { + for { + response := <-responseChan + if *response.Type == endStreamResponse { + break + } + yield(response.Series) + } + } + return nil +} + func recoverFunc(database, query string) { if err := recover(); err != nil { fmt.Fprintf(os.Stderr, "********************************BUG********************************\n") @@ -743,8 +767,8 @@ func (self *CoordinatorImpl) handleReplayRequest(r *protocol.Request, replicatio log.Debug("Replaying write request") self.datastore.WriteSeriesData(*r.Database, r.Series) } else if *r.Type == protocol.Request_PROXY_DELETE || *r.Type == protocol.Request_REPLICATION_DELETE { - query, _ := parser.ParseQuery(*r.Query) - err = self.datastore.DeleteSeriesData(*r.Database, query[0].DeleteQuery) + // query, _ := parser.ParseQuery(*r.Query) + // err = self.datastore.DeleteSeriesData(*r.Database, query[0].DeleteQuery) } } func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error { @@ -838,67 +862,11 @@ func (self *CoordinatorImpl) write(db string, series *protocol.Series, shard clu return shard.Write(request) } -func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error { - if !user.IsDbAdmin(db) { - return common.NewAuthorizationError("Insufficient permission to write to %s", db) - } - - if self.clusterConfiguration.IsSingleServer() || localOnly { - return self.deleteSeriesDataLocally(user, db, query) - } - - servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db) - for _, server := range servers { - if err := self.handleSeriesDelete(user, server.Server, db, query); err != nil { - return err - } - } - - return nil -} - -func (self *CoordinatorImpl) deleteSeriesDataLocally(user common.User, database string, query *parser.DeleteQuery) error { - return self.datastore.DeleteSeriesData(database, query) -} - func (self *CoordinatorImpl) createRequest(requestType protocol.Request_Type, database *string) *protocol.Request { id := atomic.AddUint32(&self.requestId, uint32(1)) return &protocol.Request{Type: &requestType, Database: database, Id: &id} } -func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *cluster.ClusterServer, database string, query *parser.DeleteQuery) error { - owner, servers := self.clusterConfiguration.GetReplicas(server, &database) - - request := self.createRequest(proxyDelete, &database) - queryStr := query.GetQueryStringWithTimeCondition() - request.Query = &queryStr - request.OriginatingServerId = &self.clusterConfiguration.LocalServerId - request.ClusterVersion = &self.clusterConfiguration.ClusterVersion - ownId := owner.Id() - request.OwnerServerId = &ownId - - if server.Id() == self.clusterConfiguration.LocalServerId { - // this is a local delete - replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database) - err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &ownId) - if err != nil { - return self.proxyUntilSuccess(servers, request) - } - self.deleteSeriesDataLocally(user, database, query) - if err != nil { - log.Error("Couldn't write data to local store: ", err, request) - } - - // ignoring the error because we still want to send to replicas - request.Type = &replicateDelete - self.sendRequestToReplicas(request, servers) - return nil - } - - // otherwise, proxy the delete - return self.proxyUntilSuccess(servers, request) -} - func (self *CoordinatorImpl) handleDropDatabase(server *cluster.ClusterServer, database string) error { owner, servers := self.clusterConfiguration.GetReplicas(server, &database) @@ -1329,16 +1297,6 @@ func (self *CoordinatorImpl) ReplicateWrite(request *protocol.Request) error { return nil } -func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error { - id := atomic.AddUint32(&self.requestId, uint32(1)) - request.Id = &id - server := self.clusterConfiguration.GetServerById(request.OwnerServerId) - _, replicas := self.clusterConfiguration.GetReplicas(server, request.Database) - request.Type = &replicateDelete - self.sendRequestToReplicas(request, replicas) - return nil -} - func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*cluster.ClusterServer) { for _, server := range replicas { if server.Id() != self.clusterConfiguration.LocalServerId { diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index 21febfc4bc..a2435adf3f 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -19,14 +19,12 @@ type Coordinator interface { // 5. TODO: Aggregation on the nodes DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error WriteSeriesData(user common.User, db string, series *protocol.Series) error - DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error DropDatabase(user common.User, db string) error DropSeries(user common.User, db, series string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error ForceCompaction(user common.User) error ListDatabases(user common.User) ([]*cluster.Database, error) ReplicateWrite(request *protocol.Request) error - ReplicateDelete(request *protocol.Request) error ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) GetLastSequenceNumber(replicationFactor uint8, ownerServerId, originatingServerId uint32) (uint64, error) DeleteContinuousQuery(user common.User, db string, id uint32) error diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index 101f76eaae..e163122db2 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -126,28 +126,6 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con } } return self.db.DropDatabase(*request.Database) - } else if *request.Type == protocol.Request_PROXY_DELETE { - response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} - - request.OriginatingServerId = &self.clusterConfig.LocalServerId - // TODO: make request logging and datastore write atomic - replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database) - err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) - if err != nil { - return err - } - query, err := parser.ParseQuery(*request.Query) - if err != nil { - return err - } - err = self.db.DeleteSeriesData(*request.Database, query[0].DeleteQuery) - if err != nil { - return err - } - err = self.WriteResponse(conn, response) - // TODO: add quorum writes? - self.coordinator.ReplicateDelete(request) - return err } else if *request.Type == protocol.Request_REPLICATION_WRITE { replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database) // TODO: make request logging and datastore write atomic @@ -164,24 +142,6 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con } self.db.WriteSeriesData(*request.Database, request.Series) return nil - } else if *request.Type == protocol.Request_REPLICATION_DELETE { - replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database) - // TODO: make request logging and datastore write atomic - err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) - if err != nil { - switch err := err.(type) { - case datastore.SequenceMissingRequestsError: - go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence) - return nil - default: - return err - } - } - query, err := parser.ParseQuery(*request.Query) - if err != nil { - return err - } - return self.db.DeleteSeriesData(*request.Database, query[0].DeleteQuery) } else if *request.Type == protocol.Request_QUERY { go self.handleQuery(request, conn) } else if *request.Type == protocol.Request_REPLICATION_REPLAY { diff --git a/src/datastore/leveldb_shard.go b/src/datastore/leveldb_shard.go index c7b67a9f60..7410961e9e 100644 --- a/src/datastore/leveldb_shard.go +++ b/src/datastore/leveldb_shard.go @@ -90,6 +90,8 @@ func (self *LevelDbShard) Write(database string, series *protocol.Series) error func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { if querySpec.IsListSeriesQuery() { return self.executeListSeriesQuery(querySpec, processor) + } else if querySpec.IsDeleteFromSeriesQuery() { + return self.executeDeleteQuery(querySpec, processor) } seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns() @@ -276,6 +278,109 @@ func (self *LevelDbShard) executeListSeriesQuery(querySpec *parser.QuerySpec, pr return nil } +func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error { + query := querySpec.DeleteQuery() + series := query.GetFromClause() + database := querySpec.Database() + if series.Type != parser.FromClauseArray { + return fmt.Errorf("Merge and Inner joins can't be used with a delete query", series.Type) + } + + for _, name := range series.Names { + var err error + if regex, ok := name.Name.GetCompiledRegex(); ok { + err = self.deleteRangeOfRegex(database, regex, query.GetStartTime(), query.GetEndTime()) + } else { + err = self.deleteRangeOfSeries(database, name.Name.Name, query.GetStartTime(), query.GetEndTime()) + } + + if err != nil { + return err + } + } + return nil +} + +func (self *LevelDbShard) byteArrayForTimeInt(time int64) []byte { + timeBuffer := bytes.NewBuffer(make([]byte, 0, 8)) + binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&time)) + bytes := timeBuffer.Bytes() + return bytes +} + +func (self *LevelDbShard) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]byte, []byte) { + return self.byteArrayForTimeInt(startTime), self.byteArrayForTimeInt(endTime) +} + +func (self *LevelDbShard) deleteRangeOfSeriesCommon(database, series string, startTimeBytes, endTimeBytes []byte) error { + columns := self.getColumnNamesForSeries(database, series) + fields, err := self.getFieldsForSeries(database, series, columns) + if err != nil { + // because a db is distributed across the cluster, it's possible we don't have the series indexed here. ignore + switch err := err.(type) { + case FieldLookupError: + return nil + default: + return err + } + } + ro := levigo.NewReadOptions() + defer ro.Close() + ro.SetFillCache(false) + rangesToCompact := make([]*levigo.Range, 0) + for _, field := range fields { + it := self.db.NewIterator(ro) + defer it.Close() + wb := levigo.NewWriteBatch() + defer wb.Close() + + startKey := append(field.Id, startTimeBytes...) + endKey := startKey + it.Seek(startKey) + if it.Valid() { + if !bytes.Equal(it.Key()[:8], field.Id) { + it.Next() + if it.Valid() { + startKey = it.Key() + } + } + } + for it = it; it.Valid(); it.Next() { + k := it.Key() + if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 { + break + } + wb.Delete(k) + endKey = k + } + err = self.db.Write(self.writeOptions, wb) + if err != nil { + return err + } + rangesToCompact = append(rangesToCompact, &levigo.Range{startKey, endKey}) + } + for _, r := range rangesToCompact { + self.db.CompactRange(*r) + } + return nil +} + +func (self *LevelDbShard) deleteRangeOfSeries(database, series string, startTime, endTime time.Time) error { + startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime)) + return self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes) +} + +func (self *LevelDbShard) deleteRangeOfRegex(database string, regex *regexp.Regexp, startTime, endTime time.Time) error { + series := self.getSeriesForDbAndRegex(database, regex) + for _, name := range series { + err := self.deleteRangeOfSeries(database, name, startTime, endTime) + if err != nil { + return err + } + } + return nil +} + func (self *LevelDbShard) getFieldsForSeries(db, series string, columns []string) ([]*Field, error) { isCountQuery := false if len(columns) > 0 && columns[0] == "*" { diff --git a/src/engine/passthrough_engine.go b/src/engine/passthrough_engine.go new file mode 100644 index 0000000000..fa44df2e6d --- /dev/null +++ b/src/engine/passthrough_engine.go @@ -0,0 +1,54 @@ +package engine + +// This engine buffers points and passes them through without modification. Works for queries +// that can't be aggregated locally or queries that don't require it like deletes and drops. +import ( + "protocol" +) + +const ( + MAX_POINTS_IN_RESPONSE = 10000 +) + +type PassthroughEngine struct { + responseChan chan *protocol.Response + response *protocol.Response +} + +func NewPassthroughEngine(responseChan chan *protocol.Response) *PassthroughEngine { + return &PassthroughEngine{ + responseChan: responseChan, + } +} + +func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool { + if self.response == nil { + self.response = &protocol.Response{ + Type: &queryResponse, + Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}, + } + } else if self.response.Series.Name != seriesName { + self.responseChan <- self.response + self.response = &protocol.Response{ + Type: &queryResponse, + Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}, + } + } else if len(self.response.Series.Points) > MAX_POINTS_IN_RESPONSE { + self.responseChan <- self.response + self.response = &protocol.Response{ + Type: &queryResponse, + Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames}, + } + } else { + self.response.Series.Points = append(self.response.Series.Points, point) + } + return true +} + +func (self *PassthroughEngine) Close() { + if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil { + self.responseChan <- self.response + } + response := &protocol.Response{Type: &endStreamResponse} + self.responseChan <- response +} diff --git a/src/integration/server_test.go b/src/integration/server_test.go index d82e044736..582ec590a5 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -480,9 +480,11 @@ func (self *ServerSuite) TestDeleteReplication(c *C) { series := collection.GetSeries("test_delete_replication", c) c.Assert(series.GetValueForPointAndColumn(0, "count", c), Equals, float64(1)) - self.serverProcesses[0].Query("test_rep", "delete from test_delete_replication", false, c) - collection = self.serverProcesses[0].Query("test_rep", "select count(val_1) from test_delete_replication", false, c) - c.Assert(collection.Members, HasLen, 0) + for _, s := range self.serverProcesses { + s.Query("test_rep", "delete from test_delete_replication", false, c) + collection = self.serverProcesses[0].Query("test_rep", "select count(val_1) from test_delete_replication", false, c) + c.Assert(collection.Members, HasLen, 0) + } } // Reported by Alex in the following thread diff --git a/src/parser/query_spec.go b/src/parser/query_spec.go index 336c79311a..1a4d99325b 100644 --- a/src/parser/query_spec.go +++ b/src/parser/query_spec.go @@ -46,6 +46,10 @@ func (self *QuerySpec) User() common.User { return self.user } +func (self *QuerySpec) DeleteQuery() *DeleteQuery { + return self.query.DeleteQuery +} + func (self *QuerySpec) TableNames() []string { if self.names != nil { return self.names @@ -124,6 +128,19 @@ func (self *QuerySpec) IsListSeriesQuery() bool { return self.query.IsListSeriesQuery() } +func (self *QuerySpec) IsDeleteFromSeriesQuery() bool { + return self.query.DeleteQuery != nil +} + func (self *QuerySpec) GetQueryString() string { return self.query.GetQueryString() } + +func (self *QuerySpec) GetQueryStringWithTimeCondition() string { + if self.query.SelectQuery != nil { + return self.query.SelectQuery.GetQueryStringWithTimeCondition() + } else if self.query.DeleteQuery != nil { + return self.query.DeleteQuery.GetQueryStringWithTimeCondition() + } + return self.query.GetQueryString() +} diff --git a/src/wal/wal.go b/src/wal/wal.go index c507b8c28e..f255522c4e 100644 --- a/src/wal/wal.go +++ b/src/wal/wal.go @@ -33,10 +33,12 @@ func (self *WAL) SetServerId(id uint32) { // Will assign sequence numbers if null. Returns a unique id that should be marked as committed for each server // as it gets confirmed. func (self *WAL) AssignSequenceNumbersAndLog(request *protocol.Request, shard Shard, servers []Server) (uint32, error) { - for _, point := range request.Series.Points { - if point.SequenceNumber == nil { - sn := self.getNextSequenceNumber(shard) - point.SequenceNumber = &sn + if request.Series != nil { + for _, point := range request.Series.Points { + if point.SequenceNumber == nil { + sn := self.getNextSequenceNumber(shard) + point.SequenceNumber = &sn + } } }