Issue #1060476 by Arantxio, RoSk0, Renrhaf, ravi.shankar, mogtofu33, daffie, jordan.jamous, intrafusion, vsujeetkumar, tostinni, stefan.r, asad_ahmed, ozden_meren, jaredsmith, ridhimaabrol24, dark_ixion, quietone, ericsol, David_Rothstein, mradcliffe: Multiple issues when PostgreSQL is used with non-public schema

merge-requests/3789/head
Lee Rowlands 2023-04-06 11:38:37 +10:00
parent 5f16738501
commit f4fcd7b578
No known key found for this signature in database
GPG Key ID: 2B829A3DF9204DC4
4 changed files with 428 additions and 25 deletions

View File

@ -116,7 +116,7 @@ abstract class Connection {
*
* @var string
*/
private string $prefix;
protected string $prefix;
/**
* Replacements to fully qualify {table} placeholders in SQL strings.
@ -126,7 +126,7 @@ abstract class Connection {
*
* @var string[]
*/
private array $tablePlaceholderReplacements;
protected array $tablePlaceholderReplacements;
/**
* The prefixes used by this database connection.

View File

@ -73,6 +73,16 @@ class Connection extends DatabaseConnection implements SupportsTemporaryTablesIn
* Constructs a connection object.
*/
public function __construct(\PDO $connection, array $connection_options) {
// Sanitize the schema name here, so we do not have to do it in other
// functions.
if (isset($connection_options['schema']) && ($connection_options['schema'] !== 'public')) {
$connection_options['schema'] = preg_replace('/[^A-Za-z0-9_]+/', '', $connection_options['schema']);
}
// We need to set the connectionOptions before the parent, because setPrefix
// needs this.
$this->connectionOptions = $connection_options;
parent::__construct($connection, $connection_options);
// Force PostgreSQL to use the UTF-8 character set by default.
@ -84,6 +94,26 @@ class Connection extends DatabaseConnection implements SupportsTemporaryTablesIn
}
}
/**
* {@inheritdoc}
*/
protected function setPrefix($prefix) {
assert(is_string($prefix), 'The \'$prefix\' argument to ' . __METHOD__ . '() must be a string');
$this->prefix = $prefix;
// Add the schema name if it is not set to public, otherwise it will use the
// default schema name.
$quoted_schema = '';
if (isset($this->connectionOptions['schema']) && ($this->connectionOptions['schema'] !== 'public')) {
$quoted_schema = $this->identifierQuotes[0] . $this->connectionOptions['schema'] . $this->identifierQuotes[1] . '.';
}
$this->tablePlaceholderReplacements = [
$quoted_schema . $this->identifierQuotes[0] . str_replace('.', $this->identifierQuotes[1] . '.' . $this->identifierQuotes[0], $prefix),
$this->identifierQuotes[1],
];
}
/**
* {@inheritdoc}
*/
@ -309,12 +339,11 @@ class Connection extends DatabaseConnection implements SupportsTemporaryTablesIn
*/
public function getFullQualifiedTableName($table) {
$options = $this->getConnectionOptions();
$prefix = $this->getPrefix();
$schema = $options['schema'] ?? 'public';
// The fully qualified table name in PostgreSQL is in the form of
// <database>.<schema>.<table>, so we have to include the 'public' schema in
// the return value.
return $options['database'] . '.public.' . $prefix . $table;
// <database>.<schema>.<table>.
return $options['database'] . '.' . $schema . '.' . $this->getPrefix() . $table;
}
/**

View File

@ -50,6 +50,17 @@ class Schema extends DatabaseSchema {
*/
protected $tempNamespaceName;
/**
* {@inheritdoc}
*/
public function __construct($connection) {
parent::__construct($connection);
// If the schema is not set in the connection options then schema defaults
// to public.
$this->defaultSchema = $connection->getConnectionOptions()['schema'] ?? 'public';
}
/**
* Make sure to limit identifiers according to PostgreSQL compiled in length.
*
@ -116,16 +127,18 @@ class Schema extends DatabaseSchema {
*/
public function queryTableInformation($table) {
// Generate a key to reference this table's information on.
$prefixed_table = $this->connection->getPrefix() . $table;
$key = $this->connection->prefixTables('{' . $table . '}');
// Take into account that temporary tables are stored in a different schema.
// \Drupal\Core\Database\Connection::generateTemporaryTableName() sets the
// 'db_temporary_' prefix to all temporary tables.
if (!str_contains($key, '.') && !str_contains($table, 'db_temporary_')) {
$key = 'public.' . $key;
if (str_contains($table, 'db_temporary_')) {
$key = $quoted_key = $this->getTempNamespaceName() . '.' . $prefixed_table;
}
else {
$key = $this->getTempNamespaceName() . '.' . $key;
$key = $this->defaultSchema . '.' . $prefixed_table;
$quoted_key = '"' . $this->defaultSchema . '"."' . $prefixed_table . '"';
}
if (!isset($this->tableInformation[$key])) {
@ -153,7 +166,7 @@ AND (format_type(pg_attribute.atttypid, pg_attribute.atttypmod) = 'bytea'
OR pg_get_expr(pg_attrdef.adbin, pg_attribute.attrelid) LIKE 'nextval%')
EOD;
$result = $this->connection->query($sql, [
':key' => $key,
':key' => $quoted_key,
]);
}
catch (\Exception $e) {
@ -206,10 +219,7 @@ EOD;
* The non-prefixed name of the table.
*/
protected function resetTableInformation($table) {
$key = $this->connection->prefixTables('{' . $table . '}');
if (!str_contains($key, '.')) {
$key = 'public.' . $key;
}
$key = $this->defaultSchema . '.' . $this->connection->getPrefix() . $table;
unset($this->tableInformation[$key]);
}
@ -553,15 +563,13 @@ EOD;
}
// Get the schema and tablename for the old table.
$old_full_name = str_replace('"', '', $this->connection->prefixTables('{' . $table . '}'));
[$old_schema, $old_table_name] = strpos($old_full_name, '.') ? explode('.', $old_full_name) : ['public', $old_full_name];
$table_name = $this->connection->getPrefix() . $table;
// Index names and constraint names are global in PostgreSQL, so we need to
// rename them when renaming the table.
$indexes = $this->connection->query('SELECT indexname FROM pg_indexes WHERE schemaname = :schema AND tablename = :table', [':schema' => $old_schema, ':table' => $old_table_name]);
$indexes = $this->connection->query('SELECT indexname FROM pg_indexes WHERE schemaname = :schema AND tablename = :table', [':schema' => $this->defaultSchema, ':table' => $table_name]);
foreach ($indexes as $index) {
// Get the index type by suffix, e.g. idx/key/pkey
// Get the index type by suffix, e.g. idx/key/pkey.
$index_type = substr($index->indexname, strrpos($index->indexname, '_') + 1);
// If the index is already rewritten by ensureIdentifiersLength() to not
@ -576,10 +584,10 @@ EOD;
// Make sure to remove the suffix from index names, because
// $this->ensureIdentifiersLength() will add the suffix again and thus
// would result in a wrong index name.
preg_match('/^' . preg_quote($old_full_name) . '__(.*)__' . preg_quote($index_type) . '/', $index->indexname, $matches);
preg_match('/^' . preg_quote($table_name) . '__(.*)__' . preg_quote($index_type) . '/', $index->indexname, $matches);
$index_name = $matches[1];
}
$this->connection->query('ALTER INDEX "' . $index->indexname . '" RENAME TO ' . $this->ensureIdentifiersLength($new_name, $index_name, $index_type) . '');
$this->connection->query('ALTER INDEX "' . $this->defaultSchema . '"."' . $index->indexname . '" RENAME TO ' . $this->ensureIdentifiersLength($new_name, $index_name, $index_type));
}
// Ensure the new table name does not include schema syntax.
@ -592,7 +600,7 @@ EOD;
// The initial name of the sequence is generated automatically by
// PostgreSQL when the table is created, so we need to use
// pg_get_serial_sequence() to retrieve it.
$old_sequence = $this->connection->query("SELECT pg_get_serial_sequence('" . $old_full_name . "', '" . $field . "')")->fetchField();
$old_sequence = $this->connection->query("SELECT pg_get_serial_sequence('" . $this->defaultSchema . '.' . $table_name . "', '" . $field . "')")->fetchField();
// If the new sequence name exceeds the maximum identifier length limit,
// it will not match the pattern that is automatically applied by
@ -715,7 +723,13 @@ EOD;
// Remove leading and trailing quotes because the index name is in a WHERE
// clause and not used as an identifier.
$index_name = str_replace('"', '', $index_name);
return (bool) $this->connection->query("SELECT 1 FROM pg_indexes WHERE indexname = '$index_name'")->fetchField();
$sql_params = [
':schema' => $this->defaultSchema,
':table' => $this->connection->getPrefix() . $table,
':index' => $index_name,
];
return (bool) $this->connection->query("SELECT 1 FROM pg_indexes WHERE schemaname = :schema AND tablename = :table AND indexname = :index", $sql_params)->fetchField();
}
/**
@ -841,7 +855,7 @@ EOD;
return FALSE;
}
$this->connection->query('DROP INDEX ' . $this->ensureIdentifiersLength($table, $name, 'idx'));
$this->connection->query('DROP INDEX ' . $this->defaultSchema . '.' . $this->ensureIdentifiersLength($table, $name, 'idx'));
$this->resetTableInformation($table);
return TRUE;
}
@ -1089,7 +1103,7 @@ EOD;
protected function getSequenceName(string $table, string $column): ?string {
return $this->connection
->query("SELECT pg_get_serial_sequence(:table, :column)", [
':table' => $this->connection->getPrefix() . $table,
':table' => $this->defaultSchema . '.' . $this->connection->getPrefix() . $table,
':column' => $column,
])
->fetchField();

View File

@ -0,0 +1,360 @@
<?php
namespace Drupal\Tests\pgsql\Kernel\pgsql;
use Drupal\Core\Database\Database;
use Drupal\Core\Database\Connection;
use Drupal\KernelTests\Core\Database\DatabaseTestSchemaDataTrait;
use Drupal\KernelTests\Core\Database\DatabaseTestSchemaInstallTrait;
use Drupal\KernelTests\Core\Database\DriverSpecificKernelTestBase;
// cSpell:ignore nspname schemaname upserting indexdef
/**
* Tests schema API for non-public schema for the PostgreSQL driver.
*
* @group Database
* @coversDefaultClass \Drupal\pgsql\Driver\Database\pgsql\Schema
*/
class NonPublicSchemaTest extends DriverSpecificKernelTestBase {
use DatabaseTestSchemaDataTrait;
use DatabaseTestSchemaInstallTrait;
/**
* The database connection for testing.
*
* @var \Drupal\Core\Database\Connection
*/
protected Connection $testingFakeConnection;
/**
* {@inheritdoc}
*/
protected function setUp(): void {
parent::setUp();
// Create a connection to the non-public schema.
$info = Database::getConnectionInfo('default');
$info['default']['schema'] = 'testing_fake';
Database::addConnectionInfo('default', 'testing_fake', $info['default']);
$this->testingFakeConnection = Database::getConnection('testing_fake', 'default');
$table_specification = [
'description' => 'Schema table description may contain "quotes" and could be long—very long indeed.',
'fields' => [
'id' => [
'type' => 'serial',
'not null' => TRUE,
],
'test_field' => [
'type' => 'int',
'not null' => TRUE,
],
],
'primary key' => ['id'],
];
$this->testingFakeConnection->schema()->createTable('faking_table', $table_specification);
$this->testingFakeConnection->insert('faking_table')
->fields(
[
'id' => '1',
'test_field' => '123',
]
)->execute();
$this->testingFakeConnection->insert('faking_table')
->fields(
[
'id' => '2',
'test_field' => '456',
]
)->execute();
$this->testingFakeConnection->insert('faking_table')
->fields(
[
'id' => '3',
'test_field' => '789',
]
)->execute();
}
/**
* {@inheritdoc}
*/
protected function tearDown(): void {
// We overwrite this function because the regular teardown will not drop the
// tables from a specified schema.
$tables = $this->testingFakeConnection->schema()->findTables('%');
foreach ($tables as $table) {
if ($this->testingFakeConnection->schema()->dropTable($table)) {
unset($tables[$table]);
}
}
$this->assertEmpty($this->testingFakeConnection->schema()->findTables('%'));
Database::removeConnection('testing_fake');
parent::tearDown();
}
/**
* @covers ::extensionExists
* @covers ::tableExists
*/
public function testExtensionExists(): void {
// Check if PG_trgm extension is present.
$this->assertTrue($this->testingFakeConnection->schema()->extensionExists('pg_trgm'));
// Asserting that the Schema testing_fake exist in the database.
$this->assertCount(1, \Drupal::database()->query("SELECT * FROM pg_catalog.pg_namespace WHERE nspname = 'testing_fake'")->fetchAll());
$this->assertTrue($this->testingFakeConnection->schema()->tableExists('faking_table'));
// Hardcoded assertion that we created the table in the non-public schema.
$this->assertCount(1, $this->testingFakeConnection->query("SELECT * FROM pg_tables WHERE schemaname = 'testing_fake' AND tablename = :prefixedTable", [':prefixedTable' => $this->testingFakeConnection->getPrefix() . "faking_table"])->fetchAll());
}
/**
* @covers ::addField
* @covers ::fieldExists
* @covers ::dropField
* @covers ::changeField
*/
public function testField(): void {
$this->testingFakeConnection->schema()->addField('faking_table', 'added_field', ['type' => 'int', 'not null' => FALSE]);
$this->assertTrue($this->testingFakeConnection->schema()->fieldExists('faking_table', 'added_field'));
$this->testingFakeConnection->schema()->changeField('faking_table', 'added_field', 'changed_field', ['type' => 'int', 'not null' => FALSE]);
$this->assertFalse($this->testingFakeConnection->schema()->fieldExists('faking_table', 'added_field'));
$this->assertTrue($this->testingFakeConnection->schema()->fieldExists('faking_table', 'changed_field'));
$this->testingFakeConnection->schema()->dropField('faking_table', 'changed_field');
$this->assertFalse($this->testingFakeConnection->schema()->fieldExists('faking_table', 'changed_field'));
}
/**
* @covers \Drupal\Core\Database\Connection::insert
* @covers \Drupal\Core\Database\Connection::select
*/
public function testInsert(): void {
$num_records_before = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$this->testingFakeConnection->insert('faking_table')
->fields([
'id' => '123',
'test_field' => '55',
])->execute();
// Testing that the insert is correct.
$results = $this->testingFakeConnection->select('faking_table')->fields('faking_table')->condition('id', '123')->execute()->fetchAll();
$this->assertIsArray($results);
$num_records_after = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$this->assertEquals($num_records_before + 1, $num_records_after, 'Merge inserted properly.');
$this->assertSame('123', $results[0]->id);
$this->assertSame('55', $results[0]->test_field);
}
/**
* @covers \Drupal\Core\Database\Connection::update
*/
public function testUpdate(): void {
$updated_record = $this->testingFakeConnection->update('faking_table')
->fields(['test_field' => 321])
->condition('id', 1)
->execute();
$this->assertSame(1, $updated_record, 'Updated 1 record.');
$updated_results = $this->testingFakeConnection->select('faking_table')->fields('faking_table')->condition('id', '1')->execute()->fetchAll();
$this->assertSame('321', $updated_results[0]->test_field);
}
/**
* @covers \Drupal\Core\Database\Connection::upsert
*/
public function testUpsert(): void {
$num_records_before = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$upsert = $this->testingFakeConnection->upsert('faking_table')
->key('id')
->fields(['id', 'test_field']);
// Upserting a new row.
$upsert->values([
'id' => '456',
'test_field' => '444',
]);
// Upserting an existing row.
$upsert->values([
'id' => '1',
'test_field' => '898',
]);
$result = $upsert->execute();
$this->assertSame(2, $result, 'The result of the upsert operation should report that at exactly two rows were affected.');
$num_records_after = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$this->assertEquals($num_records_before + 1, $num_records_after, 'Merge inserted properly.');
// Check if new row has been added with upsert.
$result = $this->testingFakeConnection->query('SELECT * FROM {faking_table} WHERE [id] = :id', [':id' => '456'])->fetch();
$this->assertEquals('456', $result->id, 'ID set correctly.');
$this->assertEquals('444', $result->test_field, 'test_field set correctly.');
// Check if new row has been edited with upsert.
$result = $this->testingFakeConnection->query('SELECT * FROM {faking_table} WHERE [id] = :id', [':id' => '1'])->fetch();
$this->assertEquals('1', $result->id, 'ID set correctly.');
$this->assertEquals('898', $result->test_field, 'test_field set correctly.');
}
/**
* @covers \Drupal\Core\Database\Connection::merge
*/
public function testMerge(): void {
$num_records_before = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$this->testingFakeConnection->merge('faking_table')
->key('id', '789')
->fields([
'test_field' => 343,
])
->execute();
$num_records_after = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$this->assertEquals($num_records_before + 1, $num_records_after, 'Merge inserted properly.');
$merge_results = $this->testingFakeConnection->select('faking_table')->fields('faking_table')->condition('id', '789')->execute()->fetchAll();
$this->assertSame('789', $merge_results[0]->id);
$this->assertSame('343', $merge_results[0]->test_field);
}
/**
* @covers \Drupal\Core\Database\Connection::delete
* @covers \Drupal\Core\Database\Connection::truncate
*/
public function testDelete(): void {
$num_records_before = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$num_deleted = $this->testingFakeConnection->delete('faking_table')
->condition('id', 3)
->execute();
$this->assertSame(1, $num_deleted, 'Deleted 1 record.');
$num_records_after = $this->testingFakeConnection->query('SELECT COUNT(*) FROM {faking_table}')->fetchField();
$this->assertEquals($num_records_before, $num_records_after + $num_deleted, 'Deletion adds up.');
$num_records_before = $this->testingFakeConnection->query("SELECT COUNT(*) FROM {faking_table}")->fetchField();
$this->assertNotEmpty($num_records_before);
$this->testingFakeConnection->truncate('faking_table')->execute();
$num_records_after = $this->testingFakeConnection->query("SELECT COUNT(*) FROM {faking_table}")->fetchField();
$this->assertEquals(0, $num_records_after, 'Truncate really deletes everything.');
}
/**
* @covers ::addIndex
* @covers ::indexExists
* @covers ::dropIndex
*/
public function testIndex(): void {
$this->testingFakeConnection->schema()->addIndex('faking_table', 'test_field', ['test_field'], []);
$this->assertTrue($this->testingFakeConnection->schema()->indexExists('faking_table', 'test_field'));
$results = $this->testingFakeConnection->query("SELECT * FROM pg_indexes WHERE indexname = :indexname", [':indexname' => $this->testingFakeConnection->getPrefix() . 'faking_table__test_field__idx'])->fetchAll();
$this->assertCount(1, $results);
$this->assertSame('testing_fake', $results[0]->schemaname);
$this->assertSame($this->testingFakeConnection->getPrefix() . 'faking_table', $results[0]->tablename);
$this->assertStringContainsString('USING btree (test_field)', $results[0]->indexdef);
$this->testingFakeConnection->schema()->dropIndex('faking_table', 'test_field');
$this->assertFalse($this->testingFakeConnection->schema()->indexExists('faking_table', 'test_field'));
}
/**
* @covers ::addUniqueKey
* @covers ::indexExists
* @covers ::dropUniqueKey
*/
public function testUniqueKey(): void {
$this->testingFakeConnection->schema()->addUniqueKey('faking_table', 'test_field', ['test_field']);
// This should work, but currently indexExist() only searches for keys that end with idx.
// @todo remove comments when: https://www.drupal.org/project/drupal/issues/3325358 is committed.
// $this->assertTrue($this->testingFakeConnection->schema()->indexExists('faking_table', 'test_field'));
$results = $this->testingFakeConnection->query("SELECT * FROM pg_indexes WHERE indexname = :indexname", [':indexname' => $this->testingFakeConnection->getPrefix() . 'faking_table__test_field__key'])->fetchAll();
// Check the unique key columns.
$this->assertCount(1, $results);
$this->assertSame('testing_fake', $results[0]->schemaname);
$this->assertSame($this->testingFakeConnection->getPrefix() . 'faking_table', $results[0]->tablename);
$this->assertStringContainsString('USING btree (test_field)', $results[0]->indexdef);
$this->testingFakeConnection->schema()->dropUniqueKey('faking_table', 'test_field');
// This function will not work due to a the fact that indexExist() does not search for keys without idx tag.
// @todo remove comments when: https://www.drupal.org/project/drupal/issues/3325358 is committed.
// $this->assertFalse($this->testingFakeConnection->schema()->indexExists('faking_table', 'test_field'));
}
/**
* @covers ::addPrimaryKey
* @covers ::dropPrimaryKey
*/
public function testPrimaryKey(): void {
$this->testingFakeConnection->schema()->dropPrimaryKey('faking_table');
$results = $this->testingFakeConnection->query("SELECT * FROM pg_indexes WHERE schemaname = 'testing_fake'")->fetchAll();
$this->assertCount(0, $results);
$this->testingFakeConnection->schema()->addPrimaryKey('faking_table', ['id']);
$results = $this->testingFakeConnection->query("SELECT * FROM pg_indexes WHERE schemaname = 'testing_fake'")->fetchAll();
$this->assertCount(1, $results);
$this->assertSame('testing_fake', $results[0]->schemaname);
$this->assertSame($this->testingFakeConnection->getPrefix() . 'faking_table', $results[0]->tablename);
$this->assertStringContainsString('USING btree (id)', $results[0]->indexdef);
$find_primary_keys_columns = new \ReflectionMethod(get_class($this->testingFakeConnection->schema()), 'findPrimaryKeyColumns');
$results = $find_primary_keys_columns->invoke($this->testingFakeConnection->schema(), 'faking_table');
$this->assertCount(1, $results);
$this->assertSame('id', $results[0]);
}
/**
* @covers ::renameTable
* @covers ::tableExists
* @covers ::findTables
* @covers ::dropTable
*/
public function testTable(): void {
$this->testingFakeConnection->schema()->renameTable('faking_table', 'new_faking_table');
$tables = $this->testingFakeConnection->schema()->findTables('%');
$result = $this->testingFakeConnection->query("SELECT * FROM information_schema.tables WHERE table_schema = 'testing_fake'")->fetchAll();
$this->assertFalse($this->testingFakeConnection->schema()->tableExists('faking_table'));
$this->assertTrue($this->testingFakeConnection->schema()->tableExists('new_faking_table'));
$this->assertEquals($this->testingFakeConnection->getPrefix() . 'new_faking_table', $result[0]->table_name);
$this->assertEquals('testing_fake', $result[0]->table_schema);
sort($tables);
$this->assertEquals(['new_faking_table'], $tables);
$this->testingFakeConnection->schema()->dropTable('new_faking_table');
$this->assertFalse($this->testingFakeConnection->schema()->tableExists('new_faking_table'));
$this->assertCount(0, $this->testingFakeConnection->query("SELECT * FROM information_schema.tables WHERE table_schema = 'testing_fake'")->fetchAll());
}
}