diff --git a/messaging/broker.go b/messaging/broker.go index c79cf480e0..73f13f602b 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -3,6 +3,7 @@ package messaging import ( "encoding/binary" "encoding/json" + "errors" "fmt" "hash/fnv" "io" @@ -32,6 +33,16 @@ const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 1GB // DefaultMaxSegmentSize is the largest a segment can get before starting a new segment. const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB +var ( + // ErrTopicTruncated is returned when topic data is unavailable due to truncation. + ErrTopicTruncated = errors.New("topic truncated") + + // ErrTopicNodesNotFound is returned when requested topic data has been truncated + // but there are no nodes in the cluster that can provide a replica. This is a system + // failure and should not occur on a healthy replicated system. + ErrTopicNodesNotFound = errors.New("topic nodes not found") +) + // Broker represents distributed messaging system segmented into topics. // Each topic represents a linear series of events. type Broker struct { @@ -1126,14 +1137,30 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { return nil, fmt.Errorf("read segments: %s", err) } - // If there are no segments then ignore. - // If index is zero then start from the first segment. - // If index is less than the first segment range then return error. + // Determine if this topic has been truncated. + var truncated bool + if _, err := os.Stat(filepath.Join(path, "tombstone")); !os.IsNotExist(err) { + truncated = true + } + + // If the requested index is less than that available, one of two things will + // happen. If the topic has been truncated it means that topic data may exist + // somewhere on the cluster that is not available here. Therefore this broker + // cannot provide the data. If truncation has never taken place however, then + // this broker can safely provide whatever data it has. + if len(segments) == 0 { + if truncated { + return nil, ErrTopicTruncated + } return nil, nil - } else if index == 0 { - return segments[0], nil - } else if index < segments[0].Index { + } + + // Is requested index lower than the broker can provide? + if index < segments[0].Index { + if truncated { + return nil, ErrTopicTruncated + } return segments[0], nil } diff --git a/messaging/handler.go b/messaging/handler.go index bbba914130..8b47926cba 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -23,6 +23,7 @@ type Handler struct { io.ReadCloser io.Seeker } + DataURLsForTopic(id, index uint64) []url.URL Publish(m *Message) (uint64, error) SetTopicMaxIndex(topicID, index uint64, u url.URL) error } @@ -100,7 +101,14 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) { // Write out all data from the topic reader. // Automatically flush as reads come in. - if _, err := CopyFlush(w, r); err != nil { + if _, err := CopyFlush(w, r); err == ErrTopicTruncated { + // Broker unable to provide data for the requested index, provide another URL. + urls := h.Broker.DataURLsForTopic(topicID, index) + if urls == nil { + h.error(w, ErrTopicNodesNotFound, http.StatusInternalServerError) + } + http.Redirect(w, req, urls[0].String(), http.StatusTemporaryRedirect) + } else if err != nil { log.Printf("message stream error: %s", err) } } diff --git a/messaging/handler_test.go b/messaging/handler_test.go index 03d6259812..e444c49646 100644 --- a/messaging/handler_test.go +++ b/messaging/handler_test.go @@ -296,6 +296,9 @@ func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) inter func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64, dataURL url.URL) error { return b.SetTopicMaxIndexFunc(topicID, index, dataURL) } +func (b *HandlerBroker) DataURLsForTopic(id, index uint64) []url.URL { + return nil +} // MustParseURL parses a string into a URL. Panic on error. func MustParseURL(s string) *url.URL {