Wire up delete from series.

Deletes hit the matching shards and replicate out to their servers.
Each delete gets logged into the WAL on the receiving server.
Remove the old delete implementation.
Add the passthrough query processor. It can be used later if deletes need to return the number of rows deleted.
pull/249/head
Paul Dix 2014-02-17 10:37:48 -05:00
parent d46e29aaf8
commit f66d16831c
9 changed files with 260 additions and 119 deletions
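For context, the new coordinator-level flow condenses to the sketch below. This is an illustration only, not part of the commit: names come from the hunks that follow, error handling is trimmed, and the query text is a made-up example.

// Inside CoordinatorImpl.RunQuery (condensed): deletes are now routed through
// runDeleteQuery instead of the removed DeleteSeriesData path.
queries, err := parser.ParseQuery("delete from events where time < now() - 1h") // example query text
if err != nil {
	return err
}
for _, query := range queries {
	querySpec := parser.NewQuerySpec(user, database, query)
	if query.DeleteQuery != nil {
		// runDeleteQuery checks IsDbAdmin, fans the spec out to every shard
		// returned by clusterConfiguration.GetShards(querySpec), and drains
		// each shard's response channel until it sees an endStreamResponse
		if err := self.runDeleteQuery(querySpec, yield); err != nil {
			return err
		}
		continue
	}
	// ... other query types are handled as before
}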

View File

@@ -182,6 +182,8 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco
var processor QueryProcessor
if querySpec.IsListSeriesQuery() {
processor = engine.NewListSeriesEngine(response)
} else if querySpec.IsDeleteFromSeriesQuery() {
processor = engine.NewPassthroughEngine(response)
} else {
processor = engine.NewQueryEngine(querySpec.SelectQuery(), response)
}
@@ -190,6 +192,10 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco
return err
}
if querySpec.IsDeleteFromSeriesQuery() {
return self.logAndHandleDeleteQuery(querySpec, response)
}
randServerIndex := int(time.Now().UnixNano() % int64(len(self.clusterServers)))
server := self.clusterServers[randServerIndex]
queryString := querySpec.GetQueryString()
@@ -201,6 +207,45 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco
return server.MakeRequest(request, response)
}
func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *protocol.Response) error {
user := querySpec.User()
userName := user.GetName()
database := querySpec.Database()
isDbUser := !user.IsClusterAdmin()
queryString := querySpec.GetQueryStringWithTimeCondition()
request := &protocol.Request{
Type: &queryRequest,
ShardId: &self.id,
Query: &queryString,
UserName: &userName,
Database: &database,
IsDbUser: &isDbUser,
}
requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers)
if err != nil {
return err
}
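// fan the delete out to every cluster server that holds this shard; each one gets the same logged request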
responses := make([]chan *protocol.Response, len(self.clusterServers), len(self.clusterServers))
for i, server := range self.clusterServers {
responseChan := make(chan *protocol.Response, 1)
responses[i] = responseChan
server.MakeRequest(request, responseChan)
}
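// drain each server's response stream; an endStreamResponse means that server has handled the delete, so mark the WAL entry committed for it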
for i, responseChan := range responses {
for {
res := <-responseChan
if *res.Type == endStreamResponse {
self.wal.Commit(requestNumber, self.clusterServers[i])
break
}
response <- res
}
}
response <- &protocol.Response{Type: &endStreamResponse}
return nil
}
// used to serialize shards when sending around in raft or when snapshotting in the log
func (self *ShardData) ToNewShardData() *NewShardData {
return &NewShardData{

View File

@@ -48,7 +48,6 @@ var (
// shorter constants for readability
var (
proxyWrite = protocol.Request_PROXY_WRITE
proxyDelete = protocol.Request_PROXY_DELETE
proxyDropDatabase = protocol.Request_PROXY_DROP_DATABASE
replicateDropDatabase = protocol.Request_REPLICATION_DROP_DATABASE
proxyDropSeries = protocol.Request_PROXY_DROP_SERIES
@@ -97,7 +96,7 @@ func (self *CoordinatorImpl) RunQuery(user common.User, database string, querySt
querySpec := parser.NewQuerySpec(user, database, query)
if query.DeleteQuery != nil {
if err := self.DeleteSeriesData(user, database, query.DeleteQuery, false); err != nil {
if err := self.runDeleteQuery(querySpec, yield); err != nil {
return err
}
continue
@@ -214,6 +213,31 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, yie
return nil
}
func (self *CoordinatorImpl) runDeleteQuery(querySpec *parser.QuerySpec, yield func(*protocol.Series) error) error {
db := querySpec.Database()
if !querySpec.User().IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permission to write to %s", db)
}
shards := self.clusterConfiguration.GetShards(querySpec)
responses := make([]chan *protocol.Response, 0)
for _, shard := range shards {
responseChan := make(chan *protocol.Response, 1)
shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}
for _, responseChan := range responses {
for {
response := <-responseChan
if *response.Type == endStreamResponse {
break
}
yield(response.Series)
}
}
return nil
}
func recoverFunc(database, query string) {
if err := recover(); err != nil {
fmt.Fprintf(os.Stderr, "********************************BUG********************************\n")
@@ -743,8 +767,8 @@ func (self *CoordinatorImpl) handleReplayRequest(r *protocol.Request, replicatio
log.Debug("Replaying write request")
self.datastore.WriteSeriesData(*r.Database, r.Series)
} else if *r.Type == protocol.Request_PROXY_DELETE || *r.Type == protocol.Request_REPLICATION_DELETE {
query, _ := parser.ParseQuery(*r.Query)
err = self.datastore.DeleteSeriesData(*r.Database, query[0].DeleteQuery)
// query, _ := parser.ParseQuery(*r.Query)
// err = self.datastore.DeleteSeriesData(*r.Database, query[0].DeleteQuery)
}
}
func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series *protocol.Series) error {
@@ -838,67 +862,11 @@ func (self *CoordinatorImpl) write(db string, series *protocol.Series, shard clu
return shard.Write(request)
}
func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error {
if !user.IsDbAdmin(db) {
return common.NewAuthorizationError("Insufficient permission to write to %s", db)
}
if self.clusterConfiguration.IsSingleServer() || localOnly {
return self.deleteSeriesDataLocally(user, db, query)
}
servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
for _, server := range servers {
if err := self.handleSeriesDelete(user, server.Server, db, query); err != nil {
return err
}
}
return nil
}
func (self *CoordinatorImpl) deleteSeriesDataLocally(user common.User, database string, query *parser.DeleteQuery) error {
return self.datastore.DeleteSeriesData(database, query)
}
func (self *CoordinatorImpl) createRequest(requestType protocol.Request_Type, database *string) *protocol.Request {
id := atomic.AddUint32(&self.requestId, uint32(1))
return &protocol.Request{Type: &requestType, Database: database, Id: &id}
}
func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *cluster.ClusterServer, database string, query *parser.DeleteQuery) error {
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)
request := self.createRequest(proxyDelete, &database)
queryStr := query.GetQueryStringWithTimeCondition()
request.Query = &queryStr
request.OriginatingServerId = &self.clusterConfiguration.LocalServerId
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
ownId := owner.Id()
request.OwnerServerId = &ownId
if server.Id() == self.clusterConfiguration.LocalServerId {
// this is a local delete
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &ownId)
if err != nil {
return self.proxyUntilSuccess(servers, request)
}
self.deleteSeriesDataLocally(user, database, query)
if err != nil {
log.Error("Couldn't write data to local store: ", err, request)
}
// ignoring the error because we still want to send to replicas
request.Type = &replicateDelete
self.sendRequestToReplicas(request, servers)
return nil
}
// otherwise, proxy the delete
return self.proxyUntilSuccess(servers, request)
}
func (self *CoordinatorImpl) handleDropDatabase(server *cluster.ClusterServer, database string) error {
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)
@@ -1329,16 +1297,6 @@ func (self *CoordinatorImpl) ReplicateWrite(request *protocol.Request) error {
return nil
}
func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error {
id := atomic.AddUint32(&self.requestId, uint32(1))
request.Id = &id
server := self.clusterConfiguration.GetServerById(request.OwnerServerId)
_, replicas := self.clusterConfiguration.GetReplicas(server, request.Database)
request.Type = &replicateDelete
self.sendRequestToReplicas(request, replicas)
return nil
}
func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*cluster.ClusterServer) {
for _, server := range replicas {
if server.Id() != self.clusterConfiguration.LocalServerId {

View File

@@ -19,14 +19,12 @@ type Coordinator interface {
// 5. TODO: Aggregation on the nodes
DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error
WriteSeriesData(user common.User, db string, series *protocol.Series) error
DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error
DropDatabase(user common.User, db string) error
DropSeries(user common.User, db, series string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
ForceCompaction(user common.User) error
ListDatabases(user common.User) ([]*cluster.Database, error)
ReplicateWrite(request *protocol.Request) error
ReplicateDelete(request *protocol.Request) error
ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64)
GetLastSequenceNumber(replicationFactor uint8, ownerServerId, originatingServerId uint32) (uint64, error)
DeleteContinuousQuery(user common.User, db string, id uint32) error

View File

@@ -126,28 +126,6 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
}
}
return self.db.DropDatabase(*request.Database)
} else if *request.Type == protocol.Request_PROXY_DELETE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
request.OriginatingServerId = &self.clusterConfig.LocalServerId
// TODO: make request logging and datastore write atomic
replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database)
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
return err
}
query, err := parser.ParseQuery(*request.Query)
if err != nil {
return err
}
err = self.db.DeleteSeriesData(*request.Database, query[0].DeleteQuery)
if err != nil {
return err
}
err = self.WriteResponse(conn, response)
// TODO: add quorum writes?
self.coordinator.ReplicateDelete(request)
return err
} else if *request.Type == protocol.Request_REPLICATION_WRITE {
replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database)
// TODO: make request logging and datastore write atomic
@@ -164,24 +142,6 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
}
self.db.WriteSeriesData(*request.Database, request.Series)
return nil
} else if *request.Type == protocol.Request_REPLICATION_DELETE {
replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
switch err := err.(type) {
case datastore.SequenceMissingRequestsError:
go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence)
return nil
default:
return err
}
}
query, err := parser.ParseQuery(*request.Query)
if err != nil {
return err
}
return self.db.DeleteSeriesData(*request.Database, query[0].DeleteQuery)
} else if *request.Type == protocol.Request_QUERY {
go self.handleQuery(request, conn)
} else if *request.Type == protocol.Request_REPLICATION_REPLAY {

View File

@@ -90,6 +90,8 @@ func (self *LevelDbShard) Write(database string, series *protocol.Series) error
func (self *LevelDbShard) Query(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
if querySpec.IsListSeriesQuery() {
return self.executeListSeriesQuery(querySpec, processor)
} else if querySpec.IsDeleteFromSeriesQuery() {
return self.executeDeleteQuery(querySpec, processor)
}
seriesAndColumns := querySpec.SelectQuery().GetReferencedColumns()
@@ -276,6 +278,109 @@ func (self *LevelDbShard) executeListSeriesQuery(querySpec *parser.QuerySpec, pr
return nil
}
func (self *LevelDbShard) executeDeleteQuery(querySpec *parser.QuerySpec, processor cluster.QueryProcessor) error {
query := querySpec.DeleteQuery()
series := query.GetFromClause()
database := querySpec.Database()
if series.Type != parser.FromClauseArray {
return fmt.Errorf("Merge and Inner joins can't be used with a delete query: %v", series.Type)
}
for _, name := range series.Names {
var err error
if regex, ok := name.Name.GetCompiledRegex(); ok {
err = self.deleteRangeOfRegex(database, regex, query.GetStartTime(), query.GetEndTime())
} else {
err = self.deleteRangeOfSeries(database, name.Name.Name, query.GetStartTime(), query.GetEndTime())
}
if err != nil {
return err
}
}
return nil
}
func (self *LevelDbShard) byteArrayForTimeInt(time int64) []byte {
timeBuffer := bytes.NewBuffer(make([]byte, 0, 8))
binary.Write(timeBuffer, binary.BigEndian, self.convertTimestampToUint(&time))
bytes := timeBuffer.Bytes()
return bytes
}
func (self *LevelDbShard) byteArraysForStartAndEndTimes(startTime, endTime int64) ([]byte, []byte) {
return self.byteArrayForTimeInt(startTime), self.byteArrayForTimeInt(endTime)
}
func (self *LevelDbShard) deleteRangeOfSeriesCommon(database, series string, startTimeBytes, endTimeBytes []byte) error {
columns := self.getColumnNamesForSeries(database, series)
fields, err := self.getFieldsForSeries(database, series, columns)
if err != nil {
// because a db is distributed across the cluster, it's possible we don't have the series indexed here. ignore
switch err := err.(type) {
case FieldLookupError:
return nil
default:
return err
}
}
ro := levigo.NewReadOptions()
defer ro.Close()
ro.SetFillCache(false)
rangesToCompact := make([]*levigo.Range, 0)
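// point keys are an 8-byte field id followed by an 8-byte timestamp, so the time range is deleted separately for each column of the series and the touched key ranges are compacted afterwards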
for _, field := range fields {
it := self.db.NewIterator(ro)
defer it.Close()
wb := levigo.NewWriteBatch()
defer wb.Close()
startKey := append(field.Id, startTimeBytes...)
endKey := startKey
it.Seek(startKey)
if it.Valid() {
if !bytes.Equal(it.Key()[:8], field.Id) {
it.Next()
if it.Valid() {
startKey = it.Key()
}
}
}
for it = it; it.Valid(); it.Next() {
k := it.Key()
if len(k) < 16 || !bytes.Equal(k[:8], field.Id) || bytes.Compare(k[8:16], endTimeBytes) == 1 {
break
}
wb.Delete(k)
endKey = k
}
err = self.db.Write(self.writeOptions, wb)
if err != nil {
return err
}
rangesToCompact = append(rangesToCompact, &levigo.Range{startKey, endKey})
}
for _, r := range rangesToCompact {
self.db.CompactRange(*r)
}
return nil
}
func (self *LevelDbShard) deleteRangeOfSeries(database, series string, startTime, endTime time.Time) error {
startTimeBytes, endTimeBytes := self.byteArraysForStartAndEndTimes(common.TimeToMicroseconds(startTime), common.TimeToMicroseconds(endTime))
return self.deleteRangeOfSeriesCommon(database, series, startTimeBytes, endTimeBytes)
}
func (self *LevelDbShard) deleteRangeOfRegex(database string, regex *regexp.Regexp, startTime, endTime time.Time) error {
series := self.getSeriesForDbAndRegex(database, regex)
for _, name := range series {
err := self.deleteRangeOfSeries(database, name, startTime, endTime)
if err != nil {
return err
}
}
return nil
}
func (self *LevelDbShard) getFieldsForSeries(db, series string, columns []string) ([]*Field, error) {
isCountQuery := false
if len(columns) > 0 && columns[0] == "*" {

View File

@@ -0,0 +1,54 @@
package engine
// This engine buffers points and passes them through without modification. Works for queries
// that can't be aggregated locally or queries that don't require it like deletes and drops.
import (
"protocol"
)
const (
MAX_POINTS_IN_RESPONSE = 10000
)
type PassthroughEngine struct {
responseChan chan *protocol.Response
response *protocol.Response
}
func NewPassthroughEngine(responseChan chan *protocol.Response) *PassthroughEngine {
return &PassthroughEngine{
responseChan: responseChan,
}
}
func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool {
if self.response == nil {
self.response = &protocol.Response{
Type: &queryResponse,
Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames},
}
} else if self.response.Series.Name != seriesName {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames},
}
} else if len(self.response.Series.Points) > MAX_POINTS_IN_RESPONSE {
self.responseChan <- self.response
self.response = &protocol.Response{
Type: &queryResponse,
Series: &protocol.Series{Name: seriesName, Points: []*protocol.Point{point}, Fields: columnNames},
}
} else {
self.response.Series.Points = append(self.response.Series.Points, point)
}
return true
}
func (self *PassthroughEngine) Close() {
if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil {
self.responseChan <- self.response
}
response := &protocol.Response{Type: &endStreamResponse}
self.responseChan <- response
}
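A minimal usage sketch for this engine, assuming the protocol types above; the points slice, series name, column names, and handle function are placeholders, not part of the commit:

// Feed points through a PassthroughEngine and drain the batched responses.
responseChan := make(chan *protocol.Response, 1)
engine := NewPassthroughEngine(responseChan)
go func() {
	for _, point := range points { // points []*protocol.Point, placeholder data
		engine.YieldPoint(&seriesName, columnNames, point)
	}
	engine.Close() // flushes the partial batch, then sends an endStreamResponse
}()
for {
	response := <-responseChan
	if *response.Type == endStreamResponse {
		break
	}
	// each response carries one series with batches of roughly MAX_POINTS_IN_RESPONSE points
	handle(response.Series) // placeholder consumer
}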

View File

@@ -480,9 +480,11 @@ func (self *ServerSuite) TestDeleteReplication(c *C) {
series := collection.GetSeries("test_delete_replication", c)
c.Assert(series.GetValueForPointAndColumn(0, "count", c), Equals, float64(1))
self.serverProcesses[0].Query("test_rep", "delete from test_delete_replication", false, c)
collection = self.serverProcesses[0].Query("test_rep", "select count(val_1) from test_delete_replication", false, c)
c.Assert(collection.Members, HasLen, 0)
for _, s := range self.serverProcesses {
s.Query("test_rep", "delete from test_delete_replication", false, c)
collection = self.serverProcesses[0].Query("test_rep", "select count(val_1) from test_delete_replication", false, c)
c.Assert(collection.Members, HasLen, 0)
}
}
// Reported by Alex in the following thread

View File

@@ -46,6 +46,10 @@ func (self *QuerySpec) User() common.User {
return self.user
}
func (self *QuerySpec) DeleteQuery() *DeleteQuery {
return self.query.DeleteQuery
}
func (self *QuerySpec) TableNames() []string {
if self.names != nil {
return self.names
@@ -124,6 +128,19 @@ func (self *QuerySpec) IsListSeriesQuery() bool {
return self.query.IsListSeriesQuery()
}
func (self *QuerySpec) IsDeleteFromSeriesQuery() bool {
return self.query.DeleteQuery != nil
}
func (self *QuerySpec) GetQueryString() string {
return self.query.GetQueryString()
}
func (self *QuerySpec) GetQueryStringWithTimeCondition() string {
if self.query.SelectQuery != nil {
return self.query.SelectQuery.GetQueryStringWithTimeCondition()
} else if self.query.DeleteQuery != nil {
return self.query.DeleteQuery.GetQueryStringWithTimeCondition()
}
return self.query.GetQueryString()
}

View File

@@ -33,10 +33,12 @@ func (self *WAL) SetServerId(id uint32) {
// Will assign sequence numbers if null. Returns a unique id that should be marked as committed for each server
// as it gets confirmed.
func (self *WAL) AssignSequenceNumbersAndLog(request *protocol.Request, shard Shard, servers []Server) (uint32, error) {
for _, point := range request.Series.Points {
if point.SequenceNumber == nil {
sn := self.getNextSequenceNumber(shard)
point.SequenceNumber = &sn
if request.Series != nil {
for _, point := range request.Series.Points {
if point.SequenceNumber == nil {
sn := self.getNextSequenceNumber(shard)
point.SequenceNumber = &sn
}
}
}
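Read together with the ShardData change earlier in this commit, the contract in the comment above works out to roughly the following. This is a sketch, not part of the diff; confirmedServer is a placeholder for whichever server just ended its response stream.

// Log the request first; for deletes request.Series is nil, which is why the
// Points loop is now guarded.
requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self, self.servers)
if err != nil {
	return err
}
// ... send the request to every cluster server; once a server confirms (for a
// delete, by ending its response stream), mark the entry committed for it:
self.wal.Commit(requestNumber, confirmedServer)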