Christopher Browne cbbrowne
Wed Dec 7 22:04:49 PST 2005
Attached is a patch against CVS HEAD for the slon code, which *should*
dramatically reduce the amount of pg_listener bloat.

The notion is that when the slon is working well, processing events with
great regularity, we should be able to poll for events instead, and
UNLISTEN to the Event notification that is otherwise delivered via
pg_listener. This completely eliminates the tendency for pg_listener to
bloat up.

By now, people probably know my tendency to prefer adaptive
self-configuration, where possible :-).

We have two states enumerated:

enum pstate_enum {POLL=1, LISTEN};
static enum pstate_enum pstate;

Polling times are managed via poll_sleep...
static int poll_sleep;

- Any time the event loop finds events, poll_sleep is reset to 0 so
that the next iteration starts immediately.

- Any time the event loop doesn't find events, we double poll_sleep and
add sync_interval (the -s option value), capping the result at
sync_interval_timeout.

This causes the sleep between polls to grow rapidly, roughly doubling
each time, heading towards... OUCH! sync_interval_timeout.

If poll_sleep would exceed sync_interval_timeout, it is capped there, and
we switch states from POLL to LISTEN and start LISTENing for the
cluster's Event notification.

And when we get an event again, we switch back to POLL mode (and
UNLISTEN).

So, we'll generally have two kinds of behaviour:

1.  If your application applies changes very infrequently, then you'll
mostly LISTEN, waking up once in a while when there's an update.  Few
events generated means few dead pg_listener tuples.

2.  If your application applies changes very frequently, then you'll
mostly POLL, which will generate exactly 0 dead pg_listener tuples :-).

Both of those cases share the "not many dead pg_listener tuples" property.

This should resolve the common desire for "fewer dead pg_listener
tuples," particularly in combination with yesterday's patch, which
stopped using LISTEN/NOTIFY for event confirmations :-).

I thought I should post this to the list for some scrutiny before
applying it to CVS.  But it's a pretty keen change to make :-).
-------------- next part --------------
Index: local_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
retrieving revision 1.35
diff -c -u -r1.35 local_listen.c
--- local_listen.c	22 Nov 2005 05:11:58 -0000	1.35
+++ local_listen.c	7 Dec 2005 21:04:20 -0000
@@ -49,6 +49,7 @@
 	PGnotify   *notification;
 	char		restart_notify[256];
 	int			restart_request;
+	int poll_sleep = 0;
 
 	slon_log(SLON_DEBUG1, "localListenThread: thread starts\n");
 
@@ -69,9 +70,10 @@
 	 * Listen for local events
 	 */
 	slon_mkquery(&query1,
-				 "listen \"_%s_Event\"; "
-				 "listen \"_%s_Restart\"; ",
-				 rtcfg_cluster_name, rtcfg_cluster_name);
+		     /* "listen \"_%s_Event\"; " */
+		     "listen \"_%s_Restart\"; ",
+		     /*	 rtcfg_cluster_name,  */
+		     rtcfg_cluster_name);
 	res = PQexec(dbconn, dstring_data(&query1));
 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
@@ -636,6 +638,7 @@
 		 */
 		if (ntuples > 0)
 		{
+			poll_sleep = 0;  /* drop polling time back to 0... */
 			res = PQexec(dbconn, "commit transaction");
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
@@ -654,6 +657,12 @@
 			/*
 			 * No database events received. Rollback instead.
 			 */
+
+			/* Increase the amount of time to sleep, to a max of sync_interval_timeout */
+			poll_sleep += sync_interval;
+			if (poll_sleep > sync_interval_timeout) {
+				poll_sleep = sync_interval_timeout;
+			}
 			res = PQexec(dbconn, "rollback transaction;");
 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
@@ -668,9 +677,9 @@
 		}
 
 		/*
-		 * Wait for notify
+		 * Wait for notify or for timeout
 		 */
-		if (sched_wait_conn(conn, SCHED_WAIT_SOCK_READ) != SCHED_STATUS_OK)
+		if (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep) != SCHED_STATUS_OK)
 			break;
 	}
 
Index: remote_listen.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
retrieving revision 1.26
diff -c -u -r1.26 remote_listen.c
--- remote_listen.c	6 Dec 2005 21:34:54 -0000	1.26
+++ remote_listen.c	7 Dec 2005 21:04:20 -0000
@@ -58,6 +58,10 @@
 static int remoteListen_receive_events(SlonNode * node,
 					SlonConn * conn, struct listat * listat);
 
