CVS User Account cvsuser
Wed Mar 16 17:15:43 PST 2005
Log Message:
-----------
Change the SUBSCRIBE_SET event so that it starts by checking that the
set's tables are available on the subscriber node, and subscribes the
sequences _BEFORE_ copying any of the data.

That way we find problems before spending 8 hours copying data...

As suggested by Hannu Krosing

Modified Files:
--------------
    slony1-engine/src/slon:
        remote_worker.c (r1.77 -> r1.78)

-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.77
retrieving revision 1.78
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.77 -r1.78
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -2352,6 +2352,180 @@
 		}
 	}
 
+	/* cbbrowne - in progress - check tables/sequences in set to
+	 * make sure they are there and in good order.  Don't copy any
+	 * data yet; we want to just do a first pass that finds "bozo
+	 * errors" */
+
+        /* Check tables and sequences in set to make sure they are all
+	 * appropriately configured... */
+
+	/*
+	 * Select the list of all tables the provider currently has in the set.
+	 */
+	slon_mkquery(&query1,
+		     "select T.tab_id, "
+		     "    \"pg_catalog\".quote_ident(PGN.nspname) || '.' || "
+		     "    \"pg_catalog\".quote_ident(PGC.relname) as tab_fqname, "
+		     "    T.tab_idxname, T.tab_comment "
+		     "from %s.sl_table T, "
+		     "    \"pg_catalog\".pg_class PGC, "
+		     "    \"pg_catalog\".pg_namespace PGN "
+		     "where T.tab_set = %d "
+		     "    and T.tab_reloid = PGC.oid "
+		     "    and PGC.relnamespace = PGN.oid "
+		     "order by tab_id; ",
+		     rtcfg_namespace, set_id);
+	res1 = PQexec(pro_dbconn, dstring_data(&query1));
+	if (PQresultStatus(res1) != PGRES_TUPLES_OK)
+	{
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+			 node->no_id, dstring_data(&query1),
+			 PQresultErrorMessage(res1));
+		PQclear(res1);
+		slon_disconnectdb(pro_conn);
+		dstring_free(&query1);
+		terminate_log_archive();
+		return -1;
+	}
+	ntuples1 = PQntuples(res1);
+
+	/*
+	 * For each table in the set
+	 */
+	for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
+	{
+		int	tab_id = strtol(PQgetvalue(res1, tupno1, 0), NULL, 10);
+		char	   *tab_fqname = PQgetvalue(res1, tupno1, 1);
+		char	   *tab_idxname = PQgetvalue(res1, tupno1, 2);
+		char	   *tab_comment = PQgetvalue(res1, tupno1, 3);
+		int64		copysize = 0;
+
+		gettimeofday(&tv_start2, NULL);
+		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+			 "prepare to copy table %s\n",
+			 node->no_id, tab_fqname);
+
+		/*
+		 * Find out if the table we're copying has the special slony serial
+		 * number key on the provider DB
+		 */
+		slon_mkquery(&query1,
+			     "select %s.tableHasSerialKey('%q');",
+			     rtcfg_namespace, tab_fqname);
+		res2 = PQexec(pro_dbconn, dstring_data(&query1));
+		if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+		{
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+				 node->no_id, dstring_data(&query1),
+				 PQresultErrorMessage(res2));
+			PQclear(res2);
+			PQclear(res1);
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
+			return -1;
+		}
+		rc = *PQgetvalue(res2, 0, 0) == 't';
+		PQclear(res2);
+
+		if (rc)
+		{
+			/*
+			 * It has, check if the table has this on the local DB too.
+			 */
+			slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
+				 "table %s will require Slony-I serial key\n",
+				 node->no_id, tab_fqname);
+			res2 = PQexec(loc_dbconn, dstring_data(&query1));
+			if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+			{
+				slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+					 node->no_id, dstring_data(&query1),
+					 PQresultErrorMessage(res2));
+				PQclear(res2);
+				PQclear(res1);
+				slon_disconnectdb(pro_conn);
+				dstring_free(&query1);
+				terminate_log_archive();
+				return -1;
+			}
+			rc = *PQgetvalue(res2, 0, 0) == 't';
+			PQclear(res2);
+
+			if (!rc)
+			{
+				slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
+					 "table %s Slony-I serial key to be added local\n",
+					 node->no_id, tab_fqname);
+			}
+		}
+		else
+		{
+			slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
+				 "table %s does not require Slony-I serial key\n",
+				 node->no_id, tab_fqname);
+		}
+	}
+	PQclear(res1);
+
+	slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+		 "all tables for set %d found on subscriber\n",
+		 node->no_id, set_id);
+	/*
+	 * Add in the sequences contained in the set
+	 */
+	slon_mkquery(&query1,
+		     "select SQ.seq_id, "
+		     "		\"pg_catalog\".quote_ident(PGN.nspname) || '.' || "
+		     "		\"pg_catalog\".quote_ident(PGC.relname), "
+		     "		SQ.seq_comment "
+		     "	from %s.sl_sequence SQ, "
+		     "		\"pg_catalog\".pg_class PGC, "
+		     "		\"pg_catalog\".pg_namespace PGN "
+		     "	where SQ.seq_set = %d "
+		     "		and PGC.oid = SQ.seq_reloid "
+		     "		and PGN.oid = PGC.relnamespace; ",
+		     rtcfg_namespace, set_id);
+	res1 = PQexec(pro_dbconn, dstring_data(&query1));
+	if (PQresultStatus(res1) != PGRES_TUPLES_OK)
+	{
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
+			 node->no_id, dstring_data(&query1),
+			 PQresultErrorMessage(res1));
+		PQclear(res1);
+		slon_disconnectdb(pro_conn);
+		dstring_free(&query1);
+		terminate_log_archive();
+		return -1;
+	}
+	ntuples1 = PQntuples(res1);
+	for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
+	{
+		char	   *seq_id = PQgetvalue(res1, tupno1, 0);
+		char	   *seq_fqname = PQgetvalue(res1, tupno1, 1);
+		char	   *seq_comment = PQgetvalue(res1, tupno1, 2);
+
+		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
+			 "copy sequence %s\n",
+			 node->no_id, seq_fqname);
+
+		slon_mkquery(&query1,
+			     "select %s.setAddSequence_int(%d, %s, '%q', '%q')",
+			     rtcfg_namespace, set_id, seq_id,
+			     seq_fqname, seq_comment);
+		if (query_execute(node, loc_dbconn, &query1) < 0)
+		{
+			PQclear(res1);
+			slon_disconnectdb(pro_conn);
+			dstring_free(&query1);
+			terminate_log_archive();
+			return -1;
+		}
+	}
+	PQclear(res1);
+
+
 	/*
 	 * Select the list of all tables the provider currently has in the set.
 	 */
@@ -2842,55 +3016,60 @@
 	/*
 	 * Copy the sequences contained in the set
 	 */
-	slon_mkquery(&query1,
-		     "select SQ.seq_id, "
-		     "		\"pg_catalog\".quote_ident(PGN.nspname) || '.' || "
-		     "		\"pg_catalog\".quote_ident(PGC.relname), "
-		     "		SQ.seq_comment "
-		     "	from %s.sl_sequence SQ, "
-		     "		\"pg_catalog\".pg_class PGC, "
-		     "		\"pg_catalog\".pg_namespace PGN "
-		     "	where SQ.seq_set = %d "
-		     "		and PGC.oid = SQ.seq_reloid "
-		     "		and PGN.oid = PGC.relnamespace; ",
-		     rtcfg_namespace, set_id);
-	res1 = PQexec(pro_dbconn, dstring_data(&query1));
-	if (PQresultStatus(res1) != PGRES_TUPLES_OK)
-	{
-		slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s",
-			 node->no_id, dstring_data(&query1),
-			 PQresultErrorMessage(res1));
-		PQclear(res1);
-		slon_disconnectdb(pro_conn);
-		dstring_free(&query1);
-		terminate_log_archive();
-		return -1;
-	}
-	ntuples1 = PQntuples(res1);
-	for (tupno1 = 0; tupno1 < ntuples1; tupno1++)
-	{
-		char	   *seq_id = PQgetvalue(res1, tupno1, 0);
-		char	   *seq_fqname = PQgetvalue(res1, tupno1, 1);
-		char	   *seq_comment = PQgetvalue(res1, tupno1, 2);
-
-		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
-			 "copy sequence %s\n",
-			 node->no_id, seq_fqname);
 
