Make MaxTopicSize and MaxSegmentSize configurable

pull/2276/head
Philip O'Toole 2015-04-13 18:59:50 -07:00
parent 03d5ffc0bb
commit 8470ee27b6
3 changed files with 30 additions and 6 deletions

View File

@ -46,6 +46,12 @@ const (
// shard groups need to be created in advance for writing // shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute DefaultRetentionCreatePeriod = 45 * time.Minute
// DefaultMaxTopicSize is the default maximum size in bytes a topic can consume on disk of a broker.
DefaultBrokerMaxTopicSize = 1024 * 1024 * 1024
// DefaultMaxTopicSize is the default maximum size in bytes a segment can consume on disk of a broker.
DefaultBrokerMaxSegmentSize = 10 * 1024 * 1024
// DefaultGraphiteDatabaseName is the default Graphite database if none is specified // DefaultGraphiteDatabaseName is the default Graphite database if none is specified
DefaultGraphiteDatabaseName = "graphite" DefaultGraphiteDatabaseName = "graphite"
@ -93,9 +99,11 @@ var DefaultSnapshotURL = url.URL{
// Broker represents the configuration for a broker node // Broker represents the configuration for a broker node
type Broker struct { type Broker struct {
Dir string `toml:"dir"` Dir string `toml:"dir"`
Enabled bool `toml:"enabled"` Enabled bool `toml:"enabled"`
Timeout Duration `toml:"election-timeout"` Timeout Duration `toml:"election-timeout"`
MaxTopicSize int64 `toml:"max-topic-size"`
MaxSegmentSize int64 `toml:"max-segment-size"`
} }
// Snapshot represents the configuration for a snapshot service. Snapshot configuration // Snapshot represents the configuration for a snapshot service. Snapshot configuration
@ -233,6 +241,9 @@ func NewConfig() *Config {
c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval c.ContinuousQuery.ComputeRunsPerInterval = DefaultContinuousQueryComputeRunsPerInterval
c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan) c.ContinuousQuery.ComputeNoMoreThan = Duration(DefaultContinousQueryComputeNoMoreThan)
c.Broker.MaxTopicSize = DefaultBrokerMaxTopicSize
c.Broker.MaxSegmentSize = DefaultBrokerMaxSegmentSize
// Detect hostname (or set to localhost). // Detect hostname (or set to localhost).
if c.Hostname, _ = os.Hostname(); c.Hostname == "" { if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
c.Hostname = "localhost" c.Hostname = "localhost"

View File

@ -476,6 +476,8 @@ func (cmd *RunCommand) openBroker(brokerURLs []url.URL) {
// Create broker // Create broker
b := influxdb.NewBroker() b := influxdb.NewBroker()
b.MaxTopicSize = cmd.config.Broker.MaxTopicSize
b.MaxSegmentSize = cmd.config.Broker.MaxSegmentSize
cmd.node.Broker = b cmd.node.Broker = b
// Create raft log. // Create raft log.

View File

@ -24,6 +24,12 @@ import (
// only occurs when the reader is at the end of all the data. // only occurs when the reader is at the end of all the data.
const DefaultPollInterval = 100 * time.Millisecond const DefaultPollInterval = 100 * time.Millisecond
// DefaultMaxTopicSize is the largest a topic can get before truncation.
const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 10MB
// DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB
// Broker represents distributed messaging system segmented into topics. // Broker represents distributed messaging system segmented into topics.
// Each topic represents a linear series of events. // Each topic represents a linear series of events.
type Broker struct { type Broker struct {
@ -34,6 +40,9 @@ type Broker struct {
meta *bolt.DB // metadata meta *bolt.DB // metadata
topics map[uint64]*Topic // topics by id topics map[uint64]*Topic // topics by id
MaxTopicSize int64 // Maximum size of a topic in bytes
MaxSegmentSize int64 // Maximum size of a segment in bytes
// Log is the distributed raft log that commands are applied to. // Log is the distributed raft log that commands are applied to.
Log interface { Log interface {
URL() url.URL URL() url.URL
@ -53,6 +62,8 @@ func NewBroker() *Broker {
topics: make(map[uint64]*Topic), topics: make(map[uint64]*Topic),
Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags), Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags),
} }
b.MaxTopicSize = DefaultMaxTopicSize
b.MaxSegmentSize = DefaultMaxSegmentSize
return b return b
} }
@ -189,6 +200,7 @@ func (b *Broker) openTopics() error {
return fmt.Errorf("open topic: id=%d, err=%s", t.id, err) return fmt.Errorf("open topic: id=%d, err=%s", t.id, err)
} }
b.topics[t.id] = t b.topics[t.id] = t
b.topics[t.id].MaxSegmentSize = b.MaxSegmentSize
} }
// Retrieve the highest index across all topics. // Retrieve the highest index across all topics.
@ -386,6 +398,7 @@ func (b *Broker) ReadFrom(r io.Reader) (int64, error) {
// Copy topic files from snapshot to local disk. // Copy topic files from snapshot to local disk.
for _, st := range sh.Topics { for _, st := range sh.Topics {
t := NewTopic(st.ID, b.topicPath(st.ID)) t := NewTopic(st.ID, b.topicPath(st.ID))
t.MaxSegmentSize = b.MaxSegmentSize
// Create topic directory. // Create topic directory.
if err := os.MkdirAll(t.Path(), 0777); err != nil { if err := os.MkdirAll(t.Path(), 0777); err != nil {
@ -545,6 +558,7 @@ func (b *Broker) Apply(m *Message) error {
t := b.topics[m.TopicID] t := b.topics[m.TopicID]
if t == nil { if t == nil {
t = NewTopic(m.TopicID, b.topicPath(m.TopicID)) t = NewTopic(m.TopicID, b.topicPath(m.TopicID))
t.MaxSegmentSize = b.MaxSegmentSize
if err := t.Open(); err != nil { if err := t.Open(); err != nil {
return fmt.Errorf("open topic: %s", err) return fmt.Errorf("open topic: %s", err)
} }
@ -625,9 +639,6 @@ func (fsm *RaftFSM) Apply(e *raft.LogEntry) error {
return nil return nil
} }
// DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB
// topic represents a single named queue of messages. // topic represents a single named queue of messages.
// Each topic is identified by a unique path. // Each topic is identified by a unique path.
// //