CVS User Account cvsuser
Thu Feb 17 06:59:14 PST 2005
Log Message:
-----------
First cut of the log shipping mechanism.

The tools/slony1_dump.sh script can be used against a subscriber
to get a snapshot of a replica's user data plus minimal status
information. 

If slon is started with -a <archivedir>, it will write an SQL script
for each round of SYNC processing, containing statements that replicate
the changes and keep the status information up to date. Note that these
scripts will have the same "grouping" of SYNC events that the replica
used.

Next steps:
    Have slon also write archive files for subscribing, unsubscribing
    and merging of sets, and for DDL script executions.

Jan

Modified Files:
--------------
    slony1-engine/src/slon:
        confoptions.h (r1.13 -> r1.14)
        remote_worker.c (r1.71 -> r1.72)
        slon.c (r1.42 -> r1.43)

Added Files:
-----------
    slony1-engine/tools:
        slony1_dump.sh (r1.1)

-------------- next part --------------
Index: confoptions.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v
retrieving revision 1.13
retrieving revision 1.14
diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.13 -r1.14
--- src/slon/confoptions.h
+++ src/slon/confoptions.h
@@ -15,7 +15,7 @@
 extern char *rtcfg_conninfo;
 
 extern char *pid_file;
-
+extern char *archive_dir;
 
 extern int	vac_frequency;
 extern int	slon_log_level;
@@ -267,12 +267,21 @@
 		{
 			(const char *)"log_timestamp_format",
 			gettext_noop("A strftime()-style log timestamp format string."),
-			gettext_noop("A strftime()-style log timestamp format string."),
 			SLON_C_STRING
 		},
 		&log_timestamp_format,
 		"%Y-%m-%d %H:%M:%S %Z"
 	},
+	{
+		{
+			(const char *)"archive_dir",
+			gettext_noop("Where to drop the sync archive files"),
+			NULL,
+			SLON_C_STRING
+		},
+		&archive_dir,
+		NULL
+	},
 #ifdef HAVE_SYSLOG
 	{
 		{
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.71
retrieving revision 1.72
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.71 -r1.72
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -26,6 +26,7 @@
 #include "c.h"
 
 #include "slon.h"
+#include "confoptions.h"
 
 
 /*
@@ -185,6 +186,7 @@
 	WorkGroupLineCode code;
 	ProviderInfo *provider;
 	SlonDString data;
+	SlonDString log;
 
 	WorkerGroupLine *prev;
 	WorkerGroupLine *next;
@@ -1210,6 +1212,7 @@
 						line = (WorkerGroupLine *) malloc(sizeof(WorkerGroupLine));
 						memset(line, 0, sizeof(WorkerGroupLine));
 						dstring_init(&(line->data));
+						dstring_init(&(line->log));
 						DLLIST_ADD_TAIL(wd->linepool_head, wd->linepool_tail,
 										line);
 					}
@@ -1295,6 +1298,7 @@
 				if ((line = wd->linepool_head) == NULL)
 					break;
 				dstring_free(&(line->data));
+				dstring_free(&(line->log));
 				DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail,
 							  line);
 #ifdef SLON_MEMDEBUG
@@ -2934,6 +2938,11 @@
 	SlonDString query;
 	SlonDString *provider_qual;
 
+	/* FIXME: must determine and use OS specific max path length */
+	char		archive_name[1024];
+	char		archive_tmp[1024];
+	FILE	   *archive_fp = NULL;
+
 	gettimeofday(&tv_start, NULL);
 	slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: SYNC " INT64_FORMAT
 			 " processing\n",
@@ -2943,6 +2952,36 @@
 	dstring_init(&query);
 
 	/*
+	 * If this slon is running in log archiving mode, open a
+	 * temporary file for it.
+	 */
+	if (archive_dir)
+	{
+		int			i;
+
+		sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node->no_id);
+		for (i = strlen(seqbuf); i < 20; i++)
+			strcat(archive_name, "0");
+		strcat(archive_name, seqbuf);
+		strcat(archive_name, ".sql");
+		strcpy(archive_tmp, archive_name);
+		strcat(archive_tmp, ".tmp");
+
+		if ((archive_fp = fopen(archive_tmp, "w")) == NULL)
+		{
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+						"Cannot open archive file %s - %s\n",
+						node->no_id, archive_tmp, strerror(errno));
+			dstring_free(&query);
+			return 60;
+		}
+		fprintf(archive_fp, "-- Slony-I sync log\n"
+				"-- Event %d,%s\n"
+				"start transaction;\n",
+				node->no_id, seqbuf);
+	}
+
+	/*
 	 * Establish all required data provider connections
 	 */
 	for (provider = wd->provider_head; provider; provider = provider->next)
