CVS User Account cvsuser
Wed Nov 9 16:24:28 PST 2005
Log Message:
-----------
Use a new table, sl_nodelock, to guard against concurrent slon daemons
for the same node and to register all node connections for
the terminateNodeConnections() function.

Jan

Tags:
----
REL_1_1_STABLE

Modified Files:
--------------
    slony1-engine/src/backend:
        slony1_base.sql (r1.27.2.1 -> r1.27.2.2)
        slony1_funcs.c (r1.33.2.1 -> r1.33.2.2)
        slony1_funcs.sql (r1.64.2.10 -> r1.64.2.11)
    slony1-engine/src/slon:
        local_listen.c (r1.31 -> r1.31.2.1)
        remote_listen.c (r1.21 -> r1.21.2.1)
        remote_worker.c (r1.86.2.8 -> r1.86.2.9)

-------------- next part --------------
Index: slony1_base.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_base.sql,v
retrieving revision 1.27.2.1
retrieving revision 1.27.2.2
diff -Lsrc/backend/slony1_base.sql -Lsrc/backend/slony1_base.sql -u -w -r1.27.2.1 -r1.27.2.2
--- src/backend/slony1_base.sql
+++ src/backend/slony1_base.sql
@@ -35,6 +35,23 @@
 
 
 -- ----------------------------------------------------------------------
