diff --git a/messaging/broker.go b/messaging/broker.go index 040ac65848..c79cf480e0 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -321,6 +321,21 @@ func (b *Broker) setMaxIndex(index uint64) error { return nil } +// URLsForTopic returns a slice of URLs from where previously replicaed +// data for a given topic may be retrieved. The nodes at the URL will have +// data up to at least the given index. These URLs are provided when a node +// requests topic data that has been truncated. +func (b *Broker) DataURLsForTopic(id, index uint64) []url.URL { + b.mu.RLock() + defer b.mu.RUnlock() + + t := b.topics[id] + if t == nil { + return nil + } + return t.DataURLsForIndex(index) +} + // WriteTo writes a snapshot of the broker to w. func (b *Broker) WriteTo(w io.Writer) (int64, error) { // TODO: Prevent truncation during snapshot. @@ -754,6 +769,20 @@ func (t *Topic) DataURLs() []url.URL { return urls } +// DataURLsForIndex returns the data node URLs subscribed to this topic that have +// replicated at least up to the given index. +func (t *Topic) DataURLsForIndex(index uint64) []url.URL { + t.mu.Lock() + defer t.mu.Unlock() + var urls []url.URL + for u, idx := range t.indexByURL { + if idx >= index { + urls = append(urls, u) + } + } + return urls +} + // IndexForURL returns the highest index replicated for a given data URL. func (t *Topic) IndexForURL(u url.URL) uint64 { t.mu.RLock() diff --git a/messaging/broker_test.go b/messaging/broker_test.go index 3df722b41e..55c37750ec 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -107,7 +107,8 @@ func TestBroker_Apply(t *testing.T) { } } -// Ensure the broker can apply topic high water mark messages. +// Ensure the broker can apply topic high water mark messages, and provide correct URLs +// for subsequent topic peer replication. func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) { b := OpenBroker() defer b.Close() @@ -133,9 +134,37 @@ func TestBroker_Apply_SetMaxTopicIndex(t *testing.T) { t.Fatalf("apply error: %s", err) } - if topic := b.Topic(20); topic.IndexForURL(*testDataURL) != 5 { + topic := b.Topic(20) + if topic.IndexForURL(*testDataURL) != 5 { t.Fatalf("unexpected topic url index: %d", topic.IndexForURL(*testDataURL)) } + + // Ensure the URLs that can serve the topic are correct. + var urls []url.URL + urls = topic.DataURLsForIndex(10) + if urls != nil { + t.Fatalf("URLs unexpectedly available for index 10") + } + urls = topic.DataURLsForIndex(5) + if urls == nil { + t.Fatalf("no URLs available for index 5") + } + if urls[0].String() != "http://localhost:1234/data" { + t.Fatalf("unexpectedURL for topic index 5: %s", urls[0].String()) + } + + // Ensure the Broker can provide URLs for topics at a given index. + urls = b.DataURLsForTopic(20, 6) + if urls != nil { + t.Fatalf("Broker unexpectedly provided URLs for topic index 6") + } + urls = b.DataURLsForTopic(20, 5) + if urls == nil { + t.Fatalf("Broker failed to provide URLs for topic index 5") + } + if urls[0].String() != "http://localhost:1234/data" { + t.Fatalf("unexpected broker-provided URL for topic index 5: %s", urls[0].String()) + } } // Ensure the broker can read from topics after reopening.