@@ -2955,6 +2994,11 @@
 						 "No pa_conninfo for data provider %d\n",
 						 node->no_id, provider->no_id);
 				dstring_free(&query);
+				if (archive_fp)
+				{
+					fclose(archive_fp);
+					unlink(archive_tmp);
+				}
 				return 10;
 			}
 			sprintf(conn_symname, "subscriber_%d_provider_%d",
@@ -2968,6 +3012,11 @@
 						 node->no_id, provider->no_id,
 						 provider->pa_conninfo);
 				dstring_free(&query);
+				if (archive_fp)
+				{
+					fclose(archive_fp);
+					unlink(archive_tmp);
+				}
 				return provider->pa_connretry;
 			}
 
@@ -2980,6 +3029,11 @@
 			if (query_execute(node, provider->conn->dbconn, &query) < 0)
 			{
 				dstring_free(&query);
+				if (archive_fp)
+				{
+					fclose(archive_fp);
+					unlink(archive_tmp);
+				}
 				slon_disconnectdb(provider->conn);
 				provider->conn = NULL;
 				return provider->pa_connretry;
@@ -3016,6 +3070,11 @@
 						 node->no_id, provider->no_id,
 						 event->ev_origin);
 				dstring_free(&query);
+				if (archive_fp)
+				{
+					fclose(archive_fp);
+					unlink(archive_tmp);
+				}
 				return 10;
 			}
 			if (prov_seqno < event->ev_seqno)
@@ -3026,6 +3085,11 @@
 						 node->no_id, provider->no_id,
 						 prov_seqno, event->ev_origin);
 				dstring_free(&query);
+				if (archive_fp)
+				{
+					fclose(archive_fp);
+					unlink(archive_tmp);
+				}
 				return 10;
 			}
 			slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -3036,78 +3100,8 @@
 		}
 	}
 
-	/*
-	 * Get all sequence updates
-	 */
-	for (provider = wd->provider_head; provider; provider = provider->next)
-	{
-		int			ntuples1;
-		int			tupno1;
-
-		slon_mkquery(&query,
-					 "select SL.seql_seqid, SL.seql_last_value "
-					 "	from %s.sl_seqlog SL, "
-					 "		%s.sl_sequence SQ "
-					 "	where SQ.seq_id = SL.seql_seqid "
-					 "		and SL.seql_origin = %d "
-					 "		and SL.seql_ev_seqno = '%s' "
-					 "		and SQ.seq_set in (",
-					 rtcfg_namespace, rtcfg_namespace,
-					 node->no_id, seqbuf);
-		for (pset = provider->set_head; pset; pset = pset->next)
-			slon_appendquery(&query, "%s%d",
-							 (pset->prev == NULL) ? "" : ",",
-							 pset->set_id);
-		slon_appendquery(&query, "); ");
-
-		res1 = PQexec(provider->conn->dbconn, dstring_data(&query));
-		if (PQresultStatus(res1) != PGRES_TUPLES_OK)
-		{
-			slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
-					 node->no_id, dstring_data(&query),
-					 PQresultErrorMessage(res1));
-			PQclear(res1);
-			dstring_free(&query);
-			slon_disconnectdb(provider->conn);
-			provider->conn = NULL;
-			return 20;
-		}
-		ntuples1 = PQntuples(res1);
-		for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
-		{
-			char	   *seql_seqid = PQgetvalue(res1, tupno1, 0);
-			char	   *seql_last_value = PQgetvalue(res1, tupno1, 1);
-
-			slon_mkquery(&query,
-						 "select %s.sequenceSetValue(%s,%d,'%s','%s'); ",
-						 rtcfg_namespace,
-						 seql_seqid, node->no_id, seqbuf, seql_last_value);
-			if (query_execute(node, local_dbconn, &query) < 0)
-			{
-				PQclear(res1);
-				dstring_free(&query);
-				return 60;
-			}
-		}
-		PQclear(res1);
-
-		/*
-		 * Start listening on the special relation that will cause our local
-		 * connection to be killed when the provider node fails.
-		 */
-		slon_mkquery(&query,
-					 "listen \"_%s_Node_%d\"; ",
-					 rtcfg_cluster_name, provider->no_id);
-		if (query_execute(node, local_dbconn, &query) < 0)
-		{
-			dstring_free(&query);
-			return 60;
-		}
-	}
-
 	dstring_init(&new_qual);
 
