Return redirect to client if topic is truncated

pull/2433/head
Philip O'Toole 2015-04-24 22:50:33 -07:00 committed by Philip O'Toole
parent 6eefb9ffdc
commit 8ab44301b9
3 changed files with 45 additions and 7 deletions

View File

@ -3,6 +3,7 @@ package messaging
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"io"
@ -32,6 +33,16 @@ const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 1GB
// DefaultMaxSegmentSize is the largest a segment can get before starting a new segment.
const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB
var (
// ErrTopicTruncated is returned when topic data is unavailable due to truncation.
ErrTopicTruncated = errors.New("topic truncated")
// ErrTopicNodesNotFound is returned when requested topic data has been truncated
// but there are no nodes in the cluster that can provide a replica. This is a system
// failure and should not occur on a healthy replicated system.
ErrTopicNodesNotFound = errors.New("topic nodes not found")
)
// Broker represents distributed messaging system segmented into topics.
// Each topic represents a linear series of events.
type Broker struct {
@ -1126,14 +1137,30 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) {
return nil, fmt.Errorf("read segments: %s", err)
}
// If there are no segments then ignore.
// If index is zero then start from the first segment.
// If index is less than the first segment range then return error.
// Determine if this topic has been truncated.
var truncated bool
if _, err := os.Stat(filepath.Join(path, "tombstone")); !os.IsNotExist(err) {
truncated = true
}
// If the requested index is less than that available, one of two things will
// happen. If the topic has been truncated it means that topic data may exist
// somewhere on the cluster that is not available here. Therefore this broker
// cannot provide the data. If truncation has never taken place however, then
// this broker can safely provide whatever data it has.
if len(segments) == 0 {
if truncated {
return nil, ErrTopicTruncated
}
return nil, nil
} else if index == 0 {
return segments[0], nil
} else if index < segments[0].Index {
}
// Is requested index lower than the broker can provide?
if index < segments[0].Index {
if truncated {
return nil, ErrTopicTruncated
}
return segments[0], nil
}

View File

@ -23,6 +23,7 @@ type Handler struct {
io.ReadCloser
io.Seeker
}
DataURLsForTopic(id, index uint64) []url.URL
Publish(m *Message) (uint64, error)
SetTopicMaxIndex(topicID, index uint64, u url.URL) error
}
@ -100,7 +101,14 @@ func (h *Handler) getMessages(w http.ResponseWriter, req *http.Request) {
// Write out all data from the topic reader.
// Automatically flush as reads come in.
if _, err := CopyFlush(w, r); err != nil {
if _, err := CopyFlush(w, r); err == ErrTopicTruncated {
// Broker unable to provide data for the requested index, provide another URL.
urls := h.Broker.DataURLsForTopic(topicID, index)
if urls == nil {
h.error(w, ErrTopicNodesNotFound, http.StatusInternalServerError)
}
http.Redirect(w, req, urls[0].String(), http.StatusTemporaryRedirect)
} else if err != nil {
log.Printf("message stream error: %s", err)
}
}

View File

@ -296,6 +296,9 @@ func (b *HandlerBroker) TopicReader(topicID, index uint64, streaming bool) inter
func (b *HandlerBroker) SetTopicMaxIndex(topicID, index uint64, dataURL url.URL) error {
return b.SetTopicMaxIndexFunc(topicID, index, dataURL)
}
func (b *HandlerBroker) DataURLsForTopic(id, index uint64) []url.URL {
return nil
}
// MustParseURL parses a string into a URL. Panic on error.
func MustParseURL(s string) *url.URL {