// influxdb/replications/internal/store.go
//
// 306 lines
// 9.2 KiB
// Go

package internal
import (
"context"
"database/sql"
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/platform"
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/sqlite"
"github.com/mattn/go-sqlite3"
)
// Predefined errors returned by Store methods.
var (
	// errReplicationNotFound is returned when a query that targets a single
	// replication matches no row.
	errReplicationNotFound = &ierrors.Error{
		Code: ierrors.ENotFound,
		Msg:  "replication not found",
	}

	// errMissingIDName is returned by CreateReplication when the request
	// carries neither a remote bucket ID nor a remote bucket name.
	errMissingIDName = &ierrors.Error{
		Code: ierrors.EUnprocessableEntity,
		Msg:  "one of remote_bucket_id, remote_bucket_name should be provided",
	}
)
// errRemoteNotFound builds an EInvalid error stating that the remote with the
// given ID was not found. cause is attached as the underlying error and may be
// nil.
func errRemoteNotFound(id platform.ID, cause error) error {
	notFound := new(ierrors.Error)
	notFound.Code = ierrors.EInvalid
	notFound.Msg = fmt.Sprintf("remote %q not found", id)
	notFound.Err = cause
	return notFound
}
// Store persists and queries replications through an underlying
// sqlite.SqlStore.
type Store struct {
// sqlStore supplies the database handle (DB) and the lock (Mu) used by the
// Store's methods.
sqlStore *sqlite.SqlStore
}
// NewStore returns a Store that reads and writes replications through the
// given sqlStore.
func NewStore(sqlStore *sqlite.SqlStore) *Store {
	store := new(Store)
	store.sqlStore = sqlStore
	return store
}
// Lock acquires the lock (Mu) of the underlying SqlStore. The Store methods
// documented with "Caller is responsible for managing locks" do not take this
// lock themselves.
func (s *Store) Lock() {
s.sqlStore.Mu.Lock()
}
// Unlock releases the lock (Mu) of the underlying SqlStore that was acquired
// with Lock.
func (s *Store) Unlock() {
s.sqlStore.Mu.Unlock()
}
// ListReplications fetches every replication that satisfies filter. Only the
// filter fields that are set (a valid OrgID, or a non-nil Name, RemoteID or
// LocalBucketID) narrow the result; unset fields are ignored.
func (s *Store) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
	columns := []string{
		"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", "remote_bucket_name",
		"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data",
		"max_age_seconds",
	}
	builder := sq.Select(columns...).From("replications")

	// Each populated filter field contributes one equality predicate.
	if filter.OrgID.Valid() {
		builder = builder.Where(sq.Eq{"org_id": filter.OrgID})
	}
	if filter.Name != nil {
		builder = builder.Where(sq.Eq{"name": *filter.Name})
	}
	if filter.RemoteID != nil {
		builder = builder.Where(sq.Eq{"remote_id": *filter.RemoteID})
	}
	if filter.LocalBucketID != nil {
		builder = builder.Where(sq.Eq{"local_bucket_id": *filter.LocalBucketID})
	}

	stmt, params, err := builder.ToSql()
	if err != nil {
		return nil, err
	}

	result := new(influxdb.Replications)
	err = s.sqlStore.DB.SelectContext(ctx, &result.Replications, stmt, params...)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// CreateReplication persists a new replication in the database. Caller is responsible for managing locks.