+static int poll_sleep;
+enum pstate_enum {POLL=1, LISTEN};
+static enum pstate_enum pstate;
+
 extern char *lag_interval;
 
 /* ----------
@@ -98,8 +102,11 @@
 	listat_tail = NULL;
 	dstring_init(&query1);
 
+	poll_sleep = 0;
+	pstate = POLL;      /* Initially, start in Polling mode */
+
 	sprintf(conn_symname, "node_%d_listen", node->no_id);
-/*	sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name); */
+	sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name);
 
 	/*
 	 * Work until doomsday
@@ -229,13 +236,24 @@
 			 * register the node connection.
 			 */
 			slon_mkquery(&query1,
-				     "listen \"_%s_Event\"; "
+				     /* "listen \"_%s_Event\"; " */
 				     /*	 skip confirms "listen \"_%s_Confirm\"; " */
 				     "select _%s.registerNodeConnection(%d); ",
-				     rtcfg_cluster_name, rtcfg_cluster_name,
+				     /* rtcfg_cluster_name,  */
+				     rtcfg_cluster_name,
 				     rtcfg_namespace, rtcfg_nodeid);
+
+			if (pstate == LISTEN) {
+				slon_appendquery(&query1, 
+						 "listen \"_%s_Event\"; ",
+						 rtcfg_cluster_name);
+			} else {
+				slon_appendquery(&query1, 
+						 "unlisten \"_%s_Event\"; ",
+						 rtcfg_cluster_name);
+			}
 			res = PQexec(dbconn, dstring_data(&query1));
-			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
 			{
 				slon_log(SLON_ERROR,
 					 "remoteListenThread_%d: \"%s\" - %s",
@@ -299,6 +317,7 @@
 		/*
 		 * Receive events from the provider node
 		 */
+		enum pstate_enum oldpstate = pstate;
 		rc = remoteListen_receive_events(node, conn, listat_head);
 		if (rc < 0)
 		{
@@ -313,6 +332,41 @@
 
 			continue;
 		}
+		if (oldpstate != pstate) { /* Switched states... */
+			switch (pstate) {
+			case POLL:
+				slon_log(SLON_DEBUG2, 
+					 "remoteListenThread_%d: UNLISTEN\n",
+					 node->no_id);
+
+				slon_mkquery(&query1,
+					     "unlisten \"_%s_Event\"; ",
+					     rtcfg_cluster_name);
+				break;
+			case LISTEN:
+				slon_log(SLON_DEBUG2, 
+					 "remoteListenThread_%d: LISTEN\n",
+					 node->no_id);
+				slon_mkquery(&query1,
+					     "listen \"_%s_Event\"; ",
+					     rtcfg_cluster_name);
+				break;
+			}			
+			res = PQexec(dbconn, dstring_data(&query1));
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			{
+				slon_log(SLON_ERROR,
+					 "remoteListenThread_%d: \"%s\" - %s",
+					 node->no_id,
+					 dstring_data(&query1), PQresultErrorMessage(res));
+				PQclear(res);
+				slon_disconnectdb(conn);
+				free(conn_conninfo);
+				conn = NULL;
+				conn_conninfo = NULL;
+				continue;
+			}
+		}
 
 		/*
 		 * If the remote node notified for new confirmations, read them and
@@ -343,7 +397,7 @@
 		/*
 		 * Wait for notification.
 		 */
-		rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, 10000);
+		rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep);
 		if (rc == SCHED_STATUS_CANCEL)
 			continue;
 		if (rc != SCHED_STATUS_OK)
@@ -746,6 +800,16 @@
 		  (PQgetisnull(res, tupno, 14)) ? NULL : PQgetvalue(res, tupno, 14));
 	}
 
+	if (ntuples > 0) {
+		poll_sleep = 0;
+		pstate = POLL;
+	} else {
+		poll_sleep = poll_sleep * 2 + sync_interval;
+		if (poll_sleep > sync_interval_timeout) {
+			poll_sleep = sync_interval_timeout;
+			pstate = LISTEN;
+		}
+	}
 	PQclear(res);
 
 	return 0;


More information about the Slony1-general mailing list