+-- TABLE sl_nodelock
+-- ----------------------------------------------------------------------
+create table @NAMESPACE@.sl_nodelock (
+	nl_nodeid			int4,
+	nl_conncnt			serial,
+	nl_backendpid		int4,
+
+	CONSTRAINT "sl_nodelock-pkey"
+		PRIMARY KEY (nl_nodeid, nl_conncnt)
+);
+comment on table @NAMESPACE@.sl_nodelock is 'Used to prevent multiple slon instances and to identify the backends to kill in terminateNodeConnections().';
+comment on column @NAMESPACE@.sl_nodelock.nl_nodeid is 'Clients node_id';
+comment on column @NAMESPACE@.sl_nodelock.nl_conncnt is 'Clients connection number';
+comment on column @NAMESPACE@.sl_nodelock.nl_backendpid is 'PID of database backend owning this lock';
+
+
+-- ----------------------------------------------------------------------
 -- TABLE sl_set
 -- ----------------------------------------------------------------------
 create table @NAMESPACE@.sl_set (
Index: slony1_funcs.sql
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.64.2.10
retrieving revision 1.64.2.11
diff -Lsrc/backend/slony1_funcs.sql -Lsrc/backend/slony1_funcs.sql -u -w -r1.64.2.10 -r1.64.2.11
--- src/backend/slony1_funcs.sql
+++ src/backend/slony1_funcs.sql
@@ -233,28 +233,44 @@
 grant execute on function @NAMESPACE@.logTrigger () to public;
 
 -- ----------------------------------------------------------------------
--- FUNCTION terminateNodeConnections (name)
+-- FUNCTION terminateNodeConnections (failed_node)
 --
 --	
 -- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.terminateNodeConnections (name) returns int4
-    as '$libdir/slony1_funcs', '_Slony_I_terminateNodeConnections'
-	language C;
+create or replace function @NAMESPACE@.terminateNodeConnections (int4) returns int4
+as '
+declare
+	p_failed_node	alias for $1;
+	v_row			record;
+begin
+	for v_row in select nl_nodeid, nl_conncnt,
+			nl_backendpid from @NAMESPACE@.sl_nodelock
+			where nl_nodeid = p_failed_node for update
+	loop
+		perform @NAMESPACE@.killBackend(v_row.nl_backendpid, 15);
+		delete from @NAMESPACE@.sl_nodelock
+			where nl_nodeid = v_row.nl_nodeid
+			and nl_conncnt = v_row.nl_conncnt;
+	end loop;
 
-comment on function @NAMESPACE@.terminateNodeConnections (name) is 
-  'terminates connections to the node and terminates the process';
+	return 0;
+end;
+' language plpgsql;
+
+comment on function @NAMESPACE@.terminateNodeConnections (int4) is 
+  'terminates all backends that have registered to be from the given node';
 
 -- ----------------------------------------------------------------------
--- FUNCTION cleanupListener ()
+-- FUNCTION killBackend (pid, signo)
 --
 --	
 -- ----------------------------------------------------------------------
-create or replace function @NAMESPACE@.cleanupListener () returns int4
-    as '$libdir/slony1_funcs', '_Slony_I_cleanupListener'
+create or replace function @NAMESPACE@.killBackend (int4, int4) returns int4
+    as '$libdir/slony1_funcs', '_Slony_I_killBackend'
 	language C;
 
-comment on function @NAMESPACE@.cleanupListener() is
-  'look for stale pg_listener entries and submit Async_Unlisten() to them';
+comment on function @NAMESPACE@.killBackend(int4, int4) is
+  'Send a signal to a postgres process. Requires superuser rights';
 
 -- ----------------------------------------------------------------------
 -- FUNCTION slon_quote_brute(text)
@@ -435,6 +451,56 @@
 
 
 -- ----------------------------------------------------------------------
+-- FUNCTION cleanupNodelock ()
+--
+--	Remove old entries from the nodelock table
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.cleanupNodelock ()
+returns int4
+as '
+declare
+	v_row		record;
+begin
+	for v_row in select nl_nodeid, nl_conncnt, nl_backendpid
+			from @NAMESPACE@.sl_nodelock
+			for update
+	loop
+		if @NAMESPACE@.killBackend(v_row.nl_backendpid, 0) < 0 then
+			raise notice ''Slony-I: cleanup stale sl_nodelock entry for pid=%'',
+					v_row.nl_backendpid;
+			delete from @NAMESPACE@.sl_nodelock where
+					nl_nodeid = v_row.nl_nodeid and
+					nl_conncnt = v_row.nl_conncnt;
+		end if;
+	end loop;
+
+	return 0;
+end;
+' language plpgsql;
+
+
+-- ----------------------------------------------------------------------
+-- FUNCTION registerNodeConnection (nodeid)
+--
+--	
+-- ----------------------------------------------------------------------
+create or replace function @NAMESPACE@.registerNodeConnection (int4)
+returns int4
+as '
+declare
+	p_nodeid	alias for $1;
+begin
+	insert into @NAMESPACE@.sl_nodelock
+		(nl_nodeid, nl_backendpid)
+		values
+		(p_nodeid, pg_backend_pid());
+
+	return 0;
+end;
+' language plpgsql;
+
+
+-- ----------------------------------------------------------------------
 -- FUNCTION initializeLocalNode (no_id, no_comment)
 --
 --	Initializes a new node.
@@ -937,8 +1003,7 @@
 	-- ----
 	-- Terminate all connections of the failed node the hard way
 	-- ----
-	perform @NAMESPACE@.terminateNodeConnections(
-			''_@CLUSTERNAME@_Node_'' || p_failed_node);
+	perform @NAMESPACE@.terminateNodeConnections(p_failed_node);
 
 -- Note that the following code should all become obsolete in the wake
 -- of the availability of RebuildListenEntries()...
@@ -4232,6 +4297,11 @@
 		end if;
 	end loop;
 
+	-- ----
+	-- Also remove stale entries from the nodelock table.
+	-- ----
+	perform @NAMESPACE@.cleanupNodelock();
+
 	return 0;
 end;
 ' language plpgsql;
@@ -5110,6 +5180,25 @@
 		execute ''alter table @NAMESPACE@.sl_node add column no_spool boolean'';
 		update @NAMESPACE@.sl_node set no_spool = false;
 	end if;
+
+	-- ----
+	-- Changes for 1.1.3
+	-- ----
+	if p_old IN (''1.0.2'', ''1.0.5'', ''1.0.6'', ''1.1.0'', ''1.1.1'', ''1.1.2'') then
+		-- Add new table sl_nodelock
+		execute ''create table @NAMESPACE@.sl_nodelock (
+						nl_nodeid		int4,
+						nl_conncnt		serial,
+						nl_backendpid	int4,
+
+						CONSTRAINT "sl_nodelock-pkey"
+						PRIMARY KEY (nl_nodeid, nl_conncnt)
+					)'';
+		-- Drop obsolete functions
+		execute ''drop function @NAMESPACE@.terminateNodeConnections(name)'';
+		execute ''drop function @NAMESPACE@.cleanupListener()'';
+	end if;
+
 	return p_old;
 end;
 ' language plpgsql;
Index: slony1_funcs.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/backend/slony1_funcs.c,v
retrieving revision 1.33.2.1
retrieving revision 1.33.2.2
diff -Lsrc/backend/slony1_funcs.c -Lsrc/backend/slony1_funcs.c -u -w -r1.33.2.1 -r1.33.2.2
--- src/backend/slony1_funcs.c
+++ src/backend/slony1_funcs.c
@@ -44,8 +44,7 @@
 PG_FUNCTION_INFO_V1(_Slony_I_logTrigger);
 PG_FUNCTION_INFO_V1(_Slony_I_denyAccess);
 PG_FUNCTION_INFO_V1(_Slony_I_lockedSet);
-PG_FUNCTION_INFO_V1(_Slony_I_terminateNodeConnections);
-PG_FUNCTION_INFO_V1(_Slony_I_cleanupListener);
+PG_FUNCTION_INFO_V1(_Slony_I_killBackend);
 
 PG_FUNCTION_INFO_V1(_slon_quote_ident);
 
@@ -59,8 +58,7 @@
 Datum		_Slony_I_logTrigger(PG_FUNCTION_ARGS);
 Datum		_Slony_I_denyAccess(PG_FUNCTION_ARGS);
 Datum		_Slony_I_lockedSet(PG_FUNCTION_ARGS);
-Datum		_Slony_I_terminateNodeConnections(PG_FUNCTION_ARGS);
-Datum		_Slony_I_cleanupListener(PG_FUNCTION_ARGS);
+Datum		_Slony_I_killBackend(PG_FUNCTION_ARGS);
 
 Datum		_slon_quote_ident(PG_FUNCTION_ARGS);
 
@@ -1000,92 +998,21 @@
 
 
 Datum
-_Slony_I_terminateNodeConnections(PG_FUNCTION_ARGS)
+_Slony_I_killBackend(PG_FUNCTION_ARGS)
 {
-	Name		relname = PG_GETARG_NAME(0);
-	void	   *plan;
-	Oid			argtypes[1];
-	Datum		args  [1];
-	int			i;
 	int32		pid;
-	bool		isnull;
-
-	if (SPI_connect() < 0)
-		elog(ERROR, "Slony-I: SPI_connect() failed in terminateNodeConnections()");
-
-	argtypes[0] = NAMEOID;
-	plan = SPI_prepare("select listenerpid "
-					   "    from \"pg_catalog\".pg_listener "
-					   "    where relname = $1; ",
-					   1, argtypes);
-	if (plan == NULL)
-		elog(ERROR, "Slony-I: SPI_prepare() failed in terminateNodeConnections()");
-
-	args[0] = NameGetDatum(relname);
-	if (SPI_execp(plan, args, NULL, 0) != SPI_OK_SELECT)
-		elog(ERROR, "Slony-I: SPI_execp() failed in terminateNodeConnections()");
-
-	for (i = 0; i < SPI_processed; i++)
-	{
-		pid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[i],
-										  SPI_tuptable->tupdesc, 1, &isnull));
-		elog(NOTICE, "Slony-I: terminating DB connection of failed node "
-			 "with pid %d", pid);
-		kill(pid, SIGTERM);
-	}
-
-	SPI_finish();
-
-	return (Datum)0;
-}
-
+	int32		signo;
 
