fix: sql scan error on remote bucket id when replication to 1.x (#23826)
parent
3ac7a10aa9
commit
55b7d29e4f
|
@ -107,11 +107,11 @@ func (r *UpdateReplicationRequest) OK() error {
|
||||||
// ReplicationHTTPConfig contains all info needed by a client to make HTTP requests against the
|
// ReplicationHTTPConfig contains all info needed by a client to make HTTP requests against the
|
||||||
// remote bucket targeted by a replication.
|
// remote bucket targeted by a replication.
|
||||||
type ReplicationHTTPConfig struct {
|
type ReplicationHTTPConfig struct {
|
||||||
RemoteURL string `db:"remote_url"`
|
RemoteURL string `db:"remote_url"`
|
||||||
RemoteToken string `db:"remote_api_token"`
|
RemoteToken string `db:"remote_api_token"`
|
||||||
RemoteOrgID platform.ID `db:"remote_org_id"`
|
RemoteOrgID platform.ID `db:"remote_org_id"`
|
||||||
AllowInsecureTLS bool `db:"allow_insecure_tls"`
|
AllowInsecureTLS bool `db:"allow_insecure_tls"`
|
||||||
RemoteBucketID platform.ID `db:"remote_bucket_id"`
|
RemoteBucketID *platform.ID `db:"remote_bucket_id"`
|
||||||
RemoteBucketName string `db:"remote_bucket_name"`
|
RemoteBucketName string `db:"remote_bucket_name"`
|
||||||
DropNonRetryableData bool `db:"drop_non_retryable_data"`
|
DropNonRetryableData bool `db:"drop_non_retryable_data"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,7 +46,7 @@ var (
|
||||||
RemoteToken: replication.RemoteID.String(),
|
RemoteToken: replication.RemoteID.String(),
|
||||||
RemoteOrgID: platform.ID(888888),
|
RemoteOrgID: platform.ID(888888),
|
||||||
AllowInsecureTLS: true,
|
AllowInsecureTLS: true,
|
||||||
RemoteBucketID: *replication.RemoteBucketID,
|
RemoteBucketID: replication.RemoteBucketID,
|
||||||
}
|
}
|
||||||
newRemoteID = platform.ID(200)
|
newRemoteID = platform.ID(200)
|
||||||
newQueueSize = influxdb.MinReplicationMaxQueueSizeBytes
|
newQueueSize = influxdb.MinReplicationMaxQueueSizeBytes
|
||||||
|
|
|
@ -208,9 +208,11 @@ func PostWrite(ctx context.Context, config *influxdb.ReplicationHTTPConfig, data
|
||||||
conf.HTTPClient.Timeout = timeout
|
conf.HTTPClient.Timeout = timeout
|
||||||
client := api.NewAPIClient(conf).WriteApi
|
client := api.NewAPIClient(conf).WriteApi
|
||||||
|
|
||||||
bucket := config.RemoteBucketID.String()
|
var bucket string
|
||||||
if config.RemoteBucketName != "" {
|
if config.RemoteBucketID == nil || config.RemoteBucketName != "" {
|
||||||
bucket = config.RemoteBucketName
|
bucket = config.RemoteBucketName
|
||||||
|
} else {
|
||||||
|
bucket = config.RemoteBucketID.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
req := client.PostWrite(ctx).
|
req := client.PostWrite(ctx).
|
||||||
|
|
|
@ -170,7 +170,7 @@ func (s *service) ValidateNewReplication(ctx context.Context, request influxdb.C
|
||||||
return errLocalBucketNotFound(request.LocalBucketID, err)
|
return errLocalBucketNotFound(request.LocalBucketID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
config := influxdb.ReplicationHTTPConfig{RemoteBucketID: request.RemoteBucketID}
|
config := influxdb.ReplicationHTTPConfig{RemoteBucketID: &request.RemoteBucketID}
|
||||||
if err := s.store.PopulateRemoteHTTPConfig(ctx, request.RemoteID, &config); err != nil {
|
if err := s.store.PopulateRemoteHTTPConfig(ctx, request.RemoteID, &config); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -231,7 +231,7 @@ func (s *service) ValidateUpdatedReplication(ctx context.Context, id platform.ID
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if request.RemoteBucketID != nil {
|
if request.RemoteBucketID != nil {
|
||||||
baseConfig.RemoteBucketID = *request.RemoteBucketID
|
baseConfig.RemoteBucketID = request.RemoteBucketID
|
||||||
}
|
}
|
||||||
|
|
||||||
if request.RemoteID != nil {
|
if request.RemoteID != nil {
|
||||||
|
|
|
@ -94,7 +94,7 @@ var (
|
||||||
RemoteToken: replication1.RemoteID.String(),
|
RemoteToken: replication1.RemoteID.String(),
|
||||||
RemoteOrgID: platform.ID(888888),
|
RemoteOrgID: platform.ID(888888),
|
||||||
AllowInsecureTLS: true,
|
AllowInsecureTLS: true,
|
||||||
RemoteBucketID: *replication1.RemoteBucketID,
|
RemoteBucketID: replication1.RemoteBucketID,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -295,7 +295,7 @@ func TestValidateNewReplication(t *testing.T) {
|
||||||
|
|
||||||
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), tt.req.LocalBucketID).Return(nil, tt.bucketErr)
|
mocks.bucketSvc.EXPECT().FindBucketByID(gomock.Any(), tt.req.LocalBucketID).Return(nil, tt.bucketErr)
|
||||||
|
|
||||||
testConfig := &influxdb.ReplicationHTTPConfig{RemoteBucketID: tt.req.RemoteBucketID}
|
testConfig := &influxdb.ReplicationHTTPConfig{RemoteBucketID: &tt.req.RemoteBucketID}
|
||||||
if tt.bucketErr == nil {
|
if tt.bucketErr == nil {
|
||||||
mocks.serviceStore.EXPECT().PopulateRemoteHTTPConfig(gomock.Any(), tt.req.RemoteID, testConfig).Return(tt.storeErr)
|
mocks.serviceStore.EXPECT().PopulateRemoteHTTPConfig(gomock.Any(), tt.req.RemoteID, testConfig).Return(tt.storeErr)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue