Wrap open logic in anonymous functions.
parent
96748cb217
commit
b045ad5d92
|
@ -132,43 +132,47 @@ func (b *Broker) Open(path string) error {
|
|||
if path == "" {
|
||||
return ErrPathRequired
|
||||
}
|
||||
b.path = path
|
||||
|
||||
// Ensure root directory exists.
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
b.close()
|
||||
return fmt.Errorf("mkdir: %s", err)
|
||||
}
|
||||
if err := func() error {
|
||||
b.path = path
|
||||
|
||||
// Open meta file.
|
||||
meta, err := bolt.Open(b.metaPath(), 0666, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if err != nil {
|
||||
b.close()
|
||||
return fmt.Errorf("open meta: %s", err)
|
||||
}
|
||||
b.meta = meta
|
||||
// Ensure root directory exists.
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
return fmt.Errorf("mkdir: %s", err)
|
||||
}
|
||||
|
||||
// Initialize data from meta store.
|
||||
if err := b.meta.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists([]byte("meta"))
|
||||
// Open meta file.
|
||||
meta, err := bolt.Open(b.metaPath(), 0666, &bolt.Options{Timeout: 1 * time.Second})
|
||||
if err != nil {
|
||||
return fmt.Errorf("open meta: %s", err)
|
||||
}
|
||||
b.meta = meta
|
||||
|
||||
// Read in index from meta store, if set.
|
||||
if v := tx.Bucket([]byte("meta")).Get([]byte("index")); v != nil {
|
||||
b.index = btou64(v)
|
||||
// Initialize data from meta store.
|
||||
if err := b.meta.Update(func(tx *bolt.Tx) error {
|
||||
tx.CreateBucketIfNotExists([]byte("meta"))
|
||||
|
||||
// Read in index from meta store, if set.
|
||||
if v := tx.Bucket([]byte("meta")).Get([]byte("index")); v != nil {
|
||||
b.index = btou64(v)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read all topic metadata into memory.
|
||||
if err := b.openTopics(); err != nil {
|
||||
return fmt.Errorf("open topics: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
}(); err != nil {
|
||||
_ = b.close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Read all topic metadata into memory.
|
||||
if err := b.openTopics(); err != nil {
|
||||
b.close()
|
||||
return fmt.Errorf("open topics: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -674,42 +678,46 @@ func (t *Topic) Open() error {
|
|||
} else if t.path == "" {
|
||||
return ErrPathRequired
|
||||
}
|
||||
t.opened = true
|
||||
|
||||
// Ensure the parent directory exists.
|
||||
if err := os.MkdirAll(t.path, 0777); err != nil {
|
||||
t.close()
|
||||
if err := func() error {
|
||||
t.opened = true
|
||||
|
||||
// Ensure the parent directory exists.
|
||||
if err := os.MkdirAll(t.path, 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read available segments.
|
||||
segments, err := ReadSegments(t.path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("read segments: %s", err)
|
||||
}
|
||||
|
||||
// Read max index and open file handle if we have segments.
|
||||
if len(segments) > 0 {
|
||||
s := segments.Last()
|
||||
|
||||
// Read the last segment and extract the last message index.
|
||||
index, err := ReadSegmentMaxIndex(s.Path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("read segment max index: %s", err)
|
||||
}
|
||||
t.index = index
|
||||
|
||||
// Open file handle on the segment.
|
||||
f, err := os.OpenFile(s.Path, os.O_RDWR|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open segment: %s", err)
|
||||
}
|
||||
t.file = f
|
||||
}
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
_ = t.close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Read available segments.
|
||||
segments, err := ReadSegments(t.path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
t.close()
|
||||
return fmt.Errorf("read segments: %s", err)
|
||||
}
|
||||
|
||||
// Read max index and open file handle if we have segments.
|
||||
if len(segments) > 0 {
|
||||
s := segments.Last()
|
||||
|
||||
// Read the last segment and extract the last message index.
|
||||
index, err := ReadSegmentMaxIndex(s.Path)
|
||||
if err != nil {
|
||||
t.close()
|
||||
return fmt.Errorf("read segment max index: %s", err)
|
||||
}
|
||||
t.index = index
|
||||
|
||||
// Open file handle on the segment.
|
||||
f, err := os.OpenFile(s.Path, os.O_RDWR|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
t.close()
|
||||
return fmt.Errorf("open segment: %s", err)
|
||||
}
|
||||
t.file = f
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -132,15 +132,21 @@ func (c *Client) Open(path string) error {
|
|||
return ErrClientOpen
|
||||
}
|
||||
|
||||
// Read URLs from file if no URLs are provided.
|
||||
c.path = path
|
||||
if err := c.loadConfig(); err != nil {
|
||||
_ = c.close()
|
||||
return fmt.Errorf("load config: %s", err)
|
||||
}
|
||||
if err := func() error {
|
||||
// Read URLs from file if no URLs are provided.
|
||||
c.path = path
|
||||
if err := c.loadConfig(); err != nil {
|
||||
return fmt.Errorf("load config: %s", err)
|
||||
}
|
||||
|
||||
// Set open flag.
|
||||
c.opened = true
|
||||
// Set open flag.
|
||||
c.opened = true
|
||||
|
||||
return nil
|
||||
}(); err != nil {
|
||||
_ = c.close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Start background ping.
|
||||
c.closing = make(chan struct{}, 0)
|
||||
|
|
Loading…
Reference in New Issue