-Datum
-_Slony_I_cleanupListener(PG_FUNCTION_ARGS)
-{
-	static void *plan = NULL;
-	int			i;
-	int32		pid;
-	char	   *relname;
-	bool		isnull;
-
-
-	if (SPI_connect() < 0)
-		elog(ERROR, "Slony-I: SPI_connect() failed in cleanupListener()");
-
-	if (plan == NULL)
-	{
-		plan = SPI_saveplan(SPI_prepare("select relname, listenerpid "
-									 "    from \"pg_catalog\".pg_listener; ",
-										0, NULL));
-		if (plan == NULL)
-			elog(ERROR, "Slony-I: SPI_prepare() failed in cleanupListener()");
-	}
-
-	if (SPI_execp(plan, NULL, NULL, 0) != SPI_OK_SELECT)
-		elog(ERROR, "Slony-I: SPI_execp() failed in cleanupListener()");
-
-	for (i = 0; i < SPI_processed; i++)
-	{
-		pid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[i],
-										  SPI_tuptable->tupdesc, 2, &isnull));
-		if (kill(pid, 0) < 0)
-		{
-			if (errno == ESRCH)
-			{
-				relname = SPI_getvalue(SPI_tuptable->vals[i],
-									   SPI_tuptable->tupdesc, 1);
+	if (!superuser())
+		elog(ERROR, "Slony-I: insufficient privilege for killBackend");
 
-				elog(NOTICE, "Slony-I: removing stale pg_listener entry "
-					 "for pid %d, relname %s", pid, relname);
-				Async_Unlisten(relname, pid);
-			}
-		}
-	}
+	pid		= PG_GETARG_INT32(0);
+	signo	= PG_GETARG_INT32(1);
 