-
 	if (strlen(event->ev_xip) != 0)
 		slon_mkquery(&new_qual,
 					 "(log_xid < '%s' and "
@@ -3162,6 +3156,11 @@
 			PQclear(res1);
 			dstring_free(&new_qual);
 			dstring_free(&query);
+			if (archive_fp)
+			{
+				fclose(archive_fp);
+				unlink(archive_tmp);
+			}
 			return 60;
 		}
 
@@ -3208,6 +3207,11 @@
 				PQclear(res1);
 				dstring_free(&new_qual);
 				dstring_free(&query);
+				if (archive_fp)
+				{
+					fclose(archive_fp);
+					unlink(archive_tmp);
+				}
 				return 60;
 			}
 			ntuples2 = PQntuples(res2);
@@ -3310,8 +3314,19 @@
 			else
 				slon_appendquery(provider_qual, "\n) ");
 
-
 			PQclear(res2);
+
+			/*
+			 * Add a call to the setsync tracking function to
+			 * the archive log. This function ensures that all
+			 * archive log files are applied in the right order.
+			 */
+			if (archive_fp)
+			{
+				fprintf(archive_fp, "select %s.setsyncTracking_offline(%d, '%s', '%s');\n",
+						rtcfg_namespace,
+						sub_set, PQgetvalue(res1, tupno1, 1), seqbuf);
+			}
 		}
 		PQclear(res1);
 
@@ -3341,6 +3356,12 @@
 				 "no sets need syncing for this event\n",
 				 node->no_id);
 		dstring_free(&query);
+		if (archive_fp)
+		{
+			fprintf(archive_fp, "commit;\n");
+			fclose(archive_fp);
+			rename(archive_tmp, archive_name);
+		}
 		return 0;
 	}
 
@@ -3395,6 +3416,26 @@
 					if (num_errors > 0)
 						break;
 
+					if (wgline->log.n_used > 0)
+					{
+						res1 = PQexec(local_dbconn, dstring_data(&(wgline->log)));
+						if (PQresultStatus(res1) == PGRES_EMPTY_QUERY)
+						{
+							PQclear(res1);
+							break;
+						}
+						if (PQresultStatus(res1) != PGRES_COMMAND_OK)
+						{
+							slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+									 "\"%s\" %s - qualification was: %s\n",
+									 node->no_id, dstring_data(&(wgline->data)),
+									 PQresultErrorMessage(res1),
+									 dstring_data(&(wgline->provider->helper_qualification)));
+							num_errors++;
+						}
+						PQclear(res1);
+					}
+
 					res1 = PQexec(local_dbconn, dstring_data(&(wgline->data)));
 					if (PQresultStatus(res1) == PGRES_EMPTY_QUERY)
 					{
@@ -3430,6 +3471,16 @@
 					}
 #endif
 					PQclear(res1);
