Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 58 additions & 59 deletions bin/pg_repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ typedef struct repack_index
typedef struct repack_table
{
const char *target_name; /* target: relname */
const char *target_schema; /* target: schemaname */
const char *temp_schema; /* target: temp_schemaname */
Oid target_oid; /* target: OID */
Oid target_toast; /* target: toast OID */
Oid target_tidx; /* target: toast index OID */
Oid pkid; /* target: PK OID */
Oid ckid; /* target: CK OID */
Oid temp_oid; /* temp: OID */
Expand All @@ -194,12 +194,12 @@ typedef struct repack_table
const char *create_trigger; /* CREATE TRIGGER repack_trigger */
const char *enable_trigger; /* ALTER TABLE ENABLE ALWAYS TRIGGER repack_trigger */
const char *create_table; /* CREATE TABLE table AS SELECT WITH NO DATA*/
const char *dest_tablespace; /* Destination tablespace */
const char *copy_data; /* INSERT INTO */
const char *alter_col_storage; /* ALTER TABLE ALTER COLUMN SET STORAGE */
const char *drop_columns; /* ALTER TABLE DROP COLUMNs */
const char *delete_log; /* DELETE FROM log */
const char *lock_table; /* LOCK TABLE table */
const char *lock_temp_table; /* LOCK TABLE temp table */
const char *sql_peek; /* SQL used in flush */
const char *sql_insert; /* SQL used in flush */
const char *sql_delete; /* SQL used in flush */
Expand Down Expand Up @@ -262,6 +262,7 @@ static bool no_error_on_publication = false; /* repack even though publicatio
static bool no_error_on_invalid_index = false; /* repack even though invalid index is found */
static bool error_on_invalid_index = false; /* don't repack when invalid index is found,
* deprecated, this the default behavior now */
static bool use_original_schema = false; /* use original schema of a table */
static int apply_count = APPLY_COUNT_DEFAULT;
static int switch_threshold = SWITCH_THRESHOLD_DEFAULT;

Expand Down Expand Up @@ -293,6 +294,7 @@ static pgut_option options[] =
{ 'b', 'D', "no-kill-backend", &no_kill_backend },
{ 'b', 'k', "no-superuser-check", &no_superuser_check },
{ 'l', 'C', "exclude-extension", &exclude_extension_list },
{ 'b', 6, "use-original-schema", &use_original_schema },
{ 'b', 5, "no-error-on-publication", &no_error_on_publication },
{ 'b', 4, "no-error-on-invalid-index", &no_error_on_invalid_index },
{ 'b', 3, "error-on-invalid-index", &error_on_invalid_index },
Expand Down Expand Up @@ -363,6 +365,9 @@ main(int argc, char *argv[])
else if (jobs)
ereport(WARNING, (errcode(EINVAL),
errmsg("option -j (--jobs) has no effect, repacking indexes does not use parallel jobs")));
if (use_original_schema)
ereport(WARNING, (errcode(EINVAL),
errmsg("option --use-original-schema has no effect while repacking indexes")));
if (!repack_all_indexes(errbuf, sizeof(errbuf)))
ereport(ERROR,
(errcode(ERROR), errmsg("%s", errbuf)));
Expand Down Expand Up @@ -805,11 +810,11 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
num_schemas = simple_string_list_size(schema_list);
num_excluded_extensions = simple_string_list_size(exclude_extension_list);

/* 1st param is the user-specified tablespace */
/* 1st param is tablespace, 2nd is use_original_schema */
num_params = num_excluded_extensions +
num_parent_tables +
num_tables +
num_schemas + 1;
num_schemas + 2;
params = pgut_malloc(num_params * sizeof(char *));

initStringInfo(&sql);
Expand All @@ -828,13 +833,12 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)

/* acquire target tables */
appendStringInfoString(&sql,
"SELECT t.*,"
" coalesce(v.tablespace, t.tablespace_orig) as tablespace_dest"
" FROM repack.tables t, "
" (VALUES ($1::text)) as v (tablespace)"
"SELECT t.*"
" FROM repack.get_tables($1::name, $2::boolean) t "
" WHERE ");