-	SPI_finish();
+	if (kill(pid, signo) < 0)
+		PG_RETURN_INT32(-1);
 
-	return (Datum)0;
+	PG_RETURN_INT32(0);
 }
 
 
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.86.2.8
retrieving revision 1.86.2.9
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.86.2.8 -r1.86.2.9
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -314,14 +314,11 @@
 	local_dbconn = local_conn->dbconn;
 
 	/*
-	 * Put the connection into replication mode and listen on the special
-	 * relation telling what node daemon this connection belongs to.
+	 * Put the connection into replication mode
 	 */
 	slon_mkquery(&query1,
-				 "select %s.setSessionRole('_%s', 'slon'); "
-				 "listen \"_%s_Node_%d\"; ",
-				 rtcfg_namespace, rtcfg_cluster_name,
-				 rtcfg_cluster_name, rtcfg_nodeid);
+				 "select %s.setSessionRole('_%s', 'slon'); ",
+				 rtcfg_namespace, rtcfg_cluster_name);
 	if (query_execute(node, local_dbconn, &query1) < 0)
 		slon_abort();
 
@@ -2312,12 +2309,11 @@
 		}
 	}
 	/*
-	 * Listen on the special relation telling what node daemon this connection
-	 * belongs to.
+	 * Register this connection in sl_nodelock
 	 */
 	slon_mkquery(&query1,
-		     "listen \"_%s_Node_%d\"; ",
-		     rtcfg_cluster_name, rtcfg_nodeid);
+		     "select %s.registerNodeConnection(%d); ",
+		     rtcfg_namespace, rtcfg_nodeid);
 	if (query_execute(node, pro_dbconn, &query1) < 0)
 	{
 		slon_disconnectdb(pro_conn);
@@ -3702,8 +3698,8 @@
 			 * Listen on the special relation telling our node relationship
 			 */
 			slon_mkquery(&query,
-						 "listen \"_%s_Node_%d\"; ",
-						 rtcfg_cluster_name, rtcfg_nodeid);
+						 "select %s.registerNodeConnection(%d); ",
+						 rtcfg_namespace, rtcfg_nodeid);
 			if (query_execute(node, provider->conn->dbconn, &query) < 0)
 			{
 				TERMINATE_QUERY_AND_ARCHIVE;
@@ -4319,19 +4315,6 @@
 			}
 		}
 		PQclear(res1);