+
+					/*
+					 * Add the user data modification part to
+					 * the archive log.
+					 */
+					if (archive_fp)
+					{
+						fprintf(archive_fp, "%s", dstring_data(&(wgline->data)));
+					}
+
 					break;
 
 				case SLON_WGLC_DONE:
@@ -3461,6 +3512,7 @@
 		{
 			wgnext = wgline->next;
 			dstring_reset(&(wgline->data));
+			dstring_reset(&(wgline->log));
 			DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, wgline);
 		}
 		if (num_errors == 1)
@@ -3509,12 +3561,108 @@
 	if (num_errors != 0)
 	{
 		dstring_free(&query);
+		if (archive_fp)
+		{
+			fclose(archive_fp);
+			unlink(archive_tmp);
+		}
 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
 				 node->no_id);
 		return 10;
 	}
 
 	/*
+	 * Get all sequence updates
+	 */
+	for (provider = wd->provider_head; provider; provider = provider->next)
+	{
+		int			ntuples1;
+		int			tupno1;
+
+		slon_mkquery(&query,
+					 "select SL.seql_seqid, SL.seql_last_value "
+					 "	from %s.sl_seqlog SL, "
+					 "		%s.sl_sequence SQ "
+					 "	where SQ.seq_id = SL.seql_seqid "
+					 "		and SL.seql_origin = %d "
+					 "		and SL.seql_ev_seqno = '%s' "
+					 "		and SQ.seq_set in (",
+					 rtcfg_namespace, rtcfg_namespace,
+					 node->no_id, seqbuf);
+		for (pset = provider->set_head; pset; pset = pset->next)
+			slon_appendquery(&query, "%s%d",
+							 (pset->prev == NULL) ? "" : ",",
+							 pset->set_id);
+		slon_appendquery(&query, "); ");
+
+		res1 = PQexec(provider->conn->dbconn, dstring_data(&query));
+		if (PQresultStatus(res1) != PGRES_TUPLES_OK)
+		{
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+					 node->no_id, dstring_data(&query),
+					 PQresultErrorMessage(res1));
+			PQclear(res1);
+			dstring_free(&query);
+			if (archive_fp)
+			{
+				fclose(archive_fp);
+				unlink(archive_tmp);
+			}
+			slon_disconnectdb(provider->conn);
+			provider->conn = NULL;
+			return 20;
+		}
+		ntuples1 = PQntuples(res1);
+		for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
+		{
+			char	   *seql_seqid = PQgetvalue(res1, tupno1, 0);
+			char	   *seql_last_value = PQgetvalue(res1, tupno1, 1);
+
+			slon_mkquery(&query,
+						 "select %s.sequenceSetValue(%s,%d,'%s','%s'); ",
+						 rtcfg_namespace,
+						 seql_seqid, node->no_id, seqbuf, seql_last_value);
+			if (query_execute(node, local_dbconn, &query) < 0)
+			{
+				PQclear(res1);
+				dstring_free(&query);
+				if (archive_fp)
+				{
+					fclose(archive_fp);
+					unlink(archive_tmp);
+				}
+				return 60;
+			}
+
+			/*
+			 * Add the sequence number adjust call to the archive log.
+			 */
+			if (archive_fp)
+			{
+				slon_mkquery(&query,
+						 "select %s.sequenceSetValue_offline(%s,'%s');\n",
+						 rtcfg_namespace,
+						 seql_seqid, seql_last_value);
+				fprintf(archive_fp, dstring_data(&query));
+			}
+		}
+		PQclear(res1);
+
+		/*
+		 * Start listening on the special relation that will cause our local
+		 * connection to be killed when the provider node fails.
+		 */
+		slon_mkquery(&query,
+					 "listen \"_%s_Node_%d\"; ",
+					 rtcfg_cluster_name, provider->no_id);
+		if (query_execute(node, local_dbconn, &query) < 0)
+		{
+			dstring_free(&query);
+			return 60;
+		}
+	}
+
+	/*
 	 * Light's are still green ... update the setsync status of all the sets
 	 * we've just replicated ...
 	 */