params[iparam++] = tablespace;
params[iparam++] = use_original_schema ? "true" : "false";
if (num_tables || num_parent_tables)
{
/* standalone tables */
Expand Down Expand Up @@ -947,9 +951,8 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)

table.target_name = getstr(res, i, c++);
table.target_oid = getoid(res, i, c++);
table.target_toast = getoid(res, i, c++);
table.target_tidx = getoid(res, i, c++);
c++; // Skip schemaname
table.target_schema = getstr(res, i, c++);
table.temp_schema = getstr(res, i, c++);
table.pkid = getoid(res, i, c++);
table.ckid = getoid(res, i, c++);
table.temp_oid = InvalidOid; /* filled after creating the temp table */
Expand All @@ -967,25 +970,23 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
table.enable_trigger = getstr(res, i, c++);

table.create_table = getstr(res, i, c++);
getstr(res, i, c++); /* tablespace_orig is clobbered */
table.copy_data = getstr(res, i , c++);
table.alter_col_storage = getstr(res, i, c++);
table.drop_columns = getstr(res, i, c++);
table.delete_log = getstr(res, i, c++);
table.lock_table = getstr(res, i, c++);
table.lock_temp_table = getstr(res, i, c++);
ckey = getstr(res, i, c++);
table.sql_peek = getstr(res, i, c++);
table.sql_insert = getstr(res, i, c++);
table.sql_delete = getstr(res, i, c++);
table.sql_update = getstr(res, i, c++);
table.sql_pop = getstr(res, i, c++);
table.dest_tablespace = getstr(res, i, c++);

/* Craft Copy SQL */
initStringInfo(&copy_sql);
appendStringInfoString(&copy_sql, table.copy_data);
if (!orderby)