-		slon_mkquery(&query1,
-			     "select %s.setAddSequence_int(%d, %s, '%q', '%q')",
-			     rtcfg_namespace, set_id, seq_id,
-			     seq_fqname, seq_comment);
-		if (query_execute(node, loc_dbconn, &query1) < 0)
-		{
-			PQclear(res1);
-			slon_disconnectdb(pro_conn);
-			dstring_free(&query1);
-			terminate_log_archive();
-			return -1;
-		}
-	}
-	PQclear(res1);
+	/* The copy of sequences is being done earlier, before we
+	 * start doing tables, so that if anything is missing, that is
+	 * noticed BEFORE 8 hours of copying of data takes place... */
+
+/* 	slon_mkquery(&query1, */
+/* 		     "select SQ.seq_id, " */
+/* 		     "		\"pg_catalog\".quote_ident(PGN.nspname) || '.' || " */
+/* 		     "		\"pg_catalog\".quote_ident(PGC.relname), " */
+/* 		     "		SQ.seq_comment " */
+/* 		     "	from %s.sl_sequence SQ, " */
+/* 		     "		\"pg_catalog\".pg_class PGC, " */
+/* 		     "		\"pg_catalog\".pg_namespace PGN " */
+/* 		     "	where SQ.seq_set = %d " */
+/* 		     "		and PGC.oid = SQ.seq_reloid " */
+/* 		     "		and PGN.oid = PGC.relnamespace; ", */
+/* 		     rtcfg_namespace, set_id); */
+/* 	res1 = PQexec(pro_dbconn, dstring_data(&query1)); */
+/* 	if (PQresultStatus(res1) != PGRES_TUPLES_OK) */
+/* 	{ */
+/* 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s", */
+/* 			 node->no_id, dstring_data(&query1), */
+/* 			 PQresultErrorMessage(res1)); */
+/* 		PQclear(res1); */
+/* 		slon_disconnectdb(pro_conn); */
+/* 		dstring_free(&query1); */
+/* 		terminate_log_archive(); */
+/* 		return -1; */
+/* 	} */
+/* 	ntuples1 = PQntuples(res1); */
+/* 	for (tupno1 = 0; tupno1 < ntuples1; tupno1++) */
+/* 	{ */
+/* 		char	   *seq_id = PQgetvalue(res1, tupno1, 0); */
+/* 		char	   *seq_fqname = PQgetvalue(res1, tupno1, 1); */
+/* 		char	   *seq_comment = PQgetvalue(res1, tupno1, 2); */
+
+/* 		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " */
+/* 			 "copy sequence %s\n", */
+/* 			 node->no_id, seq_fqname); */
+
+/* 		slon_mkquery(&query1, */
+/* 			     "select %s.setAddSequence_int(%d, %s, '%q', '%q')", */
+/* 			     rtcfg_namespace, set_id, seq_id, */
+/* 			     seq_fqname, seq_comment); */
+/* 		if (query_execute(node, loc_dbconn, &query1) < 0) */
+/* 		{ */
+/* 			PQclear(res1); */
+/* 			slon_disconnectdb(pro_conn); */
+/* 			dstring_free(&query1); */
+/* 			terminate_log_archive(); */
+/* 			return -1; */
+/* 		} */
+/* 	} */
+/* 	PQclear(res1); */
 
 	/*
 	 * And copy over the sequence last_value corresponding to the


More information about the Slony1-commit mailing list