@@ -3551,6 +3699,11 @@
 					 PQresultErrorMessage(res1));
 			PQclear(res1);
 			dstring_free(&query);
+			if (archive_fp)
+			{
+				fclose(archive_fp);
+				unlink(archive_tmp);
+			}
 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
 					 node->no_id);
 			return 10;
@@ -3569,6 +3722,11 @@
 		if (query_execute(node, local_dbconn, &query) < 0)
 		{
 			dstring_free(&query);
+			if (archive_fp)
+			{
+				fclose(archive_fp);
+				unlink(archive_tmp);
+			}
 			return 60;
 		}
 	}
@@ -3592,6 +3750,11 @@
 				 PQresultErrorMessage(res1));
 		PQclear(res1);
 		dstring_free(&query);
+		if (archive_fp)
+		{
+			fclose(archive_fp);
+			unlink(archive_tmp);
+		}
 		return 60;
 	}
 	if (PQntuples(res1) > 0)
@@ -3606,6 +3769,11 @@
 		{
 			PQclear(res1);
 			dstring_free(&query);
+			if (archive_fp)
+			{
+				fclose(archive_fp);
+				unlink(archive_tmp);
+			}
 			return 60;
 		}
 		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -3615,6 +3783,17 @@
 	PQclear(res1);
 
 	/*
+	 * Add the final commit to the archive log, close it and rename
+	 * the temporary file to the real log chunk filename.
+	 */
+	if (archive_fp)
+	{
+		fprintf(archive_fp, "commit;\n");
+		fclose(archive_fp);
+		rename(archive_tmp, archive_name);
+	}
+
+	/*
 	 * Good job!
 	 */
 	dstring_free(&query);
@@ -3833,6 +4012,7 @@
 						line->code = SLON_WGLC_ACTION;
 						line->provider = provider;
 						dstring_reset(&(line->data));
