fix #341. Limit the amount of memory taken by query

Remove the setting for shard query buffer size and add logic for max number of shards to query in parallel.

parent f959c439c9
commit c3bf4876d7
@@ -74,9 +74,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must
 # will be replayed from the WAL
 write-buffer-size = 10000

-# When queries get distributed out, the go in parallel. However, the responses must be sent in time order.
-# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped.
-query-shard-buffer-size = 1000
+# When queries get distributed out to shards, they go in parallel. This means that results can get buffered
+# in memory since results will come in any order, but have to be processed in the correct time order.
+# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure
+# that you don't need to buffer in memory, but you won't get the best performance.
+concurrent-shard-query-limit = 10

 [leveldb]
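For illustration only (not part of this diff): a minimal sketch of the query pattern the new setting bounds, using hypothetical names rather than the actual coordinator types. At most limit shards are queried concurrently, responses are drained in shard (time) order, and draining one shard frees a slot for the next; a limit of 1 means shards are queried one at a time.

package shardquery

import "fmt"

// Sketch only: a generic stand-in for a shard that streams query responses on a
// channel and reports how large its response buffer should be. Hypothetical names.
type shard interface {
	Query(out chan string) // must close out when its stream ends
	BufferSize() int
}

// queryWithLimit starts at most limit shard queries at once and drains the results
// in shard (time) order, launching the next pending shard as each one finishes.
// A limit of 1 degenerates to fully sequential querying.
func queryWithLimit(shards []shard, limit int) {
	if limit < 1 {
		limit = 1
	}
	responses := make([]chan string, len(shards))
	launch := func(i int) {
		responses[i] = make(chan string, shards[i].BufferSize())
		go shards[i].Query(responses[i])
	}
	next := 0
	for ; next < limit && next < len(shards); next++ {
		launch(next)
	}
	for i := 0; i < len(shards); i++ {
		for msg := range responses[i] {
			fmt.Println("shard", i, msg) // hand the response to the query processor here
		}
		if next < len(shards) { // keep the window of in-flight shard queries full
			launch(next)
			next++
		}
	}
}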
@@ -72,11 +72,13 @@ type ShardData struct {
     shardType        ShardType
     durationIsSplit  bool
     shardDuration    time.Duration
+    shardSeconds     int64
     localServerId    uint32
     IsLocal          bool
 }

 func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, durationIsSplit bool, wal WAL) *ShardData {
+    shardDuration := endTime.Sub(startTime)
     return &ShardData{
         id:        id,
         startTime: startTime,
@@ -87,7 +89,8 @@ func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, dura
         serverIds:       make([]uint32, 0),
         shardType:       shardType,
         durationIsSplit: durationIsSplit,
-        shardDuration:   endTime.Sub(startTime),
+        shardDuration:   shardDuration,
+        shardSeconds:    int64(shardDuration.Seconds()),
     }
 }
@@ -332,6 +335,36 @@ func (self *ShardData) ShouldAggregateLocally(querySpec *parser.QuerySpec) bool
     return false
 }

+func (self *ShardData) QueryResponseBufferSize(querySpec *parser.QuerySpec, batchPointSize int) int {
+    groupByTime := querySpec.GetGroupByInterval()
+    if groupByTime == nil {
+        // If the group by time is nil, we shouldn't have to use a buffer since the shards should be queried sequentially.
+        // However, set this to something high just to be safe.
+        log.Info("BUFFER SIZE: 1000")
+        return 1000
+    }
+    tickCount := int(self.shardSeconds / int64(groupByTime.Seconds()))
+    if tickCount < 10 {
+        tickCount = 100
+    } else if tickCount > 1000 {
+        // cap this because each response should have up to this number of points in it.
+        tickCount = tickCount / batchPointSize
+
+        // but make sure it's at least 1k
+        if tickCount < 1000 {
+            tickCount = 1000
+        }
+    }
+    columnCount := querySpec.GetGroupByColumnCount()
+    if columnCount > 1 {
+        // we don't really know the cardinality for any column up front. This is a just a multiplier so we'll see how this goes.
+        // each response can have many points, so having a buffer of the ticks * 100 should be safe, but we'll see.
+        tickCount = tickCount * 100
+    }
+    log.Info("BUFFER SIZE: ", tickCount)
+    return tickCount
+}
+
 func (self *ShardData) logAndHandleDeleteQuery(querySpec *parser.QuerySpec, response chan *p.Response) {
     queryString := querySpec.GetQueryStringWithTimeCondition()
     request := self.createRequest(querySpec)
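For illustration only (not part of this diff): a worked example of the buffer-size heuristic above, with assumed inputs: a 7-day shard, group by time(10s), and a point batch size of 100 (the real value comes from the coordinator's LevelDbPointBatchSize setting, which this diff does not show).

package main

import "fmt"

func main() {
	// assumed inputs, for illustration only
	shardSeconds := int64(7 * 24 * 3600) // 604800s shard
	groupBySeconds := int64(10)          // group by time(10s)
	batchPointSize := 100                // assumed LevelDB point batch size

	tickCount := int(shardSeconds / groupBySeconds) // 60480 intervals in the shard
	if tickCount < 10 {
		tickCount = 100
	} else if tickCount > 1000 {
		tickCount = tickCount / batchPointSize // 604: each response carries up to a batch of points
		if tickCount < 1000 {
			tickCount = 1000 // keep at least 1k buffered responses
		}
	}
	fmt.Println(tickCount) // 1000 -> channel buffer used for this shard's responses
}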
@@ -71,9 +71,11 @@ protobuf_heartbeat = "200ms" # the heartbeat interval between the servers. must
 # will be replayed from the WAL
 write-buffer-size = 10000

-# When queries get distributed out, the go in parallel. However, the responses must be sent in time order.
-# This setting determines how many responses can be buffered in memory per shard before data starts gettind dropped.
-query-shard-buffer-size = 1000
+# When queries get distributed out to shards, they go in parallel. This means that results can get buffered
+# in memory since results will come in any order, but have to be processed in the correct time order.
+# Setting this higher will give better performance, but you'll need more memory. Setting this to 1 will ensure
+# that you don't need to buffer in memory, but you won't get the best performance.
+concurrent-shard-query-limit = 10

 [leveldb]
@@ -85,7 +85,7 @@ type ClusterConfig struct {
     ProtobufTimeout           duration `toml:"protobuf_timeout"`
     ProtobufHeartbeatInterval duration `toml:"protobuf_heartbeat"`
     WriteBufferSize           int      `toml"write-buffer-size"`
-    QueryShardBufferSize      int      `toml:"query-shard-buffer-size"`
+    ConcurrentShardQueryLimit int      `toml:"concurrent-shard-query-limit"`
 }

 type LoggingConfig struct {
@@ -215,7 +215,7 @@ type Configuration struct {
     WalRequestsPerLogFile     int
     LocalStoreWriteBufferSize int
     PerServerWriteBufferSize  int
-    QueryShardBufferSize      int
+    ConcurrentShardQueryLimit int
 }

 func LoadConfiguration(fileName string) *Configuration {
@@ -254,9 +254,9 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
         tomlConfiguration.WalConfig.RequestsPerLogFile = 10 * tomlConfiguration.WalConfig.IndexAfterRequests
     }

-    defaultQueryShardBufferSize := 100
-    if tomlConfiguration.Cluster.QueryShardBufferSize != 0 {
-        defaultQueryShardBufferSize = tomlConfiguration.Cluster.QueryShardBufferSize
+    defaultConcurrentShardQueryLimit := 10
+    if tomlConfiguration.Cluster.ConcurrentShardQueryLimit != 0 {
+        defaultConcurrentShardQueryLimit = tomlConfiguration.Cluster.ConcurrentShardQueryLimit
     }

     if tomlConfiguration.Raft.Timeout.Duration == 0 {
@@ -297,7 +297,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
         WalRequestsPerLogFile:     tomlConfiguration.WalConfig.RequestsPerLogFile,
         LocalStoreWriteBufferSize: tomlConfiguration.Storage.WriteBufferSize,
         PerServerWriteBufferSize:  tomlConfiguration.Cluster.WriteBufferSize,
-        QueryShardBufferSize:      defaultQueryShardBufferSize,
+        ConcurrentShardQueryLimit: defaultConcurrentShardQueryLimit,
     }

     if config.LocalStoreWriteBufferSize == 0 {
@@ -159,20 +159,12 @@ func (self *CoordinatorImpl) runListSeriesQuery(querySpec *parser.QuerySpec, ser
     }
     seriesYielded := make(map[string]bool)

-    responses := make([]chan *protocol.Response, 0)
-    for _, shard := range shortTermShards {
-        responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
-        go shard.Query(querySpec, responseChan)
-        responses = append(responses, responseChan)
-    }
-    for _, shard := range longTermShards {
-        responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
-        go shard.Query(querySpec, responseChan)
-        responses = append(responses, responseChan)
-    }
+    shards := append(shortTermShards, longTermShards...)

     var err error
-    for _, responseChan := range responses {
+    for _, shard := range shards {
+        responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
+        go shard.Query(querySpec, responseChan)
         for {
             response := <-responseChan
             if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
@@ -224,6 +216,12 @@ func (self *CoordinatorImpl) shouldAggregateLocally(shards []*cluster.ShardData,
     return true
 }

+func (self *CoordinatorImpl) shouldQuerySequentially(shards []*cluster.ShardData, querySpec *parser.QuerySpec) bool {
+    // if we're not aggregating locally, that means all the raw points are being sent back in this query. Do it
+    // sequentially so we don't fill up memory like crazy.
+    return !self.shouldAggregateLocally(shards, querySpec)
+}
+
 func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec, writer SeriesWriter) ([]*cluster.ShardData, cluster.QueryProcessor, chan bool, error) {
     shards := self.clusterConfiguration.GetShards(querySpec)
     shouldAggregateLocally := self.shouldAggregateLocally(shards, querySpec)
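In practice this means a query that cannot be aggregated on the shards themselves, and therefore streams raw points back to the coordinator, is run against one shard at a time regardless of concurrent-shard-query-limit: runQuerySpec below drops the concurrency to 1 in that case.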
@@ -281,16 +279,32 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
         return err
     }

-    responses := make([]chan *protocol.Response, 0)
-    for _, shard := range shards {
-        responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
+    responses := make([]chan *protocol.Response, len(shards), len(shards))
+
+    shardConcurrentLimit := self.config.ConcurrentShardQueryLimit
+    if self.shouldQuerySequentially(shards, querySpec) {
+        log.Debug("Querying shards sequentially")
+        shardConcurrentLimit = 1
+    }
+    log.Debug("Shard concurrent limit: ", shardConcurrentLimit)
+    for i := 0; i < shardConcurrentLimit && i < len(shards); i++ {
+        shard := shards[i]
+        responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
         // We query shards for data and stream them to query processor
         go shard.Query(querySpec, responseChan)
-        responses = append(responses, responseChan)
+        responses[i] = responseChan
     }
+    nextIndex := shardConcurrentLimit
+    // don't queue up new shards to query if we've hit the limit for the query
+    shouldContinue := false

     for i, responseChan := range responses {
-        log.Debug("READING: shard: ", shards[i].String())
+        log.Debug("READING: shard: ", i, shards[i].String())
+
+        // Do this because it's possible should continue was false so we haven't set the other response channels.
+        if responseChan == nil {
+            break
+        }
         for {
             response := <-responseChan
@@ -300,6 +314,15 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
             if response.ErrorMessage != nil && err == nil {
                 err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
             }
+            if nextIndex < len(shards) && shouldContinue {
+                shard := shards[nextIndex]
+                responseChan := make(chan *protocol.Response, shard.QueryResponseBufferSize(querySpec, self.config.LevelDbPointBatchSize))
+                // We query shards for data and stream them to query processor
+                log.Debug("Querying Shard: ", nextIndex, shard.String())
+                go shard.Query(querySpec, responseChan)
+                responses[nextIndex] = responseChan
+                nextIndex += 1
+            }
             break
         }
@@ -315,7 +338,8 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
             // if the data wasn't aggregated at the shard level, aggregate
             // the data here
             log.Debug("YIELDING: %d points with %d columns", len(response.Series.Points), len(response.Series.Fields))
-            processor.YieldSeries(response.Series)
+            shouldContinue = processor.YieldSeries(response.Series)
+            log.Debug("ShouldContinue: ", shouldContinue)
             continue
         }
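Taken together, the runQuerySpec changes above work as follows: at most ConcurrentShardQueryLimit response channels are opened up front (only one when shouldQuerySequentially says the data cannot be aggregated on the shards), each sized by that shard's QueryResponseBufferSize; as each shard's stream is drained, nextIndex launches the next pending shard; and shouldContinue, now returned by processor.YieldSeries, stops further shard queries from being queued once the processor already has all the data the query needs, for example when its limit has been hit.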
@@ -179,6 +179,7 @@ func (self *ProtobufClient) sendResponse(response *protocol.Response) {
     case req.responseChan <- response:
     default:
         log.Error("ProtobufClient: Response buffer full! ", self.hostAndPort, response)
+        panic("fuck, dropping shit!")
         // if it's an end stream response, we have to send it so start it in a goroutine so we can make sure it gets through without blocking the reading of responses.
         if *response.Type == protocol.Response_END_STREAM || *response.Type == protocol.Response_WRITE_OK || *response.Type == protocol.Response_ACCESS_DENIED {
             go func() {
@@ -8,6 +8,7 @@ import (
     "encoding/json"
     "fmt"
     "io/ioutil"
+    . "launchpad.net/gocheck"
     "net"
     "net/http"
     "net/url"
@@ -15,7 +16,6 @@ import (
     "path/filepath"
     "syscall"
     "time"
-    . "launchpad.net/gocheck"
 )

 type ServerSuite struct {
@@ -216,13 +216,13 @@ func (self *ServerProcess) VerifyForbiddenQuery(database, query string, onlyLoca

 func (self *ServerProcess) Post(url, data string, c *C) *http.Response {
     err := self.Request("POST", url, data, c)
-    time.Sleep(time.Millisecond * 10)
+    time.Sleep(time.Millisecond * 100)
     return err
 }

 func (self *ServerProcess) Delete(url, body string, c *C) *http.Response {
     err := self.Request("DELETE", url, body, c)
-    time.Sleep(time.Millisecond * 10)
+    time.Sleep(time.Millisecond * 100)
     return err
 }
@@ -15,6 +15,8 @@ type QuerySpec struct {
     endTime                     time.Time
     seriesValuesAndColumns      map[*Value][]string
     RunAgainstAllServersInShard bool
+    groupByInterval             *time.Duration
+    groupByColumnCount          int
 }

 func NewQuerySpec(user common.User, database string, query *Query) *QuerySpec {
@@ -89,8 +91,20 @@ func (self *QuerySpec) GetGroupByInterval() *time.Duration {
     if self.query.SelectQuery == nil {
         return nil
     }
-    duration, _ := self.query.SelectQuery.GetGroupByClause().GetGroupByTime()
-    return duration
+    if self.groupByInterval == nil {
+        self.groupByInterval, _ = self.query.SelectQuery.GetGroupByClause().GetGroupByTime()
+    }
+    return self.groupByInterval
+}
+
+func (self *QuerySpec) GetGroupByColumnCount() int {
+    if self.query.SelectQuery == nil {
+        return 0
+    }
+    if self.groupByColumnCount == 0 {
+        self.groupByColumnCount = len(self.query.SelectQuery.GetGroupByClause().Elems) - 1
+    }
+    return self.groupByColumnCount
 }

 func (self *QuerySpec) IsRegex() bool {
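Both accessors memoize their result on the QuerySpec so the group-by clause only needs to be inspected once. GetGroupByColumnCount subtracts one from the number of group-by elements, presumably so the time() interval itself is not counted as a column: a query with group by time(10s), host yields a count of 1 and leaves the shard response buffer unchanged, while group by time(10s), host, region yields 2 and triggers the 100x multiplier in QueryResponseBufferSize above.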