Jan Wieck JanWieck
Thu Dec 8 16:22:12 PST 2005
On 12/8/2005 10:51 AM, CVS User Account wrote:

> Log Message:
> -----------
> Feature Request #1280 - Reduce generation of LISTEN/NOTIFY entries to cut pg_listener bloat
> 
> Attached is a patch to the slon code which should dramatically reduce
> the amount of pg_listener bloat.
> 
> The notion is that when the slon is working well, processing events with
> great regularity, we switch over to do polling, and UNLISTEN to events
> being handled via pg_listener, which entirely eliminates the
> tendancy for pg_listener to bloat up.
> 
> By now, people probably know my tendancy to prefer adaptive
> self-configuration, where possible  :-) .
> 
> We have two states enumerated:
> 
> enum pstate_enum {POLL=1, LISTEN};
> static enum pstate_enum pstate;
> 
> Polling times are managed via poll_sleep...
> static int poll_sleep;
> 
> - Any time the event loop finds events, the sleep time gets set to 0 so
> that the next iteration starts immediately.
> 
> - Any time the event loop doesn't find events, we double poll_sleep and
> add sync_interval (the -s option value)

But this is the subscribers sync_interval, not the one on the origin or 
data provider. Normally a subscriber doesn't need a very high sync 
interval. The sync_interval controls how often a node looks at the 
log_actionseq if there have been any changes, so it is in fact a polling 
interval of the origins slon against the master DB. Consider this case:

The origin is running with -s1000, the subscriber -s10000. This means 
that on a busy site, the origin will generate a SYNC every second. If 
your subscriber just got an event via NOTIFY, it sets the sleep time to 
0 seconds. So it polls immediately after receiving this event, doesn't 
find another one, adds its own sync_interval to the value and will poll 
again in 10 seconds.

I'd say this is a little too sleepy ;-)


Jan