+						dstring_reset(&(line->log));
 					}
 
 					/*
@@ -3847,7 +4027,7 @@
 
 					if (wd->tab_forward[log_tableid])
 					{
-						slon_appendquery(&(line->data),
+						slon_appendquery(&(line->log),
 										 "insert into %s.sl_log_1 "
 									"    (log_origin, log_xid, log_tableid, "
 										 "     log_actionseq, log_cmdtype, "
@@ -3861,21 +4041,21 @@
 					{
 						case 'I':
 							slon_appendquery(&(line->data),
-											 "insert into %s %s;",
+											 "insert into %s %s;\n",
 											 wd->tab_fqname[log_tableid],
 											 log_cmddata);
 							break;
 
 						case 'U':
 							slon_appendquery(&(line->data),
-											 "update only %s set %s;",
+											 "update only %s set %s;\n",
 											 wd->tab_fqname[log_tableid],
 											 log_cmddata);
 							break;
 
 						case 'D':
 							slon_appendquery(&(line->data),
-											 "delete from only %s where %s;",
+											 "delete from only %s where %s;\n",
 											 wd->tab_fqname[log_tableid],
 											 log_cmddata);
 							break;
Index: slon.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.c,v
retrieving revision 1.42
retrieving revision 1.43
diff -Lsrc/slon/slon.c -Lsrc/slon/slon.c -u -w -r1.42 -r1.43
--- src/slon/slon.c
+++ src/slon/slon.c
@@ -52,6 +52,7 @@
 
 int			slon_log_level;
 char	   *pid_file;
+char	   *archive_dir = NULL;
 
 /*
  * ---------- main ----------
@@ -75,7 +76,7 @@
 	InitializeConfOptions();
 
 
-	while ((c = getopt(argc, argv, "f:d:s:t:g:c:p:o:hv")) != EOF)
+	while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:hv")) != EOF)
 	{
 		switch (c)
 		{
@@ -83,6 +84,10 @@
 				ProcessConfigFile(optarg);
 				break;
 
+			case 'a':
+				set_config_option("archive_dir", optarg);
+				break;
+				
 			case 'd':
 				set_config_option("log_level", optarg);
 				break;
--- /dev/null
+++ tools/slony1_dump.sh
@@ -0,0 +1,221 @@
+#!/bin/sh
+# ----------
+# slony1_dump.sh
+#
+#	This script creates a special data only dump from a subscriber
+#	node. The stdout of this script, fed into psql for a database that
+#	has the user schema of the replicated database installed, will
+#	prepare that database for log archive application.
+# ----------
+
+# ----
+# Check for correct usage
+# ----
+if test $# -ne 2 ; then
+	echo "usage: $0 subscriber-dbname clustername" >&2
+	exit 1
+fi
+
+# ----
+# Remember call arguments and get the nodeId of the DB specified
+# ----
+dbname=$1
+cluster=$2
+clname="\"_$2\""
+pgc="\"pg_catalog\""
+nodeid=`psql -q -At -c "select \"_$cluster\".getLocalNodeId('_$cluster')" $dbname`
+
+# ----
+# Get a list of all replicated table ID's this subscriber receives,
+# and remember the table names.
+# ----
+tables=`psql -q -At -d $dbname -c \
+		"select tab_id from $clname.sl_table, $clname.sl_set
+				where tab_set = set_id
+					and exists (select 1 from $clname.sl_subscribe
+							where sub_set = set_id
+								and sub_receiver = $nodeid)"`
+for tab in $tables ; do
+	eval tabname_$tab=`psql -q -At -d $dbname -c \
+			"select $pgc.quote_ident(tab_nspname) || '.' || 
+					$pgc.quote_ident(tab_relname) from 
+					$clname.sl_table where tab_id = $tab"`
+done
+
+# ----
+# Get a list of all replicated sequence ID's this subscriber receives,
+# and remember the sequence names.
+# ----
+sequences=`psql -q -At -d $dbname -c \
+		"select seq_id from $clname.sl_sequence, $clname.sl_set
+				where seq_set = set_id
+					and exists (select 1 from $clname.sl_subscribe
+							where sub_set = set_id
+								and sub_receiver = $nodeid)"`
+for seq in $sequences ; do
+	eval seqname_$seq=`psql -q -At -d $dbname -c \
+			"select $pgc.quote_ident(seq_nspname) || '.' || 
+					$pgc.quote_ident(seq_relname) from 
+					$clname.sl_sequence where seq_id = $seq"`
+done
+
+
+# ----
+# Emit SQL code to create the slony specific object required
+# in the remote database.
+# ----
+cat <<_EOF_
+start transaction;
+
+-- ----------------------------------------------------------------------
+-- SCHEMA $clname
+-- ----------------------------------------------------------------------
+create schema $clname;
+
+-- ----------------------------------------------------------------------
+-- TABLE sl_sequence_offline
+-- ----------------------------------------------------------------------
+create table $clname.sl_sequence_offline (
+	seq_id				int4,
+	seq_relname			name NOT NULL,
+	seq_nspname			name NOT NULL,
+
+	CONSTRAINT "sl_sequence-pkey"
+		PRIMARY KEY (seq_id)
+);
+
+
+-- ----------------------------------------------------------------------
+-- TABLE sl_setsync_offline
+-- ----------------------------------------------------------------------
+create table $clname.sl_setsync_offline (
+	ssy_setid			int4,
+	ssy_seqno			int8,
+
+	CONSTRAINT "sl_setsync-pkey"
+		PRIMARY KEY (ssy_setid)
+);
+
+
+-- ----------------------------------------------------------------------
+-- FUNCTION sequenceSetValue_offline (seq_id, seq_origin, ev_seqno, last_value)
+-- ----------------------------------------------------------------------
+create or replace function $clname.sequenceSetValue_offline(int4, int8) returns int4
+as '
+declare
+	p_seq_id			alias for \$1;
+	p_last_value		alias for \$2;
+	v_fqname			text;
+begin
+	-- ----
+	-- Get the sequences fully qualified name
+	-- ----
+	select "pg_catalog".quote_ident(seq_nspname) || ''.'' ||
+			"pg_catalog".quote_ident(seq_relname) into v_fqname
+		from $clname.sl_sequence_offline
+		where seq_id = p_seq_id;
+	if not found then
+		raise exception ''Slony-I: sequence % not found'', p_seq_id;
+	end if;
+
+	-- ----
+	-- Update it to the new value
+	-- ----
+	execute ''select setval('''''' || v_fqname ||
+			'''''', '''''' || p_last_value || '''''')'';
+	return p_seq_id;
+end;
+' language plpgsql;
+
+-- ----------------------------------------------------------------------
+-- FUNCTION setsyncTracking_offline (seq_id, seq_origin, ev_seqno, last_value)
+-- ----------------------------------------------------------------------
+create or replace function $clname.setsyncTracking_offline(int4, int8, int8) returns int8
+as '
+declare
+	p_set_id	alias for \$1;
+	p_old_seq	alias for \$2;
+	p_new_seq	alias for \$3;
+	v_row		record;
+begin
+	select ssy_seqno into v_row from $clname.sl_setsync_offline
+		where ssy_setid = p_set_id for update;
+	if not found then
+		raise exception ''Slony-I: set % not found'', p_set_id;
+	end if;
+
+	if v_row.ssy_seqno <> p_old_seq then
+		raise exception ''Slony-I: set % is on sync %, this archive log expects %'', 
+			p_set_id, v_row.ssy_seqno, p_old_seq;
+	end if;
+
+	update $clname.sl_setsync_offline set ssy_seqno = p_new_seq
+		where ssy_setid = p_set_id;
+	return p_new_seq;
+end;
+' language plpgsql;
+
+_EOF_
+
+
+# ----
+# The remainder of this script is written in a way that
+# all output is generated by psql inside of one serializable
+# transaction, so that we get a consistent snapshot of the
+# replica.
+# ----
+
+(
+echo "start transaction;"
+echo "set transaction isolation level serializable;"
+
+# ----
+# Fill the sl_sequence_offline table and provide initial 
+# values for all sequences.
+# ----
+echo "select 'copy $clname.sl_sequence_offline from stdin;';"
+echo "select seq_id::text || '	' || seq_relname  || '	' || seq_nspname from $clname.sl_sequence;"
+echo "select '\\\\.';"
+
+for seq in $sequences ; do
+	eval seqname=\$seqname_$seq
+	echo "select 'select $clname.sequenceSetValue_offline($seq, ''' || last_value::text || ''');' from $seqname;"
+done
+
+# ----
+# Fill the setsync tracking table with the current status
+# ----
+echo "select 'insert into $clname.sl_setsync_offline values (' ||
+			ssy_setid::text || ', ''' || ssy_seqno || ''');'
+			from $clname.sl_setsync where exists (select 1
+						from $clname.sl_subscribe
+						where ssy_setid = sub_set
+							and sub_receiver = $nodeid);"
+
+# ----
+# Now dump all the user table data
+# ----
+for tab in $tables ; do
+	eval tabname=\$tabname_$tab
+	echo "select 'copy $tabname from stdin;';"
+
+	echo "copy $tabname to stdout;"
+
+	echo "select '\\\\.';"
+done
+
+# ----
+# Commit the transaction here in the replica that provided us
+# with the information.
+# ----
+echo "commit;"
+) | psql -q -At -d $dbname
+
+
+# ----
+# Emit the commit for the dump to stdout.
+# ----
+echo "commit;"
+
+exit 0
+


More information about the Slony1-commit mailing list