-
-		/*
-		 * Start listening on the special relation that will cause our local
-		 * connection to be killed when the provider node fails.
-		 */
-		slon_mkquery(&query,
-					 "listen \"_%s_Node_%d\"; ",
-					 rtcfg_cluster_name, provider->no_id);
-		if (query_execute(node, local_dbconn, &query) < 0)
-		{
-			dstring_free(&query);
-			return 60;
-		}
 	}
 
 	/*
@@ -4377,21 +4360,6 @@
 		}
 		PQclear(res1);
 	}
-	for (provider = wd->provider_head; provider; provider = provider->next)
-	{
-		/*
-		 * Stop listening on the special relations that will cause our local
-		 * connection to be killed when the provider node fails.
-		 */
-		slon_mkquery(&query,
-					 "unlisten \"_%s_Node_%d\"; ",
-					 rtcfg_cluster_name, provider->no_id);
-		if (query_execute(node, local_dbconn, &query) < 0)
-		{
-			TERMINATE_QUERY_AND_ARCHIVE;
-			return 60;
-		}
-	}
 
 	/*
 	 * Get the nodes rowid sequence at that sync time just in case we are
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.21
retrieving revision 1.21.2.1
diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.21 -r1.21.2.1
--- src/slon/remote_listen.c
+++ src/slon/remote_listen.c
@@ -224,15 +224,16 @@
 
 			/*
 			 * Listen on the connection for events and confirmations
+			 * and register the node connection.
 			 */
 			slon_mkquery(&query1,
 				     "listen \"_%s_Event\"; "
 				     "listen \"_%s_Confirm\"; "
-				     "listen \"_%s_Node_%d\"; ",
+				     "select %s.registerNodeConnection(%d); ",
 				     rtcfg_cluster_name, rtcfg_cluster_name,
-				     rtcfg_cluster_name, rtcfg_nodeid);
+				     rtcfg_namespace, rtcfg_nodeid);
 			res = PQexec(dbconn, dstring_data(&query1));
-			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			{
 				slon_log(SLON_ERROR,
 					 "remoteListenThread_%d: \"%s\" - %s",
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.31
retrieving revision 1.31.2.1
diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.31 -r1.31.2.1
--- src/slon/local_listen.c
+++ src/slon/local_listen.c
@@ -69,10 +69,8 @@
 	 * Listen for local events
 	 */
 	slon_mkquery(&query1,
-				 "select %s.cleanupListener(); "
 				 "listen \"_%s_Event\"; "
 				 "listen \"_%s_Restart\"; ",
-				 rtcfg_namespace,
 				 rtcfg_cluster_name, rtcfg_cluster_name);
 	res = PQexec(dbconn, dstring_data(&query1));
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
@@ -90,11 +88,13 @@
 	 * Check that we are the only slon daemon connected.
 	 */
 	slon_mkquery(&query1,
-				 "select true from \"pg_catalog\".pg_listener "
-				 "    where relname = '_%s_Restart';",
-				 rtcfg_cluster_name, rtcfg_cluster_name);
+				 "select %s.cleanupNodelock(); "
+				 "insert into %s.sl_nodelock values ("
+				 "    %d, 0, \"pg_catalog\".pg_backend_pid()); ",
+				 rtcfg_namespace, rtcfg_namespace,
+				 rtcfg_nodeid);
 	res = PQexec(dbconn, dstring_data(&query1));
-	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		slon_log(SLON_FATAL,
 				 "localListenThread: \"%s\" - %s",
@@ -103,14 +103,6 @@
 		dstring_free(&query1);
 		slon_abort();
 	}
-	if (PQntuples(res) != 1)
-	{
-		slon_log(SLON_FATAL,
-				 "localListenThread: Another slon daemon is serving this node already\n");
-		PQclear(res);
-		dstring_free(&query1);
-		slon_abort();
-	}
 	PQclear(res);
 
 	/*


More information about the Slony1-commit mailing list