> 
> This causes polling to rapidly double, heading towards... OUCH! 
> sync_interval_timeout.
> 
> If it reaches sync_interval_timeout, we switch states from POLL to
> LISTEN, and turn on listening to the node's Event.
> 
> And when we get an event, again, we switch back to POLLing mode.
> 
> So, we'll generally have two kinds of behaviour:
> 
> 1.  If your application is applying changes very infrequently, then
> you'll LISTEN, waking up once in a while when there's an update.  Not
> many events generated will mean not many dead pg_listener tuples.
> 
> 2.  If your application applies changes very frequently, then you'll
> mostly POLL, which will generate exactly 0 dead pg_listener tuples  :-) .
> 
> Both of those have the similar "not many dead pg_listener tuples" property.
> 
> This would doubtless resolve the common desire for "less dead
> pg_listener tuples," particularly in combination with a previous patch
> that eliminates using LISTEN/NOTIFY for event confirmations.
> 
> Modified Files:
> --------------
>     slony1-engine/src/slon:
>         local_listen.c (r1.35 -> r1.36)
>         remote_listen.c (r1.27 -> r1.28)
> 
> 
> 
> ------------------------------------------------------------------------
> 
> Index: remote_listen.c
> ===================================================================
> RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_listen.c,v
> retrieving revision 1.27
> retrieving revision 1.28
> diff -Lsrc/slon/remote_listen.c -Lsrc/slon/remote_listen.c -u -w -r1.27 -r1.28
> --- src/slon/remote_listen.c
> +++ src/slon/remote_listen.c
> @@ -58,6 +58,10 @@
>  static int remoteListen_receive_events(SlonNode * node,
>  					SlonConn * conn, struct listat * listat);
>  
> +static int poll_sleep;
> +enum pstate_enum {POLL=1, LISTEN};
> +static enum pstate_enum pstate;
> +
>  extern char *lag_interval;
>  
>  /* ----------
> @@ -98,8 +102,11 @@
>  	listat_tail = NULL;
>  	dstring_init(&query1);
>  
> +	poll_sleep = 0;
> +	pstate = POLL;      /* Initially, start in Polling mode */
> +
>  	sprintf(conn_symname, "node_%d_listen", node->no_id);
> -/*	sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name); */
> +	sprintf(notify_confirm, "_%s_Confirm", rtcfg_cluster_name);
>  
>  	/*
>  	 * Work until doomsday
> @@ -229,13 +236,23 @@
>  			 * register the node connection.
>  			 */
>  			slon_mkquery(&query1,
> -				     "listen \"_%s_Event\"; "
> +				     /* "listen \"_%s_Event\"; " */
>  				     /*	 skip confirms "listen \"_%s_Confirm\"; " */
> -				     "select %s.registerNodeConnection(%d); ",
> -				     rtcfg_cluster_name, /* rtcfg_cluster_name, */
> +				     "select _%s.registerNodeConnection(%d); ",
> +				     /* rtcfg_cluster_name,  */
>  				     rtcfg_namespace, rtcfg_nodeid);
> +
> +			if (pstate == LISTEN) {
> +				slon_appendquery(&query1, 
> +						 "listen \"_%s_Event\"; ",
> +						 rtcfg_cluster_name);
> +			} else {
> +				slon_appendquery(&query1, 
> +						 "unlisten \"_%s_Event\"; ",
> +						 rtcfg_cluster_name);
> +			}
>  			res = PQexec(dbconn, dstring_data(&query1));
> -			if (PQresultStatus(res) != PGRES_TUPLES_OK)
> +			if (PQresultStatus(res) != PGRES_COMMAND_OK)
>  			{
>  				slon_log(SLON_ERROR,
>  					 "remoteListenThread_%d: \"%s\" - %s",
> @@ -299,6 +316,7 @@
>  		/*
>  		 * Receive events from the provider node
>  		 */
> +		enum pstate_enum oldpstate = pstate;
>  		rc = remoteListen_receive_events(node, conn, listat_head);
>  		if (rc < 0)
>  		{
> @@ -313,6 +331,41 @@
>  
>  			continue;
>  		}
> +		if (oldpstate != pstate) { /* Switched states... */
> +			switch (pstate) {
> +			case POLL:
> +				slon_log(SLON_DEBUG2, 
> +					 "remoteListenThread_%d: UNLISTEN\n",
> +					 node->no_id);
> +
> +				slon_mkquery(&query1,
> +					     "unlisten \"_%s_Event\"; ",
> +					     rtcfg_cluster_name);
> +				break;
> +			case LISTEN:
> +				slon_log(SLON_DEBUG2, 
> +					 "remoteListenThread_%d: LISTEN\n",
> +					 node->no_id);
> +				slon_mkquery(&query1,
> +					     "listen \"_%s_Event\"; ",
> +					     rtcfg_cluster_name);
> +				break;
> +			}			
> +			res = PQexec(dbconn, dstring_data(&query1));
> +			if (PQresultStatus(res) != PGRES_COMMAND_OK)
> +			{
> +				slon_log(SLON_ERROR,
> +					 "remoteListenThread_%d: \"%s\" - %s",
> +					 node->no_id,
> +					 dstring_data(&query1), PQresultErrorMessage(res));
> +				PQclear(res);
> +				slon_disconnectdb(conn);
> +				free(conn_conninfo);
> +				conn = NULL;
> +				conn_conninfo = NULL;
> +				continue;
> +			}
> +		}
>  
>  		/*
>  		 * If the remote node notified for new confirmations, read them and
> @@ -343,7 +396,7 @@
>  		/*
>  		 * Wait for notification.
>  		 */
> -		rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, 10000);
> +		rc = sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep);
>  		if (rc == SCHED_STATUS_CANCEL)
>  			continue;
>  		if (rc != SCHED_STATUS_OK)
> @@ -746,6 +799,16 @@
>  		  (PQgetisnull(res, tupno, 14)) ? NULL : PQgetvalue(res, tupno, 14));
>  	}
>  
> +	if (ntuples > 0) {
> +		poll_sleep = 0;
> +		pstate = POLL;
> +	} else {
> +		poll_sleep = poll_sleep * 2 + sync_interval;
> +		if (poll_sleep > sync_interval_timeout) {
> +			poll_sleep = sync_interval_timeout;
> +			pstate = LISTEN;
> +		}
> +	}
>  	PQclear(res);
>  
>  	return 0;
> Index: local_listen.c
> ===================================================================
> RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/local_listen.c,v
> retrieving revision 1.35
> retrieving revision 1.36
> diff -Lsrc/slon/local_listen.c -Lsrc/slon/local_listen.c -u -w -r1.35 -r1.36
> --- src/slon/local_listen.c
> +++ src/slon/local_listen.c
> @@ -49,6 +49,7 @@
>  	PGnotify   *notification;
>  	char		restart_notify[256];
>  	int			restart_request;
> +	int poll_sleep = 0;
>  
>  	slon_log(SLON_DEBUG1, "localListenThread: thread starts\n");
>  
> @@ -69,9 +70,10 @@
>  	 * Listen for local events
>  	 */
>  	slon_mkquery(&query1,
> -				 "listen \"_%s_Event\"; "
> +		     /* "listen \"_%s_Event\"; " */
>  				 "listen \"_%s_Restart\"; ",
> -				 rtcfg_cluster_name, rtcfg_cluster_name);
> +		     /*	 rtcfg_cluster_name,  */
> +		     rtcfg_cluster_name);
>  	res = PQexec(dbconn, dstring_data(&query1));
>  	if (PQresultStatus(res) != PGRES_COMMAND_OK)
>  	{
> @@ -636,6 +638,7 @@
>  		 */
>  		if (ntuples > 0)
>  		{
> +			poll_sleep = 0;  /* drop polling time back to 0... */
>  			res = PQexec(dbconn, "commit transaction");
>  			if (PQresultStatus(res) != PGRES_COMMAND_OK)
>  			{
> @@ -654,6 +657,12 @@
>  			/*
>  			 * No database events received. Rollback instead.
>  			 */
> +
> +			/* Increase the amount of time to sleep, to a max of sync_interval_timeout */
> +			poll_sleep += sync_interval;
> +			if (poll_sleep > sync_interval_timeout) {
> +				poll_sleep = sync_interval_timeout;
> +			}
>  			res = PQexec(dbconn, "rollback transaction;");
>  			if (PQresultStatus(res) != PGRES_COMMAND_OK)
>  			{
> @@ -668,9 +677,9 @@
>  		}
>  
>  		/*
> -		 * Wait for notify
> +		 * Wait for notify or for timeout
>  		 */
> -		if (sched_wait_conn(conn, SCHED_WAIT_SOCK_READ) != SCHED_STATUS_OK)
> +		if (sched_wait_time(conn, SCHED_WAIT_SOCK_READ, poll_sleep) != SCHED_STATUS_OK)
>  			break;
>  	}
>  
> 
> 
> ------------------------------------------------------------------------
> 
> _______________________________________________
> Slony1-commit mailing list
> Slony1-commit at gborg.postgresql.org
> http://gborg.postgresql.org/mailman/listinfo/slony1-commit


-- 
#======================================================================#
# It's easier to get forgiveness for being wrong than for being right. #
# Let's break this rule - forgive me.                                  #
#================================================== JanWieck at Yahoo.com #


More information about the Slony1-general mailing list