package messaging import ( "encoding/binary" "encoding/json" "fmt" "hash/fnv" "io" "log" "net/url" "os" "path/filepath" "sort" "strconv" "sync" "time" "github.com/boltdb/bolt" "github.com/influxdb/influxdb/raft" ) // DefaultPollInterval is the default amount of time a topic reader will wait // between checks for new segments or new data on an existing segment. This // only occurs when the reader is at the end of all the data. const DefaultPollInterval = 100 * time.Millisecond const DefaultTruncationInterval = 10 * time.Minute // DefaultMaxTopicSize is the largest a topic can get before truncation. const DefaultMaxTopicSize = 1024 * 1024 * 1024 // 1GB // DefaultMaxSegmentSize is the largest a segment can get before starting a new segment. const DefaultMaxSegmentSize = 10 * 1024 * 1024 // 10MB // Broker represents distributed messaging system segmented into topics. // Each topic represents a linear series of events. type Broker struct { mu sync.RWMutex path string // data directory index uint64 // highest applied index meta *bolt.DB // metadata topics map[uint64]*Topic // topics by id TruncationInterval time.Duration MaxTopicSize int64 // Maximum size of a topic in bytes MaxSegmentSize int64 // Maximum size of a segment in bytes // Goroutinte shutdown done chan struct{} wg sync.WaitGroup // Log is the distributed raft log that commands are applied to. Log interface { URL() url.URL URLs() []url.URL Leader() (uint64, url.URL) IsLeader() bool ClusterID() uint64 Apply(data []byte) (index uint64, err error) } Logger *log.Logger } // NewBroker returns a new instance of a Broker with default values. func NewBroker() *Broker { b := &Broker{ topics: make(map[uint64]*Topic), Logger: log.New(os.Stderr, "[broker] ", log.LstdFlags), TruncationInterval: DefaultTruncationInterval, MaxTopicSize: DefaultMaxTopicSize, MaxSegmentSize: DefaultMaxSegmentSize, done: make(chan struct{}), } return b } // Path returns the path used when opening the broker. // Returns empty string if the broker is not open. func (b *Broker) Path() string { return b.path } // metaPath returns the file path to the broker's metadata file. func (b *Broker) metaPath() string { if b.path == "" { return "" } return filepath.Join(b.path, "meta") } // URL returns the URL of the broker. func (b *Broker) URL() url.URL { return b.Log.URL() } // URLs returns a list of all broker URLs in the cluster. func (b *Broker) URLs() []url.URL { return b.Log.URLs() } // IsLeader returns true if the broker is the current cluster leader. func (b *Broker) IsLeader() bool { return b.Log.IsLeader() } // LeaderURL returns the URL to the leader broker. func (b *Broker) LeaderURL() url.URL { _, u := b.Log.Leader() return u } // ClusterID returns the identifier for the cluster. func (b *Broker) ClusterID() uint64 { return b.Log.ClusterID() } // TopicPath returns the file path to a topic's data. // Returns a blank string if the broker is closed. func (b *Broker) TopicPath(id uint64) string { b.mu.RLock() defer b.mu.RUnlock() return b.topicPath(id) } func (b *Broker) topicPath(id uint64) string { if b.path == "" { return "" } return filepath.Join(b.path, strconv.FormatUint(id, 10)) } // Topic returns a topic on a broker by id. // Returns nil if the topic doesn't exist or the broker is closed. func (b *Broker) Topic(id uint64) *Topic { b.mu.RLock() defer b.mu.RUnlock() return b.topics[id] } // Index returns the highest index seen by the broker across all topics. // Returns 0 if the broker is closed. func (b *Broker) Index() uint64 { b.mu.RLock() defer b.mu.RUnlock() return b.index } // opened returns true if the broker is in an open and running state. func (b *Broker) opened() bool { return b.path != "" } // Open initializes the log. // The broker then must be initialized or join a cluster before it can be used. func (b *Broker) Open(path string) error { b.mu.Lock() defer b.mu.Unlock() // Require a non-blank path. if path == "" { return ErrPathRequired } if err := func() error { b.path = path // Ensure root directory exists. if err := os.MkdirAll(path, 0777); err != nil { return fmt.Errorf("mkdir: %s", err) } // Open meta file. meta, err := bolt.Open(b.metaPath(), 0666, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { return fmt.Errorf("open meta: %s", err) } b.meta = meta // Initialize data from meta store. if err := b.meta.Update(func(tx *bolt.Tx) error { tx.CreateBucketIfNotExists([]byte("meta")) // Read in index from meta store, if set. if v := tx.Bucket([]byte("meta")).Get([]byte("index")); v != nil { b.index = btou64(v) } return nil }); err != nil { return err } // Read all topic metadata into memory. if err := b.openTopics(); err != nil { return fmt.Errorf("open topics: %s", err) } // Start periodic deletion of replicated topic segments. b.startTopicTruncation() return nil }(); err != nil { _ = b.close() return err } return nil } // openTopics reads all topic metadata into memory. func (b *Broker) openTopics() error { // Read all topics from the broker directory. topics, err := ReadTopics(b.path) if err != nil { return fmt.Errorf("read topics: %s", err) } // Open each topic and append to the map. b.topics = make(map[uint64]*Topic) for _, t := range topics { if err := t.Open(); err != nil { return fmt.Errorf("open topic: id=%d, err=%s", t.id, err) } b.topics[t.id] = t b.topics[t.id].MaxSegmentSize = b.MaxSegmentSize } // Retrieve the highest index across all topics. for _, t := range b.topics { if t.index > b.index { b.index = t.index } } return nil } // Close closes the broker and all topics. func (b *Broker) Close() error { b.mu.Lock() defer b.mu.Unlock() return b.close() } func (b *Broker) close() error { // Return error if the broker is already closed. if !b.opened() { return ErrClosed } b.path = "" // Close meta data. if b.meta != nil { _ = b.meta.Close() b.meta = nil } // Close all topics. b.closeTopics() // Shutdown all goroutines. close(b.done) b.wg.Wait() b.done = nil return nil } // closeTopics closes all topic files and clears the topics map. func (b *Broker) closeTopics() { for _, t := range b.topics { _ = t.Close() } b.topics = make(map[uint64]*Topic) } // startTopicTruncation starts periodic topic truncation. func (b *Broker) startTopicTruncation() { b.wg.Add(1) go func() { defer b.wg.Done() tick := time.NewTicker(b.TruncationInterval) defer tick.Stop() for { select { case <-tick.C: b.TruncateTopics() case <-b.done: return } } }() } // TruncateTopics forces topics to truncate such that they are equal to // or less than the requested size, if possible. func (b *Broker) TruncateTopics() { for _, t := range b.topics { if n, err := t.Truncate(b.MaxTopicSize); err != nil { b.Logger.Printf("error truncating topic %s: %s", t.Path(), err.Error()) } else if n > 0 { b.Logger.Printf("topic %s, %d bytes deleted", t.Path(), n) } } return } // SetMaxIndex sets the highest index applied by the broker. // This is only used for internal log messages. Topics may have a higher index. func (b *Broker) SetMaxIndex(index uint64) error { b.mu.Lock() defer b.mu.Unlock() return b.setMaxIndex(index) } func (b *Broker) setMaxIndex(index uint64) error { // Update index in meta database. if err := b.meta.Update(func(tx *bolt.Tx) error { return tx.Bucket([]byte("meta")).Put([]byte("index"), u64tob(index)) }); err != nil { return err } // Set in-memory index. b.index = index return nil } // WriteTo writes a snapshot of the broker to w. func (b *Broker) WriteTo(w io.Writer) (int64, error) { // TODO: Prevent truncation during snapshot. // Calculate header under lock. b.mu.RLock() hdr, err := b.createSnapshotHeader() b.mu.RUnlock() if err != nil { return 0, fmt.Errorf("create snapshot: %s", err) } // Encode snapshot header. buf, err := json.Marshal(&hdr) if err != nil { return 0, fmt.Errorf("encode snapshot header: %s", err) } // Write header frame. if err := binary.Write(w, binary.BigEndian, uint32(len(buf))); err != nil { return 0, fmt.Errorf("write header size: %s", err) } if _, err := w.Write(buf); err != nil { return 0, fmt.Errorf("write header: %s", err) } // Stream each topic sequentially. for _, t := range hdr.Topics { for _, s := range t.Segments { if _, err := copyFileN(w, s.path, s.Size); err != nil { return 0, err } } } return 0, nil } // createSnapshotHeader creates a snapshot header. func (b *Broker) createSnapshotHeader() (*snapshotHeader, error) { // Create parent header. sh := &snapshotHeader{Index: b.index} // Append topics. for _, t := range b.topics { // Create snapshot topic. st := &snapshotTopic{ID: t.id} // Read segments from disk. segments, err := ReadSegments(t.path) if err != nil && !os.IsNotExist(err) { return nil, fmt.Errorf("read segments: %s", err) } // Add segments to topic. for _, s := range segments { // Retrieve current segment file size from disk. var size int64 fi, err := os.Stat(s.Path) if os.IsNotExist(err) { size = 0 } else if err == nil { size = fi.Size() } else { return nil, fmt.Errorf("stat segment: %s", err) } // Append segment. st.Segments = append(st.Segments, &snapshotTopicSegment{ Index: s.Index, Size: size, path: s.Path, }) } // Append topic to the snapshot. sh.Topics = append(sh.Topics, st) } return sh, nil } // copyFileN copies n bytes from a path to a writer. func copyFileN(w io.Writer, path string, n int64) (int64, error) { // Open file for reading. f, err := os.Open(path) if err != nil { return 0, err } defer func() { _ = f.Close() }() // Copy file up to n bytes. return io.CopyN(w, f, n) } // ReadFrom reads a broker snapshot from r. func (b *Broker) ReadFrom(r io.Reader) (int64, error) { b.mu.Lock() defer b.mu.Unlock() // Remove and recreate broker path. if err := b.reset(); err != nil && !os.IsNotExist(err) { return 0, fmt.Errorf("reset: %s", err) } else if err = os.MkdirAll(b.path, 0777); err != nil { return 0, fmt.Errorf("mkdir: %s", err) } // Read header frame. var sz uint32 if err := binary.Read(r, binary.BigEndian, &sz); err != nil { return 0, fmt.Errorf("read header size: %s", err) } buf := make([]byte, sz) if _, err := io.ReadFull(r, buf); err != nil { return 0, fmt.Errorf("read header: %s", err) } // Decode header. sh := &snapshotHeader{} if err := json.Unmarshal(buf, &sh); err != nil { return 0, fmt.Errorf("decode header: %s", err) } // Close any topics which might be open and clear them out. b.closeTopics() // Copy topic files from snapshot to local disk. for _, st := range sh.Topics { t := NewTopic(st.ID, b.topicPath(st.ID)) t.MaxSegmentSize = b.MaxSegmentSize // Create topic directory. if err := os.MkdirAll(t.Path(), 0777); err != nil { return 0, fmt.Errorf("make topic dir: %s", err) } // Copy data from snapshot into segment files. // We don't instantiate the segments because that will be done // automatically when calling Open() on the topic. for _, ss := range st.Segments { if err := func() error { // Create a new file with the starting index. f, err := os.Create(t.segmentPath(ss.Index)) if err != nil { return fmt.Errorf("open segment: %s", err) } defer func() { _ = f.Close() }() // Copy from stream into file. if _, err := io.CopyN(f, r, ss.Size); err != nil { return fmt.Errorf("copy segment: %s", err) } return nil }(); err != nil { return 0, err } } // Open topic. if err := t.Open(); err != nil { return 0, fmt.Errorf("open topic: %s", err) } b.topics[t.id] = t } // Set the highest seen index. if err := b.setMaxIndex(sh.Index); err != nil { return 0, fmt.Errorf("set max index: %s", err) } b.index = sh.Index return 0, nil } // reset removes all files in the broker directory besides the raft directory. func (b *Broker) reset() error { // Open handle to directory. f, err := os.Open(b.path) if err != nil { return err } defer func() { _ = f.Close() }() // Read directory items. fis, err := f.Readdir(0) if err != nil { return err } // Remove all files & directories besides raft. for _, fi := range fis { if fi.Name() == "raft" { continue } if err := os.RemoveAll(fi.Name()); err != nil { return fmt.Errorf("remove: %s", fi.Name()) } } return nil } // Publish writes a message. // Returns the index of the message. Otherwise returns an error. func (b *Broker) Publish(m *Message) (uint64, error) { buf, err := m.MarshalBinary() assert(err == nil, "marshal binary error: %s", err) return b.Log.Apply(buf) } // TopicReader returns a new topic reader for a topic starting from a given index. func (b *Broker) TopicReader(topicID, index uint64, streaming bool) interface { io.ReadCloser io.Seeker } { return NewTopicReader(b.TopicPath(topicID), index, streaming) } // SetTopicMaxIndex updates the highest replicated index for a topic and data URL. // If a higher index is already set on the topic then the call is ignored. // This index is only held in memory and is used for topic segment reclamation. // The higheset replicated index per data URL is tracked separately from the current index func (b *Broker) SetTopicMaxIndex(topicID, index uint64, u url.URL) error { _, err := b.Publish(&Message{ Type: SetTopicMaxIndexMessageType, Data: marshalTopicIndex(topicID, index, u), }) return err } func (b *Broker) applySetTopicMaxIndex(m *Message) { topicID, index, u := unmarshalTopicIndex(m.Data) // Track the highest replicated index for the topic, per data node URL. if t := b.topics[topicID]; t != nil { t.SetIndexForURL(index, u) } } func marshalTopicIndex(topicID, index uint64, u url.URL) []byte { s := []byte(u.String()) b := make([]byte, 16+2+len(s)) binary.BigEndian.PutUint64(b[0:8], topicID) binary.BigEndian.PutUint64(b[8:16], index) binary.BigEndian.PutUint16(b[16:18], uint16(len(s))) // URL string length n := copy(b[18:], s) // URL string assert(n == len(s), "marshal topic index too short. have %d, expectd %d", n, len(s)) return b } func unmarshalTopicIndex(b []byte) (topicID, index uint64, u url.URL) { assert(len(b) >= 18, "unmarshal topic index too short. have %d, expected %d", len(b), 20) topicID = binary.BigEndian.Uint64(b[0:8]) index = binary.BigEndian.Uint64(b[8:16]) n := binary.BigEndian.Uint16(b[16:18]) // URL length du, err := url.Parse(string(b[18 : 18+n])) // URL assert(err == nil, "unmarshal binary error: %s", err) u = *du return } // Apply executes a message against the broker. func (b *Broker) Apply(m *Message) error { b.mu.Lock() defer b.mu.Unlock() // Exit if broker isn't open. if !b.opened() { return ErrClosed } // Ensure messages with old indexes aren't re-applied. assert(m.Index > b.index, "stale apply: msg=%d, broker=%d", m.Index, b.index) // Process internal commands separately than the topic writes. switch m.Type { case SetTopicMaxIndexMessageType: b.applySetTopicMaxIndex(m) default: // Create topic if not exists. t := b.topics[m.TopicID] if t == nil { t = NewTopic(m.TopicID, b.topicPath(m.TopicID)) t.MaxSegmentSize = b.MaxSegmentSize if err := t.Open(); err != nil { return fmt.Errorf("open topic: %s", err) } b.topics[t.id] = t } // Write message to topic. if err := t.WriteMessage(m); err != nil { return fmt.Errorf("write message: %s", err) } } // Save highest applied index in memory. // Only internal messages need to have their indexes saved to disk. b.index = m.Index return nil } // snapshotHeader represents the header of a snapshot. type snapshotHeader struct { Topics []*snapshotTopic `json:"topics"` Index uint64 `json:"index"` } type snapshotTopic struct { ID uint64 `json:"id"` Segments []*snapshotTopicSegment `json:"segments"` } type snapshotTopicSegment struct { Index uint64 `json:"index"` Size int64 `json:"size"` path string } // RaftFSM is a wrapper struct around the broker that implements the raft.FSM interface. // It will panic for any errors that occur during Apply. type RaftFSM struct { Broker interface { io.WriterTo io.ReaderFrom Apply(m *Message) error Index() uint64 SetMaxIndex(uint64) error } } func (fsm *RaftFSM) Index() uint64 { return fsm.Broker.Index() } func (fsm *RaftFSM) WriteTo(w io.Writer) (n int64, err error) { return fsm.Broker.WriteTo(w) } func (fsm *RaftFSM) ReadFrom(r io.Reader) (n int64, err error) { return fsm.Broker.ReadFrom(r) } // Apply applies a raft command to the broker. func (fsm *RaftFSM) Apply(e *raft.LogEntry) error { switch e.Type { case raft.LogEntryCommand: // Decode message. m := &Message{} if err := m.UnmarshalBinary(e.Data); err != nil { panic("message unmarshal: " + err.Error()) } m.Index = e.Index // Apply message. if err := fsm.Broker.Apply(m); err != nil { return fmt.Errorf("broker apply: %s", err) } default: // Move internal index forward if it's an internal raft comand. if err := fsm.Broker.SetMaxIndex(e.Index); err != nil { return fmt.Errorf("set max index: idx=%d, err=%s", e.Index, err) } } return nil } // topic represents a single named queue of messages. // Each topic is identified by a unique path. // // Topics write their entries to segmented log files which contain a // contiguous range of entries. type Topic struct { mu sync.Mutex id uint64 // unique identifier index uint64 // current index path string // on-disk path // highest index replicated per data url. The unique set of keys across all topics // provides a snapshot of the addresses of every data node in a cluster. indexByURL map[url.URL]uint64 file *os.File // last segment writer opened bool // The largest a segment can get before splitting into a new segment. MaxSegmentSize int64 } // NewTopic returns a new instance of Topic. func NewTopic(id uint64, path string) *Topic { return &Topic{ id: id, path: path, indexByURL: make(map[url.URL]uint64), MaxSegmentSize: DefaultMaxSegmentSize, } } // ID returns the topic identifier. func (t *Topic) ID() uint64 { return t.id } // Path returns the topic path. func (t *Topic) Path() string { return t.path } // TombstonePath returns the path of the tomstone file. func (t *Topic) TombstonePath() string { return filepath.Join(t.path, "tombstone") } // Index returns the highest replicated index for the topic. func (t *Topic) Index() uint64 { t.mu.Lock() defer t.mu.Unlock() return t.index } // DataURLs returns the data node URLs subscribed to this topic func (t *Topic) DataURLs() []url.URL { t.mu.Lock() defer t.mu.Unlock() var urls []url.URL for u, _ := range t.indexByURL { urls = append(urls, u) } return urls } // IndexForURL returns the highest index replicated for a given data URL. func (t *Topic) IndexForURL(u url.URL) uint64 { t.mu.Lock() defer t.mu.Unlock() return t.indexByURL[u] } // SetIndexForURL sets the replicated index for a given data URL. func (t *Topic) SetIndexForURL(index uint64, u url.URL) { t.mu.Lock() defer t.mu.Unlock() t.indexByURL[u] = index } // SegmentPath returns the path to a segment starting with a given log index. func (t *Topic) SegmentPath(index uint64) string { t.mu.Lock() defer t.mu.Unlock() return t.segmentPath(index) } func (t *Topic) segmentPath(index uint64) string { if t.path == "" { return "" } return filepath.Join(t.path, strconv.FormatUint(index, 10)) } // Open opens a topic for writing. func (t *Topic) Open() error { t.mu.Lock() defer t.mu.Unlock() // Ensure topic is not already open and it has a path. if t.opened { return ErrTopicOpen } else if t.path == "" { return ErrPathRequired } if err := func() error { t.opened = true // Ensure the parent directory exists. if err := os.MkdirAll(t.path, 0777); err != nil { return err } // Read available segments. segments, err := ReadSegments(t.path) if err != nil && !os.IsNotExist(err) { return fmt.Errorf("read segments: %s", err) } // Read max index and open file handle if we have segments. if len(segments) > 0 { s := segments.Last() // Read the last segment and extract the last message index. index, err := RecoverSegment(s.Path) if err != nil { return fmt.Errorf("recover segment: %s", err) } t.index = index // Open file handle on the segment. f, err := os.OpenFile(s.Path, os.O_RDWR|os.O_APPEND, 0666) if err != nil { return fmt.Errorf("open segment: %s", err) } t.file = f } return nil }(); err != nil { _ = t.close() return err } return nil } // Close closes the topic and segment writer. func (t *Topic) Close() error { t.mu.Lock() defer t.mu.Unlock() return t.close() } func (t *Topic) close() error { if t.file != nil { _ = t.file.Close() t.file = nil } t.opened = false t.index = 0 return nil } // WriteMessage writes a message to the end of the topic. func (t *Topic) WriteMessage(m *Message) error { t.mu.Lock() defer t.mu.Unlock() // Return error if message index is lower than the topic's highest index. if m.Index <= t.index { return ErrStaleWrite } // Close the current file handle if it's too large. if t.file != nil { if fi, err := t.file.Stat(); err != nil { return fmt.Errorf("stat: %s", err) } else if fi.Size() > t.MaxSegmentSize { _ = t.file.Close() t.file = nil } } // Create a new segment if we have no handle. if t.file == nil { f, err := os.OpenFile(t.segmentPath(m.Index), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { return fmt.Errorf("create segment file: %s", err) } t.file = f } // Write to last segment. if _, err := m.WriteTo(t.file); err != nil { return fmt.Errorf("write segment: %s", err) } // Update index. t.index = m.Index return nil } // Truncate attempts to delete topic segments such that the total size of the topic on-disk // is equal to or less-than maxSize. Returns the number of bytes deleted, and error if any. // This function is not guaranteed to be performant. func (t *Topic) Truncate(maxSize int64) (int64, error) { t.mu.Lock() defer t.mu.Unlock() segments, err := ReadSegments(t.Path()) if err != nil { return 0, err } totalSize, err := segments.Size() if err != nil { return 0, err } // Find the highest-replicated index, this is a condition for deletion of segments // within this topic. var highestIndex uint64 for _, idx := range t.indexByURL { if idx > highestIndex { highestIndex = idx } } var nBytesDeleted int64 for { if (totalSize - nBytesDeleted) <= maxSize { break } // More than 2 segments are needed for current writes and replication checks. if len(segments) <= 2 { break } // Attempt deletion of oldest segment in topic. First and second segment needed // for this operation. first := segments[0] second := segments[1] // The first segment can only be deleted if the last index in that segment has // been replicated. The most efficient way to check this is to ensure that the // first index of the subsequent segment has been replicated. if second.Index > highestIndex { // No guarantee first segment has been replicated. break } // Deletion can proceed! size, err := first.Size() if err != nil { return nBytesDeleted, err } if err = os.Remove(first.Path); err != nil { return nBytesDeleted, err } nBytesDeleted += size segments = segments[1:] // Create tombstone to indicate that the topic has been truncated at least once. f, err := os.Create(t.TombstonePath()) if err != nil { return nBytesDeleted, err } f.Close() } return nBytesDeleted, err } // Topics represents a list of topics sorted by id. type Topics []*Topic func (a Topics) Len() int { return len(a) } func (a Topics) Less(i, j int) bool { return a[i].id < a[j].id } func (a Topics) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // ReadTopics reads all topics from a directory path. func ReadTopics(path string) (Topics, error) { // Open handle to directory. f, err := os.Open(path) if err != nil { return nil, err } defer func() { _ = f.Close() }() // Read directory items. fis, err := f.Readdir(0) if err != nil { return nil, err } // Create a topic for each directory with a numeric name. var a Topics for _, fi := range fis { // Skip non-directory paths. if !fi.IsDir() { continue } topicID, err := strconv.ParseUint(fi.Name(), 10, 64) if err != nil { continue } a = append(a, NewTopic(topicID, filepath.Join(path, fi.Name()))) } sort.Sort(a) return a, nil } // Segment represents a contiguous section of a topic log. type Segment struct { Index uint64 // starting index of the segment and name Path string // path to the segment file. } // Size returns the file size of the segment. func (s *Segment) Size() (int64, error) { fi, err := os.Stat(s.Path) if err != nil { return 0, err } return fi.Size(), nil } // Segments represents a list of segments sorted by index. type Segments []*Segment // Last returns the last segment in the slice. // Returns nil if there are no segments. func (a Segments) Last() *Segment { if len(a) == 0 { return nil } return a[len(a)-1] } func (a Segments) Size() (int64, error) { var size int64 for _, s := range a { sz, err := s.Size() if err != nil { return 0, nil } size += sz } return size, nil } func (a Segments) Len() int { return len(a) } func (a Segments) Less(i, j int) bool { return a[i].Index < a[j].Index } func (a Segments) Swap(i, j int) { a[i], a[j] = a[j], a[i] } // ReadSegments reads all segments from a directory path. func ReadSegments(path string) (Segments, error) { // Open handle to directory. f, err := os.Open(path) if err != nil { return nil, err } defer func() { _ = f.Close() }() // Read directory items. fis, err := f.Readdir(0) if err != nil { return nil, err } // Create a segment for each file with a numeric name. var a Segments for _, fi := range fis { index, err := strconv.ParseUint(fi.Name(), 10, 64) if err != nil { continue } a = append(a, &Segment{ Index: index, Path: filepath.Join(path, fi.Name()), }) } sort.Sort(a) return a, nil } // ReadSegmentByIndex returns the segment that contains a given index. func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { // Find a list of all segments. segments, err := ReadSegments(path) if os.IsNotExist(err) { return nil, err } else if err != nil { return nil, fmt.Errorf("read segments: %s", err) } // If there are no segments then ignore. // If index is zero then start from the first segment. // If index is less than the first segment range then return error. if len(segments) == 0 { return nil, nil } else if index == 0 { return segments[0], nil } else if index < segments[0].Index { return segments[0], nil } // Find segment that contains index. for i := range segments[:len(segments)-1] { if index >= segments[i].Index && index < segments[i+1].Index { return segments[i], nil } } // If no segment ranged matched then return the last segment. return segments[len(segments)-1], nil } // RecoverSegment parses the entire segment and truncates at any partial messages. // Returns the last index seen in the segment. func RecoverSegment(path string) (uint64, error) { // Open segment file. f, err := os.Open(path) if os.IsNotExist(err) { return 0, err } else if err != nil { return 0, fmt.Errorf("open: %s", err) } defer func() { _ = f.Close() }() // Read all messages until the end. dec := NewMessageDecoder(f) index := uint64(0) for { var m Message if err := dec.Decode(&m); err == io.EOF { return index, nil } else if err == io.ErrUnexpectedEOF || err == ErrChecksum { // The decoder will unread any partially read data so we can // simply truncate at current position. if n, err := f.Seek(0, os.SEEK_CUR); err != nil { return 0, fmt.Errorf("seek: %s", err) } else if err := os.Truncate(path, n); err != nil { return 0, fmt.Errorf("truncate: n=%d, err=%s", n-1, err) } return index, nil } else if err != nil { return 0, fmt.Errorf("decode: %s", err) } index = m.Index } } // TopicReader reads data on a single topic from a given index. type TopicReader struct { mu sync.Mutex path string // topic directory path index uint64 // starting index streaming bool // true if reader should hang and wait for new messages file *os.File // current segment file handler closed bool // The time between file system polling to check for new segments. PollInterval time.Duration } // NewTopicReader returns a new instance of TopicReader that reads segments // from a path starting from a given index. func NewTopicReader(path string, index uint64, streaming bool) *TopicReader { return &TopicReader{ path: path, index: index, streaming: streaming, PollInterval: DefaultPollInterval, } } // Seek seeks to a position the current segment. func (r *TopicReader) Seek(offset int64, whence int) (int64, error) { assert(whence == os.SEEK_CUR, "topic reader can only seek to a relative position") r.mu.Lock() defer r.mu.Unlock() if r.file == nil { return 0, nil } return r.file.Seek(offset, whence) } // Read reads the next bytes from the reader into the buffer. func (r *TopicReader) Read(p []byte) (int, error) { for { // Retrieve current segment file handle. // If the reader is closed then return EOF. // If we don't have a file and we're streaming then sleep and retry. f, err := r.File() if err == ErrReaderClosed { return 0, io.EOF } else if err != nil { return 0, fmt.Errorf("file: %s", err) } else if f == nil { if r.streaming { time.Sleep(r.PollInterval) continue } return 0, io.EOF } // Read under lock so the underlying file cannot be closed. r.mu.Lock() n, err := f.Read(p) r.mu.Unlock() // Read into buffer. // If no more data is available, then retry with the next segment. if err == io.EOF { if err := r.nextSegment(); err != nil { return 0, fmt.Errorf("next segment: %s", err) } time.Sleep(r.PollInterval) continue } else { return n, err } } } // File returns the current segment file handle. // Returns nil when there is no more data left. func (r *TopicReader) File() (*os.File, error) { r.mu.Lock() defer r.mu.Unlock() // Exit if closed. if r.closed { return nil, ErrReaderClosed } // If the first file hasn't been opened then open it and seek. if r.file == nil { // Find the segment containing the index. // Exit if no segments are available or if path not found. segment, err := ReadSegmentByIndex(r.path, r.index) if os.IsNotExist(err) { return nil, nil } else if err != nil { return nil, fmt.Errorf("segment by index: %s", err) } else if segment == nil { return nil, nil } // Open that segment file. f, err := os.Open(segment.Path) if err != nil { return nil, fmt.Errorf("open: %s", err) } // Seek to index. if err := r.seekAfterIndex(f, r.index); err != nil { _ = f.Close() return nil, fmt.Errorf("seek to index: %s", err) } // Save file handle and segment name. r.file = f } return r.file, nil } // seekAfterIndex moves a segment file to the message after a given index. func (r *TopicReader) seekAfterIndex(f *os.File, seek uint64) error { dec := NewMessageDecoder(f) for { var m Message if err := dec.Decode(&m); err == io.EOF { return nil } else if err != nil { return err } else if m.Index >= seek { // Seek to message start. if _, err := f.Seek(-int64(MessageChecksumSize+MessageHeaderSize+len(m.Data)), os.SEEK_CUR); err != nil { return fmt.Errorf("seek: %s", err) } return nil } } } // nextSegment closes the current segment's file handle and opens the next segment. func (r *TopicReader) nextSegment() error { r.mu.Lock() defer r.mu.Unlock() // Skip if the reader is closed. if r.closed { return nil } // Find current segment index. index, err := strconv.ParseUint(filepath.Base(r.file.Name()), 10, 64) if err != nil { return fmt.Errorf("parse current segment index: %s", err) } // Read current segment list. // If no segments exist then exit. // If current segment is the last segment then ignore. segments, err := ReadSegments(r.path) if os.IsNotExist(err) { return nil } else if err != nil { return fmt.Errorf("read segments: %s", err) } else if len(segments) == 0 { return nil } else if segments[len(segments)-1].Index == index { if !r.streaming { r.closed = true } return nil } // Loop over segments and find the next one. for i := range segments[:len(segments)-1] { if segments[i].Index == index { // Clear current file. if r.file != nil { r.file.Close() r.file = nil } // Open next segment. f, err := os.Open(segments[i+1].Path) if err != nil { return fmt.Errorf("open next segment: %s", err) } r.file = f return nil } } // This should only occur if our current segment was deleted. r.closed = true return nil } // Close closes the reader. func (r *TopicReader) Close() error { r.mu.Lock() defer r.mu.Unlock() // Close current handle. if r.file != nil { _ = r.file.Close() r.file = nil } // Mark reader as closed. r.closed = true return nil } // MessageType represents the type of message. type MessageType uint16 // BrokerMessageType is a flag set on broker messages to prevent them // from being passed through to topics. const BrokerMessageType = 0x8000 const ( SetTopicMaxIndexMessageType = BrokerMessageType | MessageType(0x00) ) const ( // MessageChecksumSize is the size of the encoded message checksum, in bytes. MessageChecksumSize = 4 // MessageHeaderSize is the size of the encoded message header, in bytes. MessageHeaderSize = 2 + 8 + 8 + 4 ) // Message represents a single item in a topic. type Message struct { Type MessageType TopicID uint64 Index uint64 Data []byte } // WriteTo encodes and writes the message to a writer. Implements io.WriterTo. func (m *Message) WriteTo(w io.Writer) (int64, error) { b, err := m.MarshalBinary() if err != nil { return 0, err } n, err := w.Write(b) return int64(n), err } // MarshalBinary returns a binary representation of the message. // This implements encoding.BinaryMarshaler. An error cannot be returned. func (m *Message) MarshalBinary() ([]byte, error) { // Encode message. b := make([]byte, MessageChecksumSize+MessageHeaderSize+len(m.Data)) copy(b[MessageChecksumSize:], m.marshalHeader()) copy(b[MessageChecksumSize+MessageHeaderSize:], m.Data) // Calculate and write the checksum. h := fnv.New32a() h.Write(b[MessageChecksumSize:]) binary.BigEndian.PutUint32(b, h.Sum32()) return b, nil } // UnmarshalBinary reads a message from a binary encoded slice. // This implements encoding.BinaryUnmarshaler. func (m *Message) UnmarshalBinary(b []byte) error { // Read checksum. if len(b) < MessageChecksumSize { return fmt.Errorf("message checksum too short: %d < %d", len(b), MessageChecksumSize) } sum := binary.BigEndian.Uint32(b) // Read header. if len(b)-MessageChecksumSize < MessageHeaderSize { return fmt.Errorf("message header too short: %d < %d", len(b)-MessageChecksumSize, MessageHeaderSize) } m.unmarshalHeader(b[MessageChecksumSize:]) // Read data. if len(b)-MessageChecksumSize-MessageHeaderSize < len(m.Data) { return fmt.Errorf("message data too short: %d < %d", len(b)-MessageChecksumSize-MessageHeaderSize, len(m.Data)) } copy(m.Data, b[MessageChecksumSize+MessageHeaderSize:]) // Verify checksum. h := fnv.New32a() h.Write(b[MessageChecksumSize:]) if h.Sum32() != sum { return ErrChecksum } return nil } // marshalHeader returns a byte slice with the message header. func (m *Message) marshalHeader() []byte { b := make([]byte, MessageHeaderSize) binary.BigEndian.PutUint16(b[0:2], uint16(m.Type)) binary.BigEndian.PutUint64(b[2:10], m.TopicID) binary.BigEndian.PutUint64(b[10:18], m.Index) binary.BigEndian.PutUint32(b[18:22], uint32(len(m.Data))) return b } // unmarshalHeader reads message header data from binary encoded slice. // The data field is appropriately sized but is not filled. func (m *Message) unmarshalHeader(b []byte) { m.Type = MessageType(binary.BigEndian.Uint16(b[0:2])) m.TopicID = binary.BigEndian.Uint64(b[2:10]) m.Index = binary.BigEndian.Uint64(b[10:18]) m.Data = make([]byte, binary.BigEndian.Uint32(b[18:22])) } // MessageDecoder decodes messages from a reader. type MessageDecoder struct { r io.ReadSeeker } // NewMessageDecoder returns a new instance of the MessageDecoder. func NewMessageDecoder(r io.ReadSeeker) *MessageDecoder { return &MessageDecoder{r: r} } // Decode reads a message from the decoder's reader. func (dec *MessageDecoder) Decode(m *Message) error { // Read header bytes. // Unread if there is a partial read. var b [MessageChecksumSize + MessageHeaderSize]byte if n, err := io.ReadFull(dec.r, b[:]); err == io.EOF { return err } else if err == io.ErrUnexpectedEOF { if _, err := dec.r.Seek(-int64(n), os.SEEK_CUR); err != nil { return fmt.Errorf("cannot unread header: n=%d, err=%s", n, err) } warnf("unexpected eof(0): len=%d, n=%d, err=%s", len(b[:]), n, err) return err } else if err != nil { return fmt.Errorf("read header: %s", err) } // Read checksum. sum := binary.BigEndian.Uint32(b[:]) // Read header. m.unmarshalHeader(b[MessageChecksumSize:]) // Read data. if n, err := io.ReadFull(dec.r, m.Data); err == io.EOF || err == io.ErrUnexpectedEOF { if _, err := dec.r.Seek(-int64(len(b)+n), os.SEEK_CUR); err != nil { return fmt.Errorf("cannot unread header+data: n=%d, err=%s", n, err) } warnf("unexpected eof(1): len=%d, n=%d, err=%s", len(m.Data), n, err) return io.ErrUnexpectedEOF } else if err != nil { return fmt.Errorf("read data: %s", err) } // Verify checksum. h := fnv.New32a() h.Write(b[MessageChecksumSize:]) h.Write(m.Data) if h.Sum32() != sum { if _, err := dec.r.Seek(-int64(len(b)+len(m.Data)), os.SEEK_CUR); err != nil { return fmt.Errorf("cannot unread after checksum: err=%s", err) } return ErrChecksum } return nil } // UnmarshalMessage decodes a byte slice into a message. func UnmarshalMessage(data []byte) (*Message, error) { m := &Message{} if err := m.UnmarshalBinary(data); err != nil { return nil, err } return m, nil } // assert will panic with a given formatted message if the given condition is false. func assert(condition bool, msg string, v ...interface{}) { if !condition { panic(fmt.Sprintf("assert failed: "+msg, v...)) } } // u64tob converts a uint64 into an 8-byte slice. func u64tob(v uint64) []byte { b := make([]byte, 8) binary.BigEndian.PutUint64(b, v) return b } // btou64 converts an 8-byte slice into an uint64. func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }