Provide URLs as nodes for truncated topic data

pull/2433/head
Philip O'Toole 2015-04-24 22:19:54 -07:00 committed by Philip O'Toole
parent 50501624d5
commit 6eefb9ffdc
2 changed files with 60 additions and 2 deletions

View File

@ -321,6 +321,21 @@ func (b *Broker) setMaxIndex(index uint64) error {
return nil
}
// URLsForTopic returns a slice of URLs from where previously replicaed
// data for a given topic may be retrieved. The nodes at the URL will have
// data up to at least the given index. These URLs are provided when a node
// requests topic data that has been truncated.
func (b *Broker) DataURLsForTopic(id, index uint64) []url.URL {
b.mu.RLock()
defer b.mu.RUnlock()
t := b.topics[id]
if t == nil {
return nil
}
return t.DataURLsForIndex(index)
}
// WriteTo writes a snapshot of the broker to w.
func (b *Broker) WriteTo(w io.Writer) (int64, error) {
// TODO: Prevent truncation during snapshot.
@ -754,6 +769,20 @@ func (t *Topic) DataURLs() []url.URL {
return urls
}
// DataURLsForIndex returns the data node URLs subscribed to this topic that have
// replicated at least up to the given index.
func (t *Topic) DataURLsForIndex(index uint64) []url.URL {
t.mu.Lock()
defer t.mu.Unlock()
var urls []url.URL
for u, idx := range t.indexByURL {
if idx >= index {
urls = append(urls, u)
}
}
return urls
}
// IndexForURL returns the highest index replicated for a given data URL.
func (t *Topic) IndexForURL(u url.URL) uint64 {
t.mu.RLock()

View File

@ -107,7 +107,8 @@ func TestBroker_Apply(t *testing.T) {
}
}
// Ensure the broker can apply topic high water mark messages.
// Ensure the broker can apply topic high water mark messages, and provide correct URLs
// for subsequent topic peer replication.
func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
b := OpenBroker()
defer b.Close()
@ -133,9 +134,37 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) {
t.Fatalf("apply error: %s", err)
}
if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 {
topic := b.Topic(20)
if topic.IndexForURL(*testDataURL) != 5 {
t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL))
}
// Ensure the URLs that can serve the topic are correct.
var urls []url.URL
urls = topic.DataURLsForIndex(10)
if urls != nil {
t.Fatalf("URLs unexpectedly available for index 10")
}
urls = topic.DataURLsForIndex(5)
if urls == nil {
t.Fatalf("no URLs available for index 5")
}
if urls[0].String() != "http://localhost:1234/data" {
t.Fatalf("unexpectedURL for topic index 5: %s", urls[0].String())
}
// Ensure the Broker can provide URLs for topics at a given index.
urls = b.DataURLsForTopic(20, 6)
if urls != nil {
t.Fatalf("Broker unexpectedly provided URLs for topic index 6")
}
urls = b.DataURLsForTopic(20, 5)
if urls == nil {
t.Fatalf("Broker failed to provide URLs for topic index 5")
}
if urls[0].String() != "http://localhost:1234/data" {
t.Fatalf("unexpected broker-provided URL for topic index 5: %s", urls[0].String())
}
}
// Ensure the broker can read from topics after reopening.