influxdb/replications/internal/queue_management_test.go

375 lines
11 KiB
Go
Raw Normal View History

package internal
import (
"os"
"path/filepath"
"testing"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
)
var (
id1 = platform.ID(1)
id2 = platform.ID(2)
maxQueueSizeBytes = 3 * influxdb.DefaultReplicationMaxQueueSizeBytes
)
func TestCreateNewQueueDirExists(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
}
func TestEnqueueScan(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
// Enqueue some data
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
qm.writeFunc = getTestWriteFunc(t, testData)
err = qm.EnqueueData(id1, []byte(testData))
require.NoError(t, err)
}
func TestEnqueueScanMultiple(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
// Enqueue some data
testData := "weather,location=us-midwest temperature=82 1465839830100400200"
qm.writeFunc = getTestWriteFunc(t, testData)
err = qm.EnqueueData(id1, []byte(testData))
require.NoError(t, err)
err = qm.EnqueueData(id1, []byte(testData))
require.NoError(t, err)
}
func TestCreateNewQueueDuplicateID(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create a valid new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
// Try to initialize another queue with the same replication ID
err = qm.InitializeQueue(id1, maxQueueSizeBytes)
require.EqualError(t, err, "durable queue already exists for replication ID \"0000000000000001\"")
}
func TestDeleteQueueDirRemoved(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create a valid new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Delete queue and make sure its queue has been deleted from disk
err = qm.DeleteQueue(id1)
require.NoError(t, err)
require.NoDirExists(t, filepath.Join(queuePath, id1.String()))
}
func TestDeleteQueueNonexistentID(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Delete nonexistent queue
err := qm.DeleteQueue(id1)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}
func TestUpdateMaxQueueSizeNonexistentID(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Update nonexistent queue
err := qm.UpdateMaxQueueSize(id1, influxdb.DefaultReplicationMaxQueueSizeBytes)
require.EqualError(t, err, "durable queue not found for replication ID \"0000000000000001\"")
}
func TestStartReplicationQueue(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Represents the replications tracked in sqlite, this one is tracked
trackedReplications := make(map[platform.ID]int64)
trackedReplications[id1] = maxQueueSizeBytes
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure queue is stored in map
require.NotNil(t, qm.replicationQueues[id1])
// Ensure queue is open by trying to remove, will error if open
err = qm.replicationQueues[id1].queue.Remove()
require.Errorf(t, err, "queue is open")
}
func TestStartReplicationQueuePartialDelete(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create new queue
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Represents the replications tracked in sqlite, replication above is not tracked (not present in map)
trackedReplications := make(map[platform.ID]int64)
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure queue is not stored in map
require.Nil(t, qm.replicationQueues[id1])
// Check for queue on disk, should be removed
require.NoDirExists(t, filepath.Join(queuePath, id1.String()))
}
func TestStartReplicationQueuesMultiple(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create queue1
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Create queue2
err = qm.InitializeQueue(id2, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id2.String()))
// Represents the replications tracked in sqlite, both replications above are tracked
trackedReplications := make(map[platform.ID]int64)
trackedReplications[id1] = maxQueueSizeBytes
trackedReplications[id2] = maxQueueSizeBytes
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure both queues are stored in map
require.NotNil(t, qm.replicationQueues[id1])
require.NotNil(t, qm.replicationQueues[id2])
// Make sure both queues are present on disk
require.DirExists(t, filepath.Join(queuePath, id1.String()))
require.DirExists(t, filepath.Join(queuePath, id2.String()))
// Ensure both queues are open by trying to remove, will error if open
err = qm.replicationQueues[id1].queue.Remove()
require.Errorf(t, err, "queue is open")
err = qm.replicationQueues[id2].queue.Remove()
require.Errorf(t, err, "queue is open")
}
func TestStartReplicationQueuesMultipleWithPartialDelete(t *testing.T) {
t.Parallel()
queuePath, qm := initQueueManager(t)
defer os.RemoveAll(filepath.Dir(queuePath))
// Create queue1
err := qm.InitializeQueue(id1, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id1.String()))
// Create queue2
err = qm.InitializeQueue(id2, maxQueueSizeBytes)
require.NoError(t, err)
require.DirExists(t, filepath.Join(queuePath, id2.String()))
// Represents the replications tracked in sqlite, queue1 is tracked and queue2 is not
trackedReplications := make(map[platform.ID]int64)
trackedReplications[id1] = maxQueueSizeBytes
// Simulate server shutdown by closing all queues and clearing replicationQueues map
shutdown(t, qm)
// Call startup function
err = qm.StartReplicationQueues(trackedReplications)
require.NoError(t, err)
// Make sure queue1 is in replicationQueues map and queue2 is not
require.NotNil(t, qm.replicationQueues[id1])
require.Nil(t, qm.replicationQueues[id2])
// Make sure queue1 is present on disk and queue2 has been removed
require.DirExists(t, filepath.Join(queuePath, id1.String()))
require.NoDirExists(t, filepath.Join(queuePath, id2.String()))
// Ensure queue1 is open by trying to remove, will error if open
err = qm.replicationQueues[id1].queue.Remove()
require.Errorf(t, err, "queue is open")
}
func initQueueManager(t *testing.T) (string, *durableQueueManager) {
t.Helper()
enginePath, err := os.MkdirTemp("", "engine")
require.NoError(t, err)
queuePath := filepath.Join(enginePath, "replicationq")
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath, WriteFunc)
return queuePath, qm
}
func shutdown(t *testing.T, qm *durableQueueManager) {
t.Helper()
// Close all queues
err := qm.CloseAll()
require.NoError(t, err)
// Clear replication queues map
emptyMap := make(map[platform.ID]*replicationQueue)
qm.replicationQueues = emptyMap
}
func getTestWriteFunc(t *testing.T, expected string) func([]byte) error {
t.Helper()
return func(b []byte) error {
require.Equal(t, expected, string(b))
return nil
}
}
func TestEnqueueData(t *testing.T) {
t.Parallel()
queuePath, err := os.MkdirTemp("", "testqueue")
require.NoError(t, err)
defer os.RemoveAll(queuePath)
logger := zaptest.NewLogger(t)
qm := NewDurableQueueManager(logger, queuePath, WriteFunc)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
require.DirExists(t, filepath.Join(queuePath, id1.String()))
sizes, err := qm.CurrentQueueSizes([]platform.ID{id1})
require.NoError(t, err)
// Empty queues are 8 bytes for the footer.
require.Equal(t, map[platform.ID]int64{id1: 8}, sizes)
data := "some fake data"
// close the scanner goroutine to specifically test EnqueueData()
rq, ok := qm.replicationQueues[id1]
require.True(t, ok)
close(rq.done)
go func() { <-rq.receive }() // absorb the receive to avoid testcase deadlock
require.NoError(t, qm.EnqueueData(id1, []byte(data)))
sizes, err = qm.CurrentQueueSizes([]platform.ID{id1})
require.NoError(t, err)
require.Greater(t, sizes[id1], int64(8))
written, err := qm.replicationQueues[id1].queue.Current()
require.NoError(t, err)
require.Equal(t, data, string(written))
}
func TestGoroutineReceives(t *testing.T) {
t.Parallel()
path, qm := initQueueManager(t)
defer os.RemoveAll(path)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
require.DirExists(t, filepath.Join(path, id1.String()))
rq, ok := qm.replicationQueues[id1]
require.True(t, ok)
require.NotNil(t, rq)
close(rq.done) // atypical from normal behavior, but lets us receive channels to test
go func() { require.NoError(t, qm.EnqueueData(id1, []byte("1234"))) }()
select {
case <-rq.receive:
return
case <-time.After(time.Second):
t.Fatal("Test timed out")
return
}
}
func TestGoroutineCloses(t *testing.T) {
t.Parallel()
path, qm := initQueueManager(t)
defer os.RemoveAll(path)
require.NoError(t, qm.InitializeQueue(id1, maxQueueSizeBytes))
require.DirExists(t, filepath.Join(path, id1.String()))
rq, ok := qm.replicationQueues[id1]
require.True(t, ok)
require.NotNil(t, rq)
require.NoError(t, qm.CloseAll())
// wg should be zero here, indicating that the goroutine has closed
// if this does not panic, then the routine is still active
require.Panics(t, func() { rq.wg.Add(-1) })
}