{
if (ckey != NULL)
{
Expand Down Expand Up @@ -1268,12 +1269,14 @@ repack_one_table(repack_table *table, const char *orderby)
int num;
char *vxid = NULL;
char buffer[12];
char pid_buf[12];
StringInfoData sql;
bool ret = false;
PGresult *indexres = NULL;
const char *indexparams[2];
const char *indexparams[3];
char indexbuffer[12];
int j;
Oid log_oid;

/* appname will be "pg_repack" in normal use on 9.0+, or
* "pg_regress/<testname>" when run under `make installcheck`
Expand All @@ -1295,21 +1298,19 @@ repack_one_table(repack_table *table, const char *orderby)

elog(DEBUG2, "---- repack_one_table ----");
elog(DEBUG2, "target_name : %s", table->target_name);
elog(DEBUG2, "target_schema : %s", table->target_schema);
elog(DEBUG2, "temp_schema : %s", table->temp_schema);
elog(DEBUG2, "target_oid : %u", table->target_oid);
elog(DEBUG2, "target_toast : %u", table->target_toast);
elog(DEBUG2, "target_tidx : %u", table->target_tidx);
elog(DEBUG2, "pkid : %u", table->pkid);
elog(DEBUG2, "ckid : %u", table->ckid);
elog(DEBUG2, "create_pktype : %s", table->create_pktype);
elog(DEBUG2, "create_log : %s", table->create_log);
elog(DEBUG2, "create_trigger : %s", table->create_trigger);
elog(DEBUG2, "enable_trigger : %s", table->enable_trigger);
elog(DEBUG2, "create_table : %s", table->create_table);
elog(DEBUG2, "dest_tablespace : %s", table->dest_tablespace);
elog(DEBUG2, "copy_data : %s", table->copy_data);
elog(DEBUG2, "alter_col_storage : %s", table->alter_col_storage ?
table->alter_col_storage : "(skipped)");
elog(DEBUG2, "drop_columns : %s", table->drop_columns ? table->drop_columns : "(skipped)");
elog(DEBUG2, "alter_col_storage : %s", table->alter_col_storage);
elog(DEBUG2, "drop_columns : %s", table->drop_columns);
elog(DEBUG2, "delete_log : %s", table->delete_log);
elog(DEBUG2, "lock_table : %s", table->lock_table);
elog(DEBUG2, "sql_peek : %s", table->sql_peek);
Expand Down Expand Up @@ -1349,7 +1350,8 @@ repack_one_table(repack_table *table, const char *orderby)
*/

indexparams[0] = utoa(table->target_oid, indexbuffer);
indexparams[1] = moveidx ? tablespace : NULL;
indexparams[1] = table->temp_schema;
indexparams[2] = moveidx ? tablespace : NULL;

/* First, just display a warning message for any invalid indexes
* which may be on the table (mostly to match the behavior of 1.1.8),
Expand All @@ -1375,9 +1377,9 @@ repack_one_table(repack_table *table, const char *orderby)

indexres = execute(
"SELECT indexrelid,"
" repack.repack_indexdef(indexrelid, indrelid, $2, FALSE) "
" repack.repack_indexdef(indexrelid, format('%I.table_%s', $2::text, indrelid), $3, FALSE) "
" FROM pg_index WHERE indrelid = $1 AND indisvalid",
2, indexparams);
3, indexparams);

table->n_indexes = PQntuples(indexres);
table->indexes = pgut_malloc(table->n_indexes * sizeof(repack_index));
Expand Down Expand Up @@ -1424,12 +1426,14 @@ repack_one_table(repack_table *table, const char *orderby)

command(table->create_pktype, 0, NULL);
temp_obj_num++;
command(table->create_log, 0, NULL);
res = execute(table->create_log, 0, NULL);
log_oid = getoid(res, 0, 0);
CLEARPGRES(res);
temp_obj_num++;
command(table->create_trigger, 0, NULL);
temp_obj_num++;
command(table->enable_trigger, 0, NULL);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum(%u)", log_oid);
command(sql.data, 0, NULL);

/* While we are still holding an AccessExclusive lock on the table, submit
Expand All @@ -1444,8 +1448,8 @@ repack_one_table(repack_table *table, const char *orderby)
* pg_locks momentarily.
*/
res = pgut_execute(conn2, "SELECT pg_backend_pid()", 0, NULL);
buffer[0] = '\0';
strncat(buffer, PQgetvalue(res, 0, 0), sizeof(buffer) - 1);
pid_buf[0] = '\0';
strncat(pid_buf, PQgetvalue(res, 0, 0), sizeof(pid_buf) - 1);
CLEARPGRES(res);

/*
Expand Down Expand Up @@ -1536,7 +1540,7 @@ repack_one_table(repack_table *table, const char *orderby)

/* Fetch an array of Virtual IDs of all transactions active right now.
*/
params[0] = buffer; /* backend PID of conn2 */
params[0] = pid_buf; /* backend PID of conn2 */
params[1] = PROGRAM_NAME;
res = execute(SQL_XID_SNAPSHOT, 2, params);
vxid = pgut_strdup(PQgetvalue(res, 0, 0));
Expand Down Expand Up @@ -1569,27 +1573,19 @@ repack_one_table(repack_table *table, const char *orderby)
* Before copying data to the target table, we need to set the column storage
* type if its storage type has been changed from the type default.
*/
params[0] = utoa(table->target_oid, buffer);
params[1] = table->dest_tablespace;
command(table->create_table, 2, params);
if (table->alter_col_storage)
command(table->alter_col_storage, 0, NULL);
res = execute(table->create_table, 0, NULL);
table->temp_oid = getoid(res, 0, 0);
Assert(OidIsValid(table->temp_oid));
CLEARPGRES(res);

command(table->alter_col_storage, 0, NULL);
command(table->copy_data, 0, NULL);
temp_obj_num++;
printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.table_%u')", table->target_oid);
if (table->drop_columns)
command(table->drop_columns, 0, NULL);
printfStringInfo(&sql, "SELECT repack.disable_autovacuum(%u)", table->temp_oid);
command(table->drop_columns, 0, NULL);
command(sql.data, 0, NULL);
command("COMMIT", 0, NULL);

/* Get OID of the temp table */
printfStringInfo(&sql, "SELECT 'repack.table_%u'::regclass::oid",
table->target_oid);
res = execute(sql.data, 0, NULL);
table->temp_oid = getoid(res, 0, 0);
Assert(OidIsValid(table->temp_oid));
CLEARPGRES(res);

/*
* 3. Create indexes on temp table.
*/
Expand Down Expand Up @@ -1671,10 +1667,8 @@ repack_one_table(repack_table *table, const char *orderby)
* Acquire AccessExclusiveLock on the temp table to prevent concurrent
* operations during swapping relations.
*/
printfStringInfo(&sql, "LOCK TABLE repack.table_%u IN ACCESS EXCLUSIVE MODE",
table->target_oid);
if (!(lock_exclusive(conn2, utoa(table->temp_oid, buffer),
sql.data, false)))
table->lock_temp_table, false)))
{
elog(WARNING, "lock_exclusive() failed in conn2 for table_%u",
table->target_oid);
Expand All @@ -1683,7 +1677,8 @@ repack_one_table(repack_table *table, const char *orderby)

apply_log(conn2, table, 0);
params[0] = utoa(table->target_oid, buffer);
pgut_command(conn2, "SELECT repack.repack_swap($1)", 1, params);
params[1] = utoa(table->temp_oid, indexbuffer);
pgut_command(conn2, "SELECT repack.repack_swap($1, $2)", 2, params);
pgut_command(conn2, "COMMIT", 0, NULL);

/*
Expand All @@ -1693,8 +1688,10 @@ repack_one_table(repack_table *table, const char *orderby)
elog(DEBUG2, "---- drop ----");

command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
params[1] = utoa(temp_obj_num, indexbuffer);
command("SELECT repack.repack_drop($1, $2)", 2, params);
params[0] = utoa(table->target_oid, buffer);
params[1] = table->temp_schema;
params[2] = utoa(temp_obj_num, indexbuffer);
command("SELECT repack.repack_drop($1, $2, $3)", 3, params);
command("COMMIT", 0, NULL);

temp_obj_num = 0; /* reset temporary object counter after cleanup */
Expand Down Expand Up @@ -2053,14 +2050,15 @@ repack_cleanup_callback(bool fatal, void *userdata)
{
repack_table *table = (repack_table *) userdata;
Oid target_table = table->target_oid;
const char *params[2];
const char *params[3];
char buffer[12];
char num_buff[12];

if(fatal)
{
params[0] = utoa(target_table, buffer);
params[1] = utoa(temp_obj_num, num_buff);
params[1] = table->temp_schema;
params[2] = utoa(temp_obj_num, num_buff);

/* testing PQstatus() of connection and conn2, as we do
* in repack_cleanup(), doesn't seem to work here,
Expand All @@ -2076,7 +2074,7 @@ repack_cleanup_callback(bool fatal, void *userdata)
table->target_name);
}

command("SELECT repack.repack_drop($1, $2)", 2, params);
command("SELECT repack.repack_drop($1, $2, $3)", 3, params);
command("COMMIT", 0, NULL);
temp_obj_num = 0; /* reset temporary object counter after cleanup */
}
Expand All @@ -2097,7 +2095,7 @@ repack_cleanup(bool fatal, const repack_table *table)
{
char buffer[12];
char num_buff[12];
const char *params[2];
const char *params[3];

/* Try reconnection if not available. */
if (PQstatus(connection) != CONNECTION_OK ||
Expand All @@ -2106,7 +2104,8 @@ repack_cleanup(bool fatal, const repack_table *table)

/* do cleanup */
params[0] = utoa(table->target_oid, buffer);
params[1] = utoa(temp_obj_num, num_buff);
params[1] = table->temp_schema;
params[2] = utoa(temp_obj_num, num_buff);

command("BEGIN ISOLATION LEVEL READ COMMITTED", 0, NULL);
if (!(lock_exclusive(connection, params[0], table->lock_table, false)))
Expand All @@ -2116,7 +2115,7 @@ repack_cleanup(bool fatal, const repack_table *table)
table->target_name);
}

command("SELECT repack.repack_drop($1, $2)", 2, params);
command("SELECT repack.repack_drop($1, $2, $3)", 3, params);
command("COMMIT", 0, NULL);
temp_obj_num = 0; /* reset temporary object counter after cleanup */
}
Expand Down Expand Up @@ -2238,7 +2237,7 @@ repack_table_indexes(PGresult *index_details)
continue;

params[0] = utoa(index, buffer[0]);
res = execute("SELECT repack.repack_indexdef($1, $2, $3, true)", 3,
res = execute("SELECT repack.repack_indexdef($1, repack.oid2text($2), $3, true)", 3,
params);

if (PQntuples(res) < 1)
Expand Down
Loading
Loading