Integrate PR #2433 review comments

pull/2433/head
Philip O'Toole 2015-04-30 16:14:49 -07:00
parent 6460d52440
commit cc3f8f1c90
2 changed files with 12 additions and 9 deletions

View File

@ -756,10 +756,7 @@ func (t *Topic) TombstonePath() string { return filepath.Join(t.path, "tombstone
// Truncated returns whether the topic has even been truncated. // Truncated returns whether the topic has even been truncated.
func (t *Topic) Truncated() bool { func (t *Topic) Truncated() bool {
if _, err := os.Stat(t.TombstonePath()); os.IsNotExist(err) { return tombstoneExists(t.path)
return false
}
return true
} }
// Index returns the highest replicated index for the topic. // Index returns the highest replicated index for the topic.
@ -1138,10 +1135,7 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) {
} }
// Determine if this topic has been truncated. // Determine if this topic has been truncated.
var truncated bool truncated := tombstoneExists(path)
if _, err := os.Stat(filepath.Join(path, "tombstone")); !os.IsNotExist(err) {
truncated = true
}
// If the requested index is less than that available, one of two things will // If the requested index is less than that available, one of two things will
// happen. If the topic has been truncated it means that topic data may exist // happen. If the topic has been truncated it means that topic data may exist
@ -1587,6 +1581,14 @@ func (dec *MessageDecoder) Decode(m *Message) error {
return nil return nil
} }
// tombstoneExists returns whether the given directory contains a tombstone
func tombstoneExists(path string) bool {
if _, err := os.Stat(filepath.Join(path, "tombstone")); os.IsNotExist(err) {
return false
}
return true
}
// UnmarshalMessage decodes a byte slice into a message. // UnmarshalMessage decodes a byte slice into a message.
func UnmarshalMessage(data []byte) (*Message, error) { func UnmarshalMessage(data []byte) (*Message, error) {
m := &Message{} m := &Message{}

View File

@ -106,6 +106,7 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) {
urls := h.Broker.DataURLsForTopic(topicID, index) urls := h.Broker.DataURLsForTopic(topicID, index)
if urls == nil { if urls == nil {
h.error(w, ErrTopicNodesNotFound, http.StatusInternalServerError) h.error(w, ErrTopicNodesNotFound, http.StatusInternalServerError)
return
} }
// Send back a list of URLs where the topic data can be fetched. // Send back a list of URLs where the topic data can be fetched.
@ -113,7 +114,7 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) {
for _, u := range urls { for _, u := range urls {
redirects = append(redirects, u.String()) redirects = append(redirects, u.String())
} }
w.Header().Set("X-Broker-Truncated", strings.Join(redirects, ",")) w.Header().Set("X-Broker-DataURLs", strings.Join(redirects, ","))
http.Redirect(w, req, urls[0].String(), http.StatusTemporaryRedirect) http.Redirect(w, req, urls[0].String(), http.StatusTemporaryRedirect)
} else if err != nil { } else if err != nil {
log.Printf("message stream error: %s", err) log.Printf("message stream error: %s", err)