package hh
import (
	"encoding/binary"
	"expvar"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/influxdb/influxdb"
	"github.com/influxdb/influxdb/models"
	"github.com/influxdb/influxdb/tsdb"
)
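// Names of the statistics recorded by the hinted-handoff processor for each
// shard and each destination node.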
const (
	pointsHint   = "points_hint"
	pointsWrite  = "points_write"
	bytesWrite   = "bytes_write"
	writeErr     = "write_err"
	unmarshalErr = "unmarshal_err"
	advanceErr   = "advance_err"
	currentErr   = "current_err"
)
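// Processor manages hinted-handoff data: it maintains an on-disk queue per
// destination node and sends queued writes to nodes when they are available.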
type Processor struct {
	mu sync.RWMutex

	dir            string
	maxSize        int64
	maxAge         time.Duration
	retryRateLimit int64

	queues    map[uint64]*queue
	writer    shardWriter
	metastore metaStore
	Logger    *log.Logger

	// Shard-level and node-level HH stats.
	shardStatMaps map[uint64]*expvar.Map
	nodeStatMaps  map[uint64]*expvar.Map
}
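// ProcessorOptions holds configuration for a Processor.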
type ProcessorOptions struct {
	MaxSize        int64
	RetryRateLimit int64
}
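// NewProcessor returns a new Processor that stores hinted-handoff queues under
// dir and replays them to destination nodes using writer.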
func NewProcessor(dir string, writer shardWriter, metastore metaStore, options ProcessorOptions) (*Processor, error) {
	p := &Processor{
		dir:           dir,
		queues:        map[uint64]*queue{},
		writer:        writer,
		metastore:     metastore,
		Logger:        log.New(os.Stderr, "[handoff] ", log.LstdFlags),
		shardStatMaps: make(map[uint64]*expvar.Map),
		nodeStatMaps:  make(map[uint64]*expvar.Map),
	}
	p.setOptions(options)

	// Create the root directory if it doesn't already exist.
	if err := os.MkdirAll(dir, 0700); err != nil {
		return nil, fmt.Errorf("mkdir all: %s", err)
	}

	if err := p.loadQueues(); err != nil {
		return p, err
	}
	return p, nil
}
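// setOptions applies the given options, falling back to defaults for any
// values that are unset.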
func (p *Processor) setOptions(options ProcessorOptions) {
	p.maxSize = DefaultMaxSize
	if options.MaxSize != 0 {
		p.maxSize = options.MaxSize
	}

	p.retryRateLimit = DefaultRetryRateLimit
	if options.RetryRateLimit != 0 {
		p.retryRateLimit = options.RetryRateLimit
	}
}
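// loadQueues opens a queue for each per-node directory found under the root
// directory. Directory names are the node IDs.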
func (p *Processor) loadQueues() error {
	files, err := ioutil.ReadDir(p.dir)
	if err != nil {
		return err
	}

	for _, file := range files {
		nodeID, err := strconv.ParseUint(file.Name(), 10, 64)
		if err != nil {
			return err
		}

		if _, err := p.addQueue(nodeID); err != nil {
			return err
		}
	}
	return nil
}
// addQueue adds a hinted-handoff queue for the given node. This function is not thread-safe
// and the caller must ensure this function is not called concurrently.
func (p *Processor) addQueue(nodeID uint64) (*queue, error) {
	path := filepath.Join(p.dir, strconv.FormatUint(nodeID, 10))
	if err := os.MkdirAll(path, 0700); err != nil {
		return nil, err
	}

	queue, err := newQueue(path, p.maxSize)
	if err != nil {
		return nil, err
	}
	if err := queue.Open(); err != nil {
		return nil, err
	}
	p.queues[nodeID] = queue

	// Create node stats for this queue.
	key := fmt.Sprintf("hh_processor:node:%d", nodeID)
	tags := map[string]string{"nodeID": strconv.FormatUint(nodeID, 10)}
	p.nodeStatMaps[nodeID] = influxdb.NewStatistics(key, "hh_processor", tags)

	return queue, nil
}
// WriteShard writes hinted-handoff data for the given shard and node. Since it may manipulate
// hinted-handoff queues, and be called concurrently, it takes a lock during queue access.
func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) error {
	p.mu.RLock()
	queue, ok := p.queues[ownerID]
	p.mu.RUnlock()
	if !ok {
		if err := func() error {
			// Check again under write-lock.
			p.mu.Lock()
			defer p.mu.Unlock()

			queue, ok = p.queues[ownerID]
			if !ok {
				var err error
				if queue, err = p.addQueue(ownerID); err != nil {
					return err
				}
			}
			return nil
		}(); err != nil {
			return err
		}
	}

	// Update stats.
	p.updateShardStats(shardID, pointsHint, int64(len(points)))
	p.nodeStatMaps[ownerID].Add(pointsHint, int64(len(points)))

	b := p.marshalWrite(shardID, points)
	return queue.Append(b)
}
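// Process drains the queue of each active node concurrently, sending queued
// writes to their destination shards and advancing each queue as writes succeed.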
func (p *Processor) Process() error {
	p.mu.RLock()
	defer p.mu.RUnlock()

	activeQueues, err := p.activeQueues()
	if err != nil {
		return err
	}

	res := make(chan error, len(activeQueues))
	for nodeID, q := range activeQueues {
		go func(nodeID uint64, q *queue) {
			// Log how many writes we successfully sent at the end.
			var sent int
			start := time.Now()
			defer func(start time.Time) {
				if sent > 0 {
					p.Logger.Printf("%d queued writes sent to node %d in %s", sent, nodeID, time.Since(start))
				}
			}(start)

			limiter := NewRateLimiter(p.retryRateLimit)
			for {
				// Get the current block from the queue.
				buf, err := q.Current()
				if err != nil {
					if err != io.EOF {
						p.nodeStatMaps[nodeID].Add(currentErr, 1)
					}
					res <- nil
					break
				}

				// Unmarshal the byte slice back to a shard ID and points.
				shardID, points, err := p.unmarshalWrite(buf)
				if err != nil {
					p.nodeStatMaps[nodeID].Add(unmarshalErr, 1)
					p.Logger.Printf("unmarshal write failed: %v", err)
					if err := q.Advance(); err != nil {
						p.nodeStatMaps[nodeID].Add(advanceErr, 1)
						res <- err
					}

					// Skip and try the next block.
					continue
				}

				// Try to send the write to the node.
				if err := p.writer.WriteShard(shardID, nodeID, points); err != nil && tsdb.IsRetryable(err) {
					p.nodeStatMaps[nodeID].Add(writeErr, 1)
					p.Logger.Printf("remote write failed: %v", err)
					res <- nil
					break
				}
				p.updateShardStats(shardID, pointsWrite, int64(len(points)))
				p.nodeStatMaps[nodeID].Add(pointsWrite, int64(len(points)))

				// If we get here, the write succeeded so advance the queue to the next item.
				if err := q.Advance(); err != nil {
					p.nodeStatMaps[nodeID].Add(advanceErr, 1)
					res <- err
					return
				}

				sent++

				// Update how many bytes we've sent.
				limiter.Update(len(buf))
				p.updateShardStats(shardID, bytesWrite, int64(len(buf)))
				p.nodeStatMaps[nodeID].Add(bytesWrite, int64(len(buf)))

				// Block to maintain the throughput rate.
				time.Sleep(limiter.Delay())
			}
		}(nodeID, q)
	}

	for range activeQueues {
		err := <-res
		if err != nil {
			return err
		}
	}
	return nil
}
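// marshalWrite encodes a hinted write as the 8-byte big-endian shard ID
// followed by each point's string representation, one per line.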
func (p *Processor) marshalWrite(shardID uint64, points []models.Point) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, shardID)
	for _, p := range points {
		b = append(b, []byte(p.String())...)
		b = append(b, '\n')
	}
	return b
}
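// unmarshalWrite decodes a block produced by marshalWrite back into a shard ID
// and its points.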
func (p *Processor) unmarshalWrite(b []byte) (uint64, []models.Point, error) {
	if len(b) < 8 {
		return 0, nil, fmt.Errorf("too short: len = %d", len(b))
	}
	ownerID := binary.BigEndian.Uint64(b[:8])
	points, err := models.ParsePoints(b[8:])
	return ownerID, points, err
}
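// updateShardStats increments the given stat for the shard, creating the
// shard's stat map on first use.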
func (p *Processor) updateShardStats(shardID uint64, stat string, inc int64) {
	m, ok := p.shardStatMaps[shardID]
	if !ok {
		key := fmt.Sprintf("hh_processor:shard:%d", shardID)
		tags := map[string]string{"shardID": strconv.FormatUint(shardID, 10)}
		p.shardStatMaps[shardID] = influxdb.NewStatistics(key, "hh_processor", tags)
		m = p.shardStatMaps[shardID]
	}
	m.Add(stat, inc)
}
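// activeQueues returns the queues whose nodes are still present in the meta store.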
func (p *Processor) activeQueues() (map[uint64]*queue, error) {
	queues := make(map[uint64]*queue)
	for id, q := range p.queues {
		ni, err := p.metastore.Node(id)
		if err != nil {
			return nil, err
		}
		if ni != nil {
			queues[id] = q
		}
	}
	return queues, nil
}
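// PurgeOlderThan removes queued data older than the given duration from every queue.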
func (p *Processor) PurgeOlderThan(when time.Duration) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	for _, queue := range p.queues {
		if err := queue.PurgeOlderThan(time.Now().Add(-when)); err != nil {
			return err
		}
	}
	return nil
}
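// PurgeInactiveOlderThan closes and removes the queue of any node that is no
// longer in the meta store and has not been written to within the given duration.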
func (p *Processor) PurgeInactiveOlderThan(when time.Duration) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	for nodeID, queue := range p.queues {
		// Only delete queues for inactive nodes.
		ni, err := p.metastore.Node(nodeID)
		if err != nil {
			return err
		}
		if ni != nil {
			continue
		}

		last, err := queue.LastModified()
		if err != nil {
			return err
		}
		if last.Before(time.Now().Add(-when)) {
			// Close and remove the queue.
			if err := queue.Close(); err != nil {
				return err
			}
			if err := queue.Remove(); err != nil {
				return err
			}

			delete(p.queues, nodeID)
		}
	}
	return nil
}