package internal

import (
	"errors"
	"fmt"
	"io"
	"io/fs"
	"math"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/kit/platform"
	"github.com/influxdata/influxdb/v2/pkg/durablequeue"
	"github.com/influxdata/influxdb/v2/replications/metrics"
	"github.com/influxdata/influxdb/v2/replications/remotewrite"
	"go.uber.org/zap"
)

const (
	scannerAdvanceInterval = 10 * time.Second
	purgeInterval          = 60 * time.Second
	defaultMaxAge          = 7 * 24 * time.Hour // 1 week
)

type remoteWriter interface {
	Write(data []byte, attempt int) (time.Duration, error)
}

type replicationQueue struct {
	id            platform.ID
	orgID         platform.ID
	localBucketID platform.ID
	queue         *durablequeue.Queue
	wg            sync.WaitGroup
	done          chan struct{}
	receive       chan struct{}
	logger        *zap.Logger
	metrics       *metrics.ReplicationsMetrics
	remoteWriter  remoteWriter
	failedWrites  int
	maxAge        time.Duration
}

type durableQueueManager struct {
	replicationQueues map[platform.ID]*replicationQueue
	logger            *zap.Logger
	queuePath         string
	mutex             sync.RWMutex
	metrics           *metrics.ReplicationsMetrics
	configStore       remotewrite.HttpConfigStore
}

var errStartup = errors.New("startup tasks for replications durable queue management failed, see server logs for details")
var errShutdown = errors.New("shutdown tasks for replications durable queues failed, see server logs for details")

// NewDurableQueueManager creates a new durableQueueManager struct, for managing durable queues associated with
// replication streams.
func NewDurableQueueManager(log *zap.Logger, queuePath string, metrics *metrics.ReplicationsMetrics, configStore remotewrite.HttpConfigStore) *durableQueueManager {
	replicationQueues := make(map[platform.ID]*replicationQueue)

	os.MkdirAll(queuePath, 0777)

	return &durableQueueManager{
		replicationQueues: replicationQueues,
		logger:            log,
		queuePath:         queuePath,
		metrics:           metrics,
		configStore:       configStore,
	}
}
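
// The sketch below is illustrative only, not part of this package's API: it shows
// roughly how a caller might wire up a manager and register a queue for a
// replication stream. logger, metricsReg, configStore, and the InitializeQueue
// arguments are assumed to be constructed elsewhere.
//
//	qm := NewDurableQueueManager(logger, "/var/lib/influxdb/replicationq", metricsReg, configStore)
//	if err := qm.InitializeQueue(replicationID, maxQueueSizeBytes, orgID, localBucketID, maxAgeSeconds); err != nil {
//		// handle error
//	}
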
// InitializeQueue creates and opens a new durable queue which is associated with a replication stream.
func (qm *durableQueueManager) InitializeQueue(replicationID platform.ID, maxQueueSizeBytes int64, orgID platform.ID, localBucketID platform.ID, maxAgeSeconds int64) error {
	qm.mutex.Lock()
	defer qm.mutex.Unlock()

	// Check for duplicate replication ID
	if _, exists := qm.replicationQueues[replicationID]; exists {
		return fmt.Errorf("durable queue already exists for replication ID %q", replicationID)
	}

	// Set up path for new queue on disk
	dir := filepath.Join(
		qm.queuePath,
		replicationID.String(),
	)
	if err := os.MkdirAll(dir, 0777); err != nil {
		return err
	}

	// Create a new durable queue
	newQueue, err := durablequeue.NewQueue(
		dir,
		maxQueueSizeBytes,
		durablequeue.DefaultSegmentSize,
		&durablequeue.SharedCount{},
		durablequeue.MaxWritesPending,
		func(bytes []byte) error { return nil },
	)
	if err != nil {
		return err
	}

	// Open the new queue
	if err := newQueue.Open(); err != nil {
		return err
	}

	// Map new durable queue and scanner to its corresponding replication stream via replication ID
	rq := qm.newReplicationQueue(replicationID, orgID, localBucketID, newQueue, maxAgeSeconds)
	qm.replicationQueues[replicationID] = rq
	rq.Open()

	qm.logger.Debug("Created new durable queue for replication stream",
		zap.String("id", replicationID.String()), zap.String("path", dir))

	return nil
}

func (rq *replicationQueue) Open() {
	rq.wg.Add(1)
	go rq.run()
}

func (rq *replicationQueue) Close() error {
	close(rq.receive)
	close(rq.done)
	rq.wg.Wait() // wait for goroutine to finish processing all messages
	return rq.queue.Close()
}

func (rq *replicationQueue) run() {
	defer rq.wg.Done()

	retry := time.NewTimer(math.MaxInt64)
	purgeTicker := time.NewTicker(purgeInterval)

	sendWrite := func() time.Duration {
		for {
			waitForRetry, shouldRetry := rq.SendWrite()
			if !shouldRetry {
				return math.MaxInt64
			}
			// immediately retry if the wait time is zero
			if waitForRetry == 0 {
				continue
			}
			return waitForRetry
		}
	}

	for {
		select {
		case <-rq.done: // end the goroutine when done is messaged
			return
		case <-rq.receive: // run the scanner on data append
			// Receive channel has a buffer to prevent a potential race condition where rq.SendWrite has reached EOF and will
			// return false, but data is queued after evaluating the scanner and before the loop is ready to select on the
			// receive channel again. This would result in data remaining unprocessed in the queue until the next send on the
			// receive channel since the send to the receive channel in qm.EnqueueData is non-blocking. The buffer ensures
			// that rq.SendWrite will be called again in this situation and not leave data in the queue. Outside of this
			// specific scenario, the buffer might result in an extra call to rq.SendWrite that will immediately return on
			// EOF.
			retryTime := sendWrite()
			if !retry.Stop() {
				<-retry.C
			}
			retry.Reset(retryTime)
		case <-retry.C:
			retryTime := sendWrite()
			retry.Reset(retryTime)
		case <-purgeTicker.C:
			if rq.maxAge != 0 {
				rq.queue.PurgeOlderThan(time.Now().Add(-rq.maxAge))
			}
		}
	}
}

// SendWrite processes data enqueued into the durablequeue.Queue.
// SendWrite is responsible for processing all data in the queue at the time of calling.
func (rq *replicationQueue) SendWrite() (waitForRetry time.Duration, shouldRetry bool) {
	// Any error in creating the scanner should exit the loop in run()
	// Either it is io.EOF indicating no data, or some other failure in making
	// the Scanner object that we don't know how to handle.
	scan, err := rq.queue.NewScanner()
	if err != nil {
		if !errors.Is(err, io.EOF) {
			rq.logger.Error("Error creating replications queue scanner", zap.Error(err))
		}
		return 0, false
	}

	advanceScanner := func() error {
		if _, err = scan.Advance(); err != nil {
			if err != io.EOF {
				rq.logger.Error("Error in replication queue scanner", zap.Error(err))
			}
			return err
		}
		rq.metrics.Dequeue(rq.id, rq.queue.TotalBytes())
		return nil
	}

	ticker := time.NewTicker(scannerAdvanceInterval)
	defer ticker.Stop()

	for scan.Next() {
		if err := scan.Err(); err != nil {
			if errors.Is(err, io.EOF) {
				// An io.EOF error here indicates that there is no more data left to process, and is an expected error.
				return 0, false
			}
			// Any other error here indicates a problem reading the data from the queue, so we log the error and drop the data
			// with a call to scan.Advance() later.
			rq.logger.Info("Segment read error.", zap.Error(scan.Err()))
		}

		if waitForRetry, err := rq.remoteWriter.Write(scan.Bytes(), rq.failedWrites); err != nil {
			rq.failedWrites++
			// We failed the remote write. Do not advance the scanner
			rq.logger.Error("Error in replication stream", zap.Error(err), zap.Int("retries", rq.failedWrites))
			return waitForRetry, true
		}
		// a successful write resets the number of failed write attempts to zero
		rq.failedWrites = 0

		// Advance the scanner periodically to prevent extended runs of local writes without updating the underlying queue
		// position.
		select {
		case <-ticker.C:
			if err := advanceScanner(); err != nil {
				return 0, false
			}
		default:
		}
	}

	if err := advanceScanner(); err != nil {
		return 0, false
	}

	return 0, true
}

// DeleteQueue deletes a durable queue and its associated data on disk.
func (qm *durableQueueManager) DeleteQueue(replicationID platform.ID) error {
	qm.mutex.Lock()
	defer qm.mutex.Unlock()

	if _, exist := qm.replicationQueues[replicationID]; !exist {
		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
	}

	rq := qm.replicationQueues[replicationID]

	// Close the queue
	if err := rq.Close(); err != nil {
		return err
	}

	qm.logger.Debug("Closed replication stream durable queue",
		zap.String("id", replicationID.String()), zap.String("path", rq.queue.Dir()))

	// Delete any enqueued, un-flushed data on disk for this queue
	if err := rq.queue.Remove(); err != nil {
		return err
	}

	qm.logger.Debug("Deleted data associated with replication stream durable queue",
		zap.String("id", replicationID.String()), zap.String("path", rq.queue.Dir()))

	// Remove entry from replicationQueues map
	delete(qm.replicationQueues, replicationID)

	return nil
}

// UpdateMaxQueueSize updates the maximum size of a durable queue.
func (qm *durableQueueManager) UpdateMaxQueueSize(replicationID platform.ID, maxQueueSizeBytes int64) error {
	qm.mutex.RLock()
	defer qm.mutex.RUnlock()

	if _, exist := qm.replicationQueues[replicationID]; !exist {
		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
	}

	if err := qm.replicationQueues[replicationID].queue.SetMaxSize(maxQueueSizeBytes); err != nil {
		return err
	}

	return nil
}

// CurrentQueueSizes returns the current size-on-disk for the requested set of durable queues.
func (qm *durableQueueManager) CurrentQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
	qm.mutex.RLock()
	defer qm.mutex.RUnlock()

	sizes := make(map[platform.ID]int64, len(ids))

	for _, id := range ids {
		if _, exist := qm.replicationQueues[id]; !exist {
			return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
		}
		sizes[id] = qm.replicationQueues[id].queue.DiskUsage()
	}

	return sizes, nil
}

// RemainingQueueSizes returns the remaining number of bytes in each requested queue that have yet to be read.
func (qm *durableQueueManager) RemainingQueueSizes(ids []platform.ID) (map[platform.ID]int64, error) {
	qm.mutex.RLock()
	defer qm.mutex.RUnlock()

	sizes := make(map[platform.ID]int64, len(ids))

	for _, id := range ids {
		if _, exist := qm.replicationQueues[id]; !exist {
			return nil, fmt.Errorf("durable queue not found for replication ID %q", id)
		}
		sizes[id] = qm.replicationQueues[id].queue.TotalBytes()
	}

	return sizes, nil
}

// StartReplicationQueues updates the durableQueueManager.replicationQueues map, fully removing any partially deleted
// queues (present on disk, but not tracked in sqlite), opening all current queues, and logging info for each.
func (qm *durableQueueManager) StartReplicationQueues(trackedReplications map[platform.ID]*influxdb.TrackedReplication) error {
	errOccurred := false

	for id, repl := range trackedReplications {
		// Re-initialize a queue struct for each replication stream from sqlite
		queue, err := durablequeue.NewQueue(
			filepath.Join(qm.queuePath, id.String()),
			repl.MaxQueueSizeBytes,
			durablequeue.DefaultSegmentSize,
			&durablequeue.SharedCount{},
			durablequeue.MaxWritesPending,
			func(bytes []byte) error { return nil },
		)
		if err != nil {
			qm.logger.Error("failed to initialize replication stream durable queue", zap.Error(err))
			errOccurred = true
			continue
		}

		// Open and map the queue struct to its replication ID
		if err := queue.Open(); err != nil {
			// This could have errored after a backup/restore (we do not persist the replicationq).
			// Check if the dir exists, create if it doesn't, then open and carry on
			if pErr, ok := err.(*fs.PathError); ok {
				path := pErr.Path
				if _, err := os.Stat(path); err != nil && os.IsNotExist(err) {
					if err := os.MkdirAll(path, 0777); err != nil {
						qm.logger.Error("error attempting to recreate missing replication queue",
							zap.Error(err), zap.String("id", id.String()), zap.String("path", path))
						errOccurred = true
						continue
					}
					if err := queue.Open(); err != nil {
						qm.logger.Error("error attempting to open replication queue",
							zap.Error(err), zap.String("id", id.String()), zap.String("path", path))
						errOccurred = true
						continue
					}
					qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
					qm.replicationQueues[id].Open()
					qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
				}
			} else {
				qm.logger.Error("failed to open replication stream durable queue",
					zap.Error(err), zap.String("id", id.String()), zap.String("path", queue.Dir()))
				errOccurred = true
			}
		} else {
			qm.replicationQueues[id] = qm.newReplicationQueue(id, repl.OrgID, repl.LocalBucketID, queue, repl.MaxAgeSeconds)
			qm.replicationQueues[id].Open()
			qm.logger.Info("Opened replication stream", zap.String("id", id.String()), zap.String("path", queue.Dir()))
		}
	}

	if errOccurred {
		return errStartup
	}

	// Get contents of replicationq directory
	entries, err := os.ReadDir(qm.queuePath)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		// Skip over non-relevant entries (must be a dir named with a replication ID)
		if !entry.IsDir() {
			continue
		}

		id, err := platform.IDFromString(entry.Name())
		if err != nil {
			continue
		}

		// Partial delete found, needs to be fully removed
		if _, exist := qm.replicationQueues[*id]; !exist {
			if err := os.RemoveAll(filepath.Join(qm.queuePath, id.String())); err != nil {
				qm.logger.Error("failed to remove durable queue during partial delete cleanup",
					zap.Error(err), zap.String("id", id.String()))
				errOccurred = true
			}
		}
	}

	if errOccurred {
		return errStartup
	} else {
		return nil
	}
}

// CloseAll loops through all current replication stream queues and closes them without deleting on-disk resources
func (qm *durableQueueManager) CloseAll() error {
	errOccurred := false

	for id, replicationQueue := range qm.replicationQueues {
		if err := replicationQueue.Close(); err != nil {
			qm.logger.Error("failed to close durable queue", zap.Error(err), zap.String("id", id.String()))
			errOccurred = true
		}
	}

	if errOccurred {
		return errShutdown
	} else {
		return nil
	}
}

// EnqueueData persists a set of bytes to a replication's durable queue.
func (qm *durableQueueManager) EnqueueData(replicationID platform.ID, data []byte, numPoints int) (err error) {
	qm.mutex.RLock()
	defer qm.mutex.RUnlock()

	// Update metrics if data fails to be added to queue
	defer func() {
		if err != nil {
			qm.metrics.EnqueueError(replicationID, len(data), numPoints)
		}
	}()

	rq, ok := qm.replicationQueues[replicationID]
	if !ok {
		return fmt.Errorf("durable queue not found for replication ID %q", replicationID)
	}

	if err := rq.queue.Append(data); err != nil {
		return err
	}
	// Update metrics for this replication queue when adding data to the queue.
	qm.metrics.EnqueueData(replicationID, len(data), numPoints, rq.queue.TotalBytes())

	// Send to the replication receive channel if it is not full to activate the queue processing. If the receive channel
	// is full, don't block further writes and return.
	select {
	case qm.replicationQueues[replicationID].receive <- struct{}{}:
	default:
	}

	return nil
}

func (qm *durableQueueManager) newReplicationQueue(id platform.ID, orgID platform.ID, localBucketID platform.ID, queue *durablequeue.Queue, maxAgeSeconds int64) *replicationQueue {
	logger := qm.logger.With(zap.String("replication_id", id.String()))
	done := make(chan struct{})

	// A negative maxAgeSeconds falls back to the default max age; zero disables age-based purging in run().
	var maxAgeTime time.Duration
	if maxAgeSeconds < 0 {
		maxAgeTime = defaultMaxAge
	} else {
		maxAgeTime = time.Duration(maxAgeSeconds) * time.Second
	}

	return &replicationQueue{
		id:            id,
		orgID:         orgID,
		localBucketID: localBucketID,
		queue:         queue,
		done:          done,
		receive:       make(chan struct{}, 1),
		logger:        logger,
		metrics:       qm.metrics,
		remoteWriter:  remotewrite.NewWriter(id, qm.configStore, qm.metrics, logger, done),
		maxAge:        maxAgeTime,
	}
}

// GetReplications returns the ids of all currently registered replication streams matching the provided orgID
// and localBucketID
func (qm *durableQueueManager) GetReplications(orgID platform.ID, localBucketID platform.ID) []platform.ID {
	replications := make([]platform.ID, 0, len(qm.replicationQueues))

	for _, repl := range qm.replicationQueues {
		if repl.orgID == orgID && repl.localBucketID == localBucketID {
			replications = append(replications, repl.id)
		}
	}

	return replications
}
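
// For reference, a rough sketch of the lifecycle the surrounding replications
// service is expected to drive. This is an assumption for illustration, not part
// of this file: trackedReplications, lineProtocolBytes, and numPoints are
// hypothetical caller-side values.
//
//	qm := NewDurableQueueManager(logger, queuePath, metricsReg, configStore)
//	_ = qm.StartReplicationQueues(trackedReplications)            // reopen queues tracked in sqlite
//	_ = qm.EnqueueData(replicationID, lineProtocolBytes, numPoints) // per local write
//	_ = qm.CloseAll()                                              // on shutdown; on-disk data is retained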