Thu Feb 17 06:59:14 PST 2005
- Previous message: [Slony1-commit] By smsimms: Provide a less cryptic error message when you pass a
- Next message: [Slony1-commit] By cbbrowne: 1.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- First cut of the log shipping mechanism. The tools/slony1_dump.sh script can be run against a subscriber to get a snapshot of the replica's user data plus minimal status information. If slon is started with -a <archivedir>, it writes one SQL script for every SYNC it processes, containing statements that replicate the changes and keep the status information up to date. Note that these scripts will have the same "grouping" of SYNC events that the replica used. Next steps: Have slon also write archive files for subscribing, unsubscribing and merging of sets, and for DDL script executions. Jan Modified Files: -------------- slony1-engine/src/slon: confoptions.h (r1.13 -> r1.14) remote_worker.c (r1.71 -> r1.72) slon.c (r1.42 -> r1.43) Added Files: ----------- slony1-engine/tools: slony1_dump.sh (r1.1) -------------- next part -------------- Index: confoptions.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v retrieving revision 1.13 retrieving revision 1.14 diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.13 -r1.14 --- src/slon/confoptions.h +++ src/slon/confoptions.h @@ -15,7 +15,7 @@ extern char *rtcfg_conninfo; extern char *pid_file; - +extern char *archive_dir; extern int vac_frequency; extern int slon_log_level; @@ -267,12 +267,21 @@ { (const char *)"log_timestamp_format", gettext_noop("A strftime()-style log timestamp format string."), - gettext_noop("A strftime()-style log timestamp format string."), SLON_C_STRING }, &log_timestamp_format, "%Y-%m-%d %H:%M:%S %Z" }, + { + { + (const char *)"archive_dir", + gettext_noop("Where to drop the sync archive files"), + NULL, + SLON_C_STRING + }, + &archive_dir, + NULL + }, #ifdef HAVE_SYSLOG { { Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.71 
retrieving revision 1.72 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.71 -r1.72 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -26,6 +26,7 @@ #include "c.h" #include "slon.h" +#include "confoptions.h" /* @@ -185,6 +186,7 @@ WorkGroupLineCode code; ProviderInfo *provider; SlonDString data; + SlonDString log; WorkerGroupLine *prev; WorkerGroupLine *next; @@ -1210,6 +1212,7 @@ line = (WorkerGroupLine *) malloc(sizeof(WorkerGroupLine)); memset(line, 0, sizeof(WorkerGroupLine)); dstring_init(&(line->data)); + dstring_init(&(line->log)); DLLIST_ADD_TAIL(wd->linepool_head, wd->linepool_tail, line); } @@ -1295,6 +1298,7 @@ if ((line = wd->linepool_head) == NULL) break; dstring_free(&(line->data)); + dstring_free(&(line->log)); DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail, line); #ifdef SLON_MEMDEBUG @@ -2934,6 +2938,11 @@ SlonDString query; SlonDString *provider_qual; + /* FIXME: must determine and use OS specific max path length */ + char archive_name[1024]; + char archive_tmp[1024]; + FILE *archive_fp = NULL; + gettimeofday(&tv_start, NULL); slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT " processing\n", @@ -2943,6 +2952,36 @@ dstring_init(&query); /* + * If this slon is running in log archiving mode, open a + * temporary file for it. 
+ */ + if (archive_dir) + { + int i; + + sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node->no_id); + for (i = strlen(seqbuf); i < 20; i++) + strcat(archive_name, "0"); + strcat(archive_name, seqbuf); + strcat(archive_name, ".sql"); + strcpy(archive_tmp, archive_name); + strcat(archive_tmp, ".tmp"); + + if ((archive_fp = fopen(archive_tmp, "w")) == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Cannot open archive file %s - %s\n", + node->no_id, archive_tmp, strerror(errno)); + dstring_free(&query); + return 60; + } + fprintf(archive_fp, "-- Slony-I sync log\n" + "-- Event %d,%s\n" + "start transaction;\n", + node->no_id, seqbuf); + } + + /* * Establish all required data provider connections */ for (provider = wd->provider_head; provider; provider = provider->next) @@ -2955,6 +2994,11 @@ "No pa_conninfo for data provider %d\n", node->no_id, provider->no_id); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 10; } sprintf(conn_symname, "subscriber_%d_provider_%d", @@ -2968,6 +3012,11 @@ node->no_id, provider->no_id, provider->pa_conninfo); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return provider->pa_connretry; } @@ -2980,6 +3029,11 @@ if (query_execute(node, provider->conn->dbconn, &query) < 0) { dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } slon_disconnectdb(provider->conn); provider->conn = NULL; return provider->pa_connretry; @@ -3016,6 +3070,11 @@ node->no_id, provider->no_id, event->ev_origin); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 10; } if (prov_seqno < event->ev_seqno) @@ -3026,6 +3085,11 @@ node->no_id, provider->no_id, prov_seqno, event->ev_origin); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 10; } slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " @@ -3036,78 +3100,8 @@ } 
} - /* - * Get all sequence updates - */ - for (provider = wd->provider_head; provider; provider = provider->next) - { - int ntuples1; - int tupno1; - - slon_mkquery(&query, - "select SL.seql_seqid, SL.seql_last_value " - " from %s.sl_seqlog SL, " - " %s.sl_sequence SQ " - " where SQ.seq_id = SL.seql_seqid " - " and SL.seql_origin = %d " - " and SL.seql_ev_seqno = '%s' " - " and SQ.seq_set in (", - rtcfg_namespace, rtcfg_namespace, - node->no_id, seqbuf); - for (pset = provider->set_head; pset; pset = pset->next) - slon_appendquery(&query, "%s%d", - (pset->prev == NULL) ? "" : ",", - pset->set_id); - slon_appendquery(&query, "); "); - - res1 = PQexec(provider->conn->dbconn, dstring_data(&query)); - if (PQresultStatus(res1) != PGRES_TUPLES_OK) - { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", - node->no_id, dstring_data(&query), - PQresultErrorMessage(res1)); - PQclear(res1); - dstring_free(&query); - slon_disconnectdb(provider->conn); - provider->conn = NULL; - return 20; - } - ntuples1 = PQntuples(res1); - for (tupno1 = 0; tupno1 < ntuples1; tupno1++) - { - char *seql_seqid = PQgetvalue(res1, tupno1, 0); - char *seql_last_value = PQgetvalue(res1, tupno1, 1); - - slon_mkquery(&query, - "select %s.sequenceSetValue(%s,%d,'%s','%s'); ", - rtcfg_namespace, - seql_seqid, node->no_id, seqbuf, seql_last_value); - if (query_execute(node, local_dbconn, &query) < 0) - { - PQclear(res1); - dstring_free(&query); - return 60; - } - } - PQclear(res1); - - /* - * Start listening on the special relation that will cause our local - * connection to be killed when the provider node fails. 
- */ - slon_mkquery(&query, - "listen \"_%s_Node_%d\"; ", - rtcfg_cluster_name, provider->no_id); - if (query_execute(node, local_dbconn, &query) < 0) - { - dstring_free(&query); - return 60; - } - } - dstring_init(&new_qual); - if (strlen(event->ev_xip) != 0) slon_mkquery(&new_qual, "(log_xid < '%s' and " @@ -3162,6 +3156,11 @@ PQclear(res1); dstring_free(&new_qual); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 60; } @@ -3208,6 +3207,11 @@ PQclear(res1); dstring_free(&new_qual); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 60; } ntuples2 = PQntuples(res2); @@ -3310,8 +3314,19 @@ else slon_appendquery(provider_qual, "\n) "); - PQclear(res2); + + /* + * Add a call to the setsync tracking function to + * the archive log. This function ensures that all + * archive log files are applied in the right order. + */ + if (archive_fp) + { + fprintf(archive_fp, "select %s.setsyncTracking_offline(%d, '%s', '%s');\n", + rtcfg_namespace, + sub_set, PQgetvalue(res1, tupno1, 1), seqbuf); + } } PQclear(res1); @@ -3341,6 +3356,12 @@ "no sets need syncing for this event\n", node->no_id); dstring_free(&query); + if (archive_fp) + { + fprintf(archive_fp, "commit;\n"); + fclose(archive_fp); + rename(archive_tmp, archive_name); + } return 0; } @@ -3395,6 +3416,26 @@ if (num_errors > 0) break; + if (wgline->log.n_used > 0) + { + res1 = PQexec(local_dbconn, dstring_data(&(wgline->log))); + if (PQresultStatus(res1) == PGRES_EMPTY_QUERY) + { + PQclear(res1); + break; + } + if (PQresultStatus(res1) != PGRES_COMMAND_OK) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "\"%s\" %s - qualification was: %s\n", + node->no_id, dstring_data(&(wgline->data)), + PQresultErrorMessage(res1), + dstring_data(&(wgline->provider->helper_qualification))); + num_errors++; + } + PQclear(res1); + } + res1 = PQexec(local_dbconn, dstring_data(&(wgline->data))); if (PQresultStatus(res1) == 
PGRES_EMPTY_QUERY) { @@ -3430,6 +3471,16 @@ } #endif PQclear(res1); + + /* + * Add the user data modification part to + * the archive log. + */ + if (archive_fp) + { + fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); + } + break; case SLON_WGLC_DONE: @@ -3461,6 +3512,7 @@ { wgnext = wgline->next; dstring_reset(&(wgline->data)); + dstring_reset(&(wgline->log)); DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, wgline); } if (num_errors == 1) @@ -3509,12 +3561,108 @@ if (num_errors != 0) { dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n", node->no_id); return 10; } /* + * Get all sequence updates + */ + for (provider = wd->provider_head; provider; provider = provider->next) + { + int ntuples1; + int tupno1; + + slon_mkquery(&query, + "select SL.seql_seqid, SL.seql_last_value " + " from %s.sl_seqlog SL, " + " %s.sl_sequence SQ " + " where SQ.seq_id = SL.seql_seqid " + " and SL.seql_origin = %d " + " and SL.seql_ev_seqno = '%s' " + " and SQ.seq_set in (", + rtcfg_namespace, rtcfg_namespace, + node->no_id, seqbuf); + for (pset = provider->set_head; pset; pset = pset->next) + slon_appendquery(&query, "%s%d", + (pset->prev == NULL) ? 
"" : ",", + pset->set_id); + slon_appendquery(&query, "); "); + + res1 = PQexec(provider->conn->dbconn, dstring_data(&query)); + if (PQresultStatus(res1) != PGRES_TUPLES_OK) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", + node->no_id, dstring_data(&query), + PQresultErrorMessage(res1)); + PQclear(res1); + dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } + slon_disconnectdb(provider->conn); + provider->conn = NULL; + return 20; + } + ntuples1 = PQntuples(res1); + for (tupno1 = 0; tupno1 < ntuples1; tupno1++) + { + char *seql_seqid = PQgetvalue(res1, tupno1, 0); + char *seql_last_value = PQgetvalue(res1, tupno1, 1); + + slon_mkquery(&query, + "select %s.sequenceSetValue(%s,%d,'%s','%s'); ", + rtcfg_namespace, + seql_seqid, node->no_id, seqbuf, seql_last_value); + if (query_execute(node, local_dbconn, &query) < 0) + { + PQclear(res1); + dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } + return 60; + } + + /* + * Add the sequence number adjust call to the archive log. + */ + if (archive_fp) + { + slon_mkquery(&query, + "select %s.sequenceSetValue_offline(%s,'%s');\n", + rtcfg_namespace, + seql_seqid, seql_last_value); + fprintf(archive_fp, dstring_data(&query)); + } + } + PQclear(res1); + + /* + * Start listening on the special relation that will cause our local + * connection to be killed when the provider node fails. + */ + slon_mkquery(&query, + "listen \"_%s_Node_%d\"; ", + rtcfg_cluster_name, provider->no_id); + if (query_execute(node, local_dbconn, &query) < 0) + { + dstring_free(&query); + return 60; + } + } + + /* * Light's are still green ... update the setsync status of all the sets * we've just replicated ... 
*/ @@ -3551,6 +3699,11 @@ PQresultErrorMessage(res1)); PQclear(res1); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n", node->no_id); return 10; @@ -3569,6 +3722,11 @@ if (query_execute(node, local_dbconn, &query) < 0) { dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 60; } } @@ -3592,6 +3750,11 @@ PQresultErrorMessage(res1)); PQclear(res1); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 60; } if (PQntuples(res1) > 0) @@ -3606,6 +3769,11 @@ { PQclear(res1); dstring_free(&query); + if (archive_fp) + { + fclose(archive_fp); + unlink(archive_tmp); + } return 60; } slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " @@ -3615,6 +3783,17 @@ PQclear(res1); /* + * Add the final commit to the archive log, close it and rename + * the temporary file to the real log chunk filename. + */ + if (archive_fp) + { + fprintf(archive_fp, "commit;\n"); + fclose(archive_fp); + rename(archive_tmp, archive_name); + } + + /* * Good job! 
*/ dstring_free(&query); @@ -3833,6 +4012,7 @@ line->code = SLON_WGLC_ACTION; line->provider = provider; dstring_reset(&(line->data)); + dstring_reset(&(line->log)); } /* @@ -3847,7 +4027,7 @@ if (wd->tab_forward[log_tableid]) { - slon_appendquery(&(line->data), + slon_appendquery(&(line->log), "insert into %s.sl_log_1 " " (log_origin, log_xid, log_tableid, " " log_actionseq, log_cmdtype, " @@ -3861,21 +4041,21 @@ { case 'I': slon_appendquery(&(line->data), - "insert into %s %s;", + "insert into %s %s;\n", wd->tab_fqname[log_tableid], log_cmddata); break; case 'U': slon_appendquery(&(line->data), - "update only %s set %s;", + "update only %s set %s;\n", wd->tab_fqname[log_tableid], log_cmddata); break; case 'D': slon_appendquery(&(line->data), - "delete from only %s where %s;", + "delete from only %s where %s;\n", wd->tab_fqname[log_tableid], log_cmddata); break; Index: slon.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v retrieving revision 1.42 retrieving revision 1.43 diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.42 -r1.43 --- src/slon/slon.c +++ src/slon/slon.c @@ -52,6 +52,7 @@ int slon_log_level; char *pid_file; +char *archive_dir = NULL; /* * ---------- main ---------- @@ -75,7 +76,7 @@ InitializeConfOptions(); - while ((c = getopt(argc, argv, "f:d:s:t:g:c:p:o:hv")) != EOF) + while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:hv")) != EOF) { switch (c) { @@ -83,6 +84,10 @@ ProcessConfigFile(optarg); break; + case 'a': + set_config_option("archive_dir", optarg); + break; + case 'd': set_config_option("log_level", optarg); break; --- /dev/null +++ tools/slony1_dump.sh @@ -0,0 +1,221 @@ +#!/bin/sh +# ---------- +# slony1_dump.sh +# +# This script creates a special data only dump from a subscriber +# node. 
The stdout of this script, fed into psql for a database that +# has the user schema of the replicated database installed, will +# prepare that database for log archive application. +# ---------- + +# ---- +# Check for correct usage +# ---- +if test $# -ne 2 ; then + echo "usage: $0 subscriber-dbname clustername" >&2 + exit 1 +fi + +# ---- +# Remember call arguments and get the nodeId of the DB specified +# ---- +dbname=$1 +cluster=$2 +clname="\"_$2\"" +pgc="\"pg_catalog\"" +nodeid=`psql -q -At -c "select \"_$cluster\".getLocalNodeId('_$cluster')" $dbname` + +# ---- +# Get a list of all replicated table ID's this subscriber receives, +# and remember the table names. +# ---- +tables=`psql -q -At -d $dbname -c \ + "select tab_id from $clname.sl_table, $clname.sl_set + where tab_set = set_id + and exists (select 1 from $clname.sl_subscribe + where sub_set = set_id + and sub_receiver = $nodeid)"` +for tab in $tables ; do + eval tabname_$tab=`psql -q -At -d $dbname -c \ + "select $pgc.quote_ident(tab_nspname) || '.' || + $pgc.quote_ident(tab_relname) from + $clname.sl_table where tab_id = $tab"` +done + +# ---- +# Get a list of all replicated sequence ID's this subscriber receives, +# and remember the sequence names. +# ---- +sequences=`psql -q -At -d $dbname -c \ + "select seq_id from $clname.sl_sequence, $clname.sl_set + where seq_set = set_id + and exists (select 1 from $clname.sl_subscribe + where sub_set = set_id + and sub_receiver = $nodeid)"` +for seq in $sequences ; do + eval seqname_$seq=`psql -q -At -d $dbname -c \ + "select $pgc.quote_ident(seq_nspname) || '.' || + $pgc.quote_ident(seq_relname) from + $clname.sl_sequence where seq_id = $seq"` +done + + +# ---- +# Emit SQL code to create the slony specific object required +# in the remote database. 
+# ---- +cat <<_EOF_ +start transaction; + +-- ---------------------------------------------------------------------- +-- SCHEMA $clname +-- ---------------------------------------------------------------------- +create schema $clname; + +-- ---------------------------------------------------------------------- +-- TABLE sl_sequence_offline +-- ---------------------------------------------------------------------- +create table $clname.sl_sequence_offline ( + seq_id int4, + seq_relname name NOT NULL, + seq_nspname name NOT NULL, + + CONSTRAINT "sl_sequence-pkey" + PRIMARY KEY (seq_id) +); + + +-- ---------------------------------------------------------------------- +-- TABLE sl_setsync_offline +-- ---------------------------------------------------------------------- +create table $clname.sl_setsync_offline ( + ssy_setid int4, + ssy_seqno int8, + + CONSTRAINT "sl_setsync-pkey" + PRIMARY KEY (ssy_setid) +); + + +-- ---------------------------------------------------------------------- +-- FUNCTION sequenceSetValue_offline (seq_id, seq_origin, ev_seqno, last_value) +-- ---------------------------------------------------------------------- +create or replace function $clname.sequenceSetValue_offline(int4, int8) returns int4 +as ' +declare + p_seq_id alias for \$1; + p_last_value alias for \$2; + v_fqname text; +begin + -- ---- + -- Get the sequences fully qualified name + -- ---- + select "pg_catalog".quote_ident(seq_nspname) || ''.'' || + "pg_catalog".quote_ident(seq_relname) into v_fqname + from $clname.sl_sequence_offline + where seq_id = p_seq_id; + if not found then + raise exception ''Slony-I: sequence % not found'', p_seq_id; + end if; + + -- ---- + -- Update it to the new value + -- ---- + execute ''select setval('''''' || v_fqname || + '''''', '''''' || p_last_value || '''''')''; + return p_seq_id; +end; +' language plpgsql; + +-- ---------------------------------------------------------------------- +-- FUNCTION setsyncTracking_offline (seq_id, seq_origin, 
ev_seqno, last_value) +-- ---------------------------------------------------------------------- +create or replace function $clname.setsyncTracking_offline(int4, int8, int8) returns int8 +as ' +declare + p_set_id alias for \$1; + p_old_seq alias for \$2; + p_new_seq alias for \$3; + v_row record; +begin + select ssy_seqno into v_row from $clname.sl_setsync_offline + where ssy_setid = p_set_id for update; + if not found then + raise exception ''Slony-I: set % not found'', p_set_id; + end if; + + if v_row.ssy_seqno <> p_old_seq then + raise exception ''Slony-I: set % is on sync %, this archive log expects %'', + p_set_id, v_row.ssy_seqno, p_old_seq; + end if; + + update $clname.sl_setsync_offline set ssy_seqno = p_new_seq + where ssy_setid = p_set_id; + return p_new_seq; +end; +' language plpgsql; + +_EOF_ + + +# ---- +# The remainder of this script is written in a way that +# all output is generated by psql inside of one serializable +# transaction, so that we get a consistent snapshot of the +# replica. +# ---- + +( +echo "start transaction;" +echo "set transaction isolation level serializable;" + +# ---- +# Fill the sl_sequence_offline table and provide initial +# values for all sequences. 
+# ---- +echo "select 'copy $clname.sl_sequence_offline from stdin;';" +echo "select seq_id::text || ' ' || seq_relname || ' ' || seq_nspname from $clname.sl_sequence;" +echo "select '\\\\.';" + +for seq in $sequences ; do + eval seqname=\$seqname_$seq + echo "select 'select $clname.sequenceSetValue_offline($seq, ''' || last_value::text || ''');' from $seqname;" +done + +# ---- +# Fill the setsync tracking table with the current status +# ---- +echo "select 'insert into $clname.sl_setsync_offline values (' || + ssy_setid::text || ', ''' || ssy_seqno || ''');' + from $clname.sl_setsync where exists (select 1 + from $clname.sl_subscribe + where ssy_setid = sub_set + and sub_receiver = $nodeid);" + +# ---- +# Now dump all the user table data +# ---- +for tab in $tables ; do + eval tabname=\$tabname_$tab + echo "select 'copy $tabname from stdin;';" + + echo "copy $tabname to stdout;" + + echo "select '\\\\.';" +done + +# ---- +# Commit the transaction here in the replica that provided us +# with the information. +# ---- +echo "commit;" +) | psql -q -At -d $dbname + + +# ---- +# Emit the commit for the dump to stdout. +# ---- +echo "commit;" + +exit 0 +
- Previous message: [Slony1-commit] By smsimms: Provide a less cryptic error message when you pass a
- Next message: [Slony1-commit] By cbbrowne: 1.
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list