//
// The request must identify the remote bucket either by ID or by name. When an
// ID is given the stored name is the empty string; when only a name is given
// the stored ID is NULL. If neither is provided, errMissingIDName is returned
// and nothing is written.
//
// If the insert fails with a foreign-key constraint violation, the error is
// reported as errRemoteNotFound for request.RemoteID. On success the inserted
// row, as returned by the database, is handed back to the caller.
func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) {
	// The timestamp must be a SQL expression so that SQLite evaluates it.
	// Passing the plain Go string would bind it as a parameter and store the
	// literal text "datetime('now')" in the column. This mirrors how
	// UpdateReplication sets updated_at.
	now := sq.Expr("datetime('now')")

	fields := sq.Eq{
		"id":                      newID,
		"org_id":                  request.OrgID,
		"name":                    request.Name,
		"description":             request.Description,
		"remote_id":               request.RemoteID,
		"local_bucket_id":         request.LocalBucketID,
		"max_queue_size_bytes":    request.MaxQueueSizeBytes,
		"drop_non_retryable_data": request.DropNonRetryableData,
		"max_age_seconds":         request.MaxAgeSeconds,
		"created_at":              now,
		"updated_at":              now,
	}

	// A remote bucket ID takes precedence over a remote bucket name.
	switch {
	case request.RemoteBucketID != platform.ID(0):
		fields["remote_bucket_id"] = request.RemoteBucketID
		fields["remote_bucket_name"] = ""
	case request.RemoteBucketName != "":
		fields["remote_bucket_id"] = nil
		fields["remote_bucket_name"] = request.RemoteBucketName
	default:
		return nil, errMissingIDName
	}

	q := sq.Insert("replications").
		SetMap(fields).
		Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, remote_bucket_name, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")

	query, args, err := q.ToSql()
	if err != nil {
		return nil, err
	}

	var r influxdb.Replication
	if err := s.sqlStore.DB.GetContext(ctx, &r, query, args...); err != nil {
		// errors.As also recognises the driver error when it has been wrapped.
		var sqlErr sqlite3.Error
		if errors.As(err, &sqlErr) && sqlErr.ExtendedCode == sqlite3.ErrConstraintForeignKey {
			return nil, errRemoteNotFound(request.RemoteID, err)
		}
		return nil, err
	}
	return &r, nil
}
// GetReplication looks up a single replication by its ID. When no row matches,
// errReplicationNotFound is returned; any other database error is returned
// unchanged.
func (s *Store) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) {
	columns := []string{
		"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", "remote_bucket_name",
		"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data",
		"max_age_seconds",
	}
	builder := sq.Select(columns...).
		From("replications").
		Where(sq.Eq{"id": id})

	stmt, params, err := builder.ToSql()
	if err != nil {
		return nil, err
	}

	found := new(influxdb.Replication)
	err = s.sqlStore.DB.GetContext(ctx, found, stmt, params...)
	switch {
	case err == nil:
		return found, nil
	case errors.Is(err, sql.ErrNoRows):
		return nil, errReplicationNotFound
	default:
		return nil, err
	}
}
// UpdateReplication updates a replication by ID. Caller is responsible for managing locks.
//
// Only the request fields that are non-nil are written; updated_at is always
// refreshed. The updated row, as returned by the database, is handed back to
// the caller.
//
// errReplicationNotFound is returned when no row has the given ID. If the
// request changes the remote ID and the update fails with a foreign-key
// constraint violation, the error is reported as errRemoteNotFound for that
// remote ID.
func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) {
	// sq.Expr makes SQLite evaluate datetime('now') instead of binding it as
	// a string parameter.
	updates := sq.Eq{"updated_at": sq.Expr("datetime('now')")}

	if request.Name != nil {
		updates["name"] = *request.Name
	}
	if request.Description != nil {
		updates["description"] = *request.Description
	}
	if request.RemoteID != nil {
		updates["remote_id"] = *request.RemoteID
	}
	// NOTE(review): unlike CreateReplication, setting one of remote_bucket_id /
	// remote_bucket_name here does not clear the other - confirm this is
	// intended.
	if request.RemoteBucketID != nil {
		updates["remote_bucket_id"] = *request.RemoteBucketID
	}
	if request.RemoteBucketName != nil {
		updates["remote_bucket_name"] = *request.RemoteBucketName
	}
	if request.MaxQueueSizeBytes != nil {
		updates["max_queue_size_bytes"] = *request.MaxQueueSizeBytes
	}
	if request.DropNonRetryableData != nil {
		updates["drop_non_retryable_data"] = *request.DropNonRetryableData
	}
	if request.MaxAgeSeconds != nil {
		updates["max_age_seconds"] = *request.MaxAgeSeconds
	}

	q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}).
		Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, remote_bucket_name, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")

	query, args, err := q.ToSql()
	if err != nil {
		return nil, err
	}

	var r influxdb.Replication
	if err := s.sqlStore.DB.GetContext(ctx, &r, query, args...); err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, errReplicationNotFound
		}
		// errors.As also recognises the driver error when it has been
		// wrapped, matching the errors.Is check above.
		var sqlErr sqlite3.Error
		if request.RemoteID != nil && errors.As(err, &sqlErr) && sqlErr.ExtendedCode == sqlite3.ErrConstraintForeignKey {
			return nil, errRemoteNotFound(*request.RemoteID, err)
		}
		return nil, err
	}
	return &r, nil
}
// UpdateResponseInfo records the latest HTTP status code and error message
// obtained from a remote write for the replication with the given ID.
// errReplicationNotFound is returned when no row has that ID.
func (s *Store) UpdateResponseInfo(ctx context.Context, id platform.ID, code int, message string) error {
	builder := sq.Update("replications").
		SetMap(sq.Eq{
			"latest_response_code": code,
			"latest_error_message": message,
		}).
		Where(sq.Eq{"id": id}).
		Suffix("RETURNING id")

	stmt, params, err := builder.ToSql()
	if err != nil {
		return err
	}

	// The RETURNING clause yields a row only if something was updated, which
	// is how a missing replication is detected.
	var updatedID platform.ID
	err = s.sqlStore.DB.GetContext(ctx, &updatedID, stmt, params...)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, sql.ErrNoRows):
		return errReplicationNotFound
	default:
		return err
	}
}
// DeleteReplication removes the replication with the given ID from the
// database. Caller is responsible for managing locks. errReplicationNotFound
// is returned when no row has that ID.
func (s *Store) DeleteReplication(ctx context.Context, id platform.ID) error {
	builder := sq.Delete("replications").
		Where(sq.Eq{"id": id}).
		Suffix("RETURNING id")

	stmt, params, err := builder.ToSql()
	if err != nil {
		return err
	}

	// The RETURNING clause yields a row only if something was deleted, which
	// is how a missing replication is detected.
	var deletedID platform.ID
	err = s.sqlStore.DB.GetContext(ctx, &deletedID, stmt, params...)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, sql.ErrNoRows):
		return errReplicationNotFound
	default:
		return err
	}
}
// DeleteBucketReplications removes every replication whose local bucket is
// localBucketID. Caller is responsible for managing locks. The IDs of the
// removed replications are returned so the caller can process them further.
func (s *Store) DeleteBucketReplications(ctx context.Context, localBucketID platform.ID) ([]platform.ID, error) {
	builder := sq.Delete("replications").
		Where(sq.Eq{"local_bucket_id": localBucketID}).
		Suffix("RETURNING id")

	stmt, params, err := builder.ToSql()
	if err != nil {
		return nil, err
	}

	var removed []platform.ID
	err = s.sqlStore.DB.SelectContext(ctx, &removed, stmt, params...)
	if err != nil {
		return nil, err
	}
	return removed, nil
}
// GetFullHTTPConfig loads the HTTP configuration for the replication with the
// given ID by joining the replication row with the remote it refers to. The
// result combines the remote's URL, API token, org ID and TLS setting with the
// replication's remote bucket ID/name and drop_non_retryable_data flag.
// errReplicationNotFound is returned when the join produces no row.
func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*influxdb.ReplicationHTTPConfig, error) {
	builder := sq.Select(
		"c.remote_url",
		"c.remote_api_token",
		"c.remote_org_id",
		"c.allow_insecure_tls",
		"r.remote_bucket_id",
		"r.remote_bucket_name",
		"r.drop_non_retryable_data",
	).
		From("replications r").
		InnerJoin("remotes c ON r.remote_id = c.id AND r.id = ?", id)

	stmt, params, err := builder.ToSql()
	if err != nil {
		return nil, err
	}

	config := new(influxdb.ReplicationHTTPConfig)
	err = s.sqlStore.DB.GetContext(ctx, config, stmt, params...)
	switch {
	case err == nil:
		return config, nil
	case errors.Is(err, sql.ErrNoRows):
		return nil, errReplicationNotFound
	default:
		return nil, err
	}
}
// PopulateRemoteHTTPConfig reads the remote with the given ID and scans its
// remote_url, remote_api_token, remote_org_id and allow_insecure_tls columns
// into target. An errRemoteNotFound error (with no underlying cause) is
// returned when no remote has that ID.
func (s *Store) PopulateRemoteHTTPConfig(ctx context.Context, id platform.ID, target *influxdb.ReplicationHTTPConfig) error {
	builder := sq.Select("remote_url", "remote_api_token", "remote_org_id", "allow_insecure_tls").
		From("remotes").
		Where(sq.Eq{"id": id})

	stmt, params, err := builder.ToSql()
	if err != nil {
		return err
	}

	err = s.sqlStore.DB.GetContext(ctx, target, stmt, params...)
	switch {
	case err == nil:
		return nil
	case errors.Is(err, sql.ErrNoRows):
		return errRemoteNotFound(id, nil)
	default:
		return err
	}
}