Add continuously streaming topic readers.

pull/1935/head
Ben Johnson 2015-03-09 22:32:12 -06:00
parent 5f5c6ca297
commit 4160d0b785
3 changed files with 188 additions and 14 deletions

View File

@ -17,6 +17,11 @@ import (
"github.com/influxdb/influxdb/raft"
)
// DefaultPollInterval is the default amount of time a topic reader will wait
// between checks for new segments or new data on an existing segment. This
// only occurs when the reader is at the end of all the data.
const DefaultPollInterval = 100 * time.Millisecond
// Broker represents distributed messaging system segmented into topics.
// Each topic represents a linear series of events.
type Broker struct {
@ -892,6 +897,9 @@ type TopicReader struct {
file *os.File // current segment file handler
closed bool
// The time between file system polling to check for new segments.
PollInterval time.Duration
}
// NewTopicReader returns a new instance of TopicReader that reads segments
@ -901,6 +909,8 @@ func NewTopicReader(path string, index uint64, streaming bool) *TopicReader {
path: path,
index: index,
streaming: streaming,
PollInterval: DefaultPollInterval,
}
}
@ -908,19 +918,33 @@ func NewTopicReader(path string, index uint64, streaming bool) *TopicReader {
func (r *TopicReader) Read(p []byte) (int, error) {
for {
// Retrieve current segment file handle.
// If the reader is closed then return EOF.
// If we don't have a file and we're streaming then sleep and retry.
f, err := r.File()
if err != nil {
if err == ErrReaderClosed {
return 0, io.EOF
} else if err != nil {
return 0, fmt.Errorf("file: %s", err)
} else if f == nil {
if r.streaming {
time.Sleep(r.PollInterval)
continue
}
return 0, io.EOF
}
// Write data to buffer.
// Read under lock so the underlying file cannot be closed.
r.mu.Lock()
n, err := f.Read(p)
r.mu.Unlock()
// Read into buffer.
// If no more data is available, then retry with the next segment.
if n, err := r.file.Read(p); err == io.EOF {
if err == io.EOF {
if err := r.nextSegment(); err != nil {
return 0, fmt.Errorf("next segment: %s", err)
}
time.Sleep(r.PollInterval)
continue
} else {
return n, err
@ -936,7 +960,7 @@ func (r *TopicReader) File() (*os.File, error) {
// Exit if closed.
if r.closed {
return nil, nil
return nil, ErrReaderClosed
}
// If the first file hasn't been opened then open it and seek.
@ -993,30 +1017,42 @@ func (r *TopicReader) nextSegment() error {
r.mu.Lock()
defer r.mu.Unlock()
// Skip if the reader is closed.
if r.closed {
return nil
}
// Find current segment index.
index, err := strconv.ParseUint(filepath.Base(r.file.Name()), 10, 64)
if err != nil {
return fmt.Errorf("parse current segment index: %s", err)
}
// Clear file.
if r.file != nil {
r.file.Close()
r.file = nil
}
// Read current segment list.
// If no segments exist then exit.
// If current segment is the last segment then ignore.
segments, err := ReadSegments(r.path)
if err != nil {
return fmt.Errorf("read segments: %s", err)
} else if len(segments) == 0 {
return nil
} else if segments[len(segments)-1].Index == index {
if !r.streaming {
r.closed = true
}
return nil
}
// Loop over segments and find the next one.
for i := range segments[:len(segments)-1] {
if segments[i].Index == index {
// Clear current file.
if r.file != nil {
r.file.Close()
r.file = nil
}
// Open next segment.
f, err := os.Open(segments[i+1].Path)
if err != nil {
return fmt.Errorf("open next segment: %s", err)
@ -1026,8 +1062,7 @@ func (r *TopicReader) nextSegment() error {
}
}
// If we didn't find the current segment or the current segment is the
// last segment then mark the reader as closed.
// This should only occur if our current segment was deleted.
r.closed = true
return nil
}
@ -1135,14 +1170,16 @@ func NewMessageDecoder(r io.Reader) *MessageDecoder {
func (dec *MessageDecoder) Decode(m *Message) error {
// Read header bytes.
var b [messageHeaderSize]byte
if _, err := io.ReadFull(dec.r, b[:]); err != nil {
if _, err := io.ReadFull(dec.r, b[:]); err == io.EOF {
return err
} else if err != nil {
return fmt.Errorf("read header: %s", err)
}
m.unmarshalHeader(b[:])
// Read data.
if _, err := io.ReadFull(dec.r, m.Data); err != nil {
return err
return fmt.Errorf("read body: %s", err)
}
return nil

View File

@ -5,11 +5,14 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"time"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
@ -500,7 +503,138 @@ func TestTopicReader(t *testing.T) {
t.Fatalf("%d. %v: result mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.index, tt.results, results)
}
}
}
// Ensure a topic reader can stream new messages.
func TestTopicReader_streaming(t *testing.T) {
path, _ := ioutil.TempDir("", "")
defer os.RemoveAll(path)
// Start topic reader from the beginning.
r := messaging.NewTopicReader(path, 0, true)
r.PollInterval = 1 * time.Millisecond
// Write a segments with delays.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(2 * time.Millisecond)
MustWriteFile(filepath.Join(path, "6"),
MustMarshalMessages([]*messaging.Message{
{Index: 6},
{Index: 7},
{Index: 10},
}),
)
// Write two more segments.
time.Sleep(5 * time.Millisecond)
MustWriteFile(filepath.Join(path, "12"),
MustMarshalMessages([]*messaging.Message{
{Index: 12},
}),
)
MustWriteFile(filepath.Join(path, "13"),
MustMarshalMessages([]*messaging.Message{
{Index: 13},
{Index: 14},
}),
)
// Close reader.
time.Sleep(5 * time.Millisecond)
r.Close()
}()
// Slurp all message ids from the reader.
indices := make([]uint64, 0)
dec := messaging.NewMessageDecoder(r)
for {
m := &messaging.Message{}
if err := dec.Decode(m); err == io.EOF {
break
} else if err != nil {
t.Fatalf("decode error: %s", err)
} else {
indices = append(indices, m.Index)
}
}
// Verify we received the correct indices.
if !reflect.DeepEqual(indices, []uint64{6, 7, 10, 12, 13, 14}) {
t.Fatalf("unexpected indices: %#v", indices)
}
wg.Wait()
}
// Ensure multiple topic readers can read from the same topic directory.
func BenchmarkTopicReaderStreaming(b *testing.B) {
path, _ := ioutil.TempDir("", "")
defer os.RemoveAll(path)
// Configurable settings.
readerN := 10 // number of readers
messageN := b.N // total message count
dataSize := 50 // per message data size
pollInterval := 1 * time.Millisecond
// Create a topic to write into.
topic := messaging.NewTopic(1, path)
topic.MaxSegmentSize = 64 * 1024 // 64KB
if err := topic.Open(); err != nil {
b.Fatal(err)
}
defer topic.Close()
// Stream from multiple readers in parallel.
var wg sync.WaitGroup
wg.Add(readerN)
readers := make([]*messaging.TopicReader, readerN)
for i := range readers {
r := messaging.NewTopicReader(path, 0, true)
r.PollInterval = pollInterval
readers[i] = r
// Read messages in sequence.
go func(r *messaging.TopicReader) {
defer r.Close()
defer wg.Done()
var index uint64
dec := messaging.NewMessageDecoder(r)
for {
var m messaging.Message
if err := dec.Decode(&m); err == io.EOF {
b.Fatalf("unexpected EOF")
} else if err != nil {
b.Fatalf("decode error: %s", err)
} else if index+1 != m.Index {
b.Fatalf("out of order: %d..%d", index, m.Index)
}
index = m.Index
if index == uint64(messageN) {
break
}
}
}(r)
}
// Write messages into topic but stagger them by small, random intervals.
for i := 0; i < messageN; i++ {
time.Sleep(time.Duration(rand.Intn(int(pollInterval))))
index := uint64(i) + 1
if err := topic.WriteMessage(&messaging.Message{Index: index, Data: make([]byte, dataSize)}); err != nil {
b.Fatalf("write message error: %s", err)
}
}
wg.Wait()
}
// Broker is a wrapper for broker.Broker that creates the broker in a temporary location.

View File

@ -69,4 +69,7 @@ var (
// ErrStaleWrite is returned when writing a message with an old index to a topic.
ErrStaleWrite = errors.New("stale write")
// ErrReaderClosed is returned when reading from a closed topic reader.
ErrReaderClosed = errors.New("reader closed")
)