diff --git a/messaging/broker.go b/messaging/broker.go index 4d0408ca95..61ffa837b6 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -756,10 +756,7 @@ func (t *Topic) TombstonePath() string { return filepath.Join(t.path, "tombstone // Truncated returns whether the topic has even been truncated. func (t *Topic) Truncated() bool { - if _, err := os.Stat(t.TombstonePath()); os.IsNotExist(err) { - return false - } - return true + return tombstoneExists(t.path) } // Index returns the highest replicated index for the topic. @@ -1138,10 +1135,7 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { } // Determine if this topic has been truncated. - var truncated bool - if _, err := os.Stat(filepath.Join(path, "tombstone")); !os.IsNotExist(err) { - truncated = true - } + truncated := tombstoneExists(path) // If the requested index is less than that available, one of two things will // happen. If the topic has been truncated it means that topic data may exist @@ -1587,6 +1581,14 @@ func (dec *MessageDecoder) Decode(m *Message) error { return nil } +// tombstoneExists returns whether the given directory contains a tombstone +func tombstoneExists(path string) bool { + if _, err := os.Stat(filepath.Join(path, "tombstone")); os.IsNotExist(err) { + return false + } + return true +} + // UnmarshalMessage decodes a byte slice into a message. func UnmarshalMessage(data []byte) (*Message, error) { m := &Message{} diff --git a/messaging/handler.go b/messaging/handler.go index 269feaebd3..c4448b753e 100644 --- a/messaging/handler.go +++ b/messaging/handler.go @@ -106,6 +106,7 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) { urls := h.Broker.DataURLsForTopic(topicID, index) if urls == nil { h.error(w, ErrTopicNodesNotFound, http.StatusInternalServerError) + return } // Send back a list of URLs where the topic data can be fetched. @@ -113,7 +114,7 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) { for _, u := range urls { redirects = append(redirects, u.String()) } - w.Header().Set("X-Broker-Truncated", strings.Join(redirects, ",")) + w.Header().Set("X-Broker-DataURLs", strings.Join(redirects, ",")) http.Redirect(w, req, urls[0].String(), http.StatusTemporaryRedirect) } else if err != nil { log.Printf("message stream error: %s", err)