Christopher Browne cbbrowne at ca.afilias.info
Thu Dec 20 11:56:35 PST 2007
The patch below, applied to CVS HEAD, changes behaviour thus:

- All of the interesting cleanup work is now done in the stored
  function, cleanupEvent(interval, boolean).

  Interesting side-effect: You can now induce a cleanup manually,
  which will be useful for testing.

- This function now takes two parameters, whose values are supplied by
  slon configuration options:

  interval - cleanup_interval (default '10 minutes')

   This controls how quickly old events are trimmed out.  It used to
   be a hard-coded value.

   Old events are trimmed out once their confirmations are older than
   cleanup_interval.

   This then controls when the data in sl_log_1/sl_log_2 can be
   dropped.

   Data in *those* tables is deleted when its log_txid is lower than the
   snapshot xmin of the oldest SYNC event still present in sl_event for
   the same origin.

  boolean - cleanup_deletelogs (default 'false')

   This controls whether or not we DELETE data from sl_log_1/sl_log_2.
   Entries in sl_seqlog are trimmed regardless of this setting.

- We now consider initiating a log switch every time cleanupEvent()
  runs.

  If the call to logswitch_finish() returns 0, indicating that there was
  no log switch in progress, we initiate one by calling logswitch_start().

  This means that log switches will be initiated almost as often as
  possible.  That's a policy well worth debating :-).

- logswitch_finish() changes a fair bit...

  It uses the same logic as in cleanupEvent() to determine if there
  are any *relevant* tuples left in sl_log_[whatever], rather than
  (potentially) scanning the table to see if there are any undeleted
  tuples left.


Index: src/backend/slony1_funcs.sql
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/backend/slony1_funcs.sql,v
retrieving revision 1.126
diff -c -u -r1.126 slony1_funcs.sql
--- src/backend/slony1_funcs.sql	11 Dec 2007 20:39:41 -0000	1.126
+++ src/backend/slony1_funcs.sql	20 Dec 2007 19:42:14 -0000
@@ -1958,7 +1958,7 @@
 	-- Remember our snapshots xmax as for the set locking
 	-- ----
 	update @NAMESPACE at .sl_set
-			set set_locked = "pg_catalog".txid_snapshot_xmax("public".txid_current_snapshot())
+			set set_locked = "pg_catalog".txid_snapshot_xmax("pg_catalog".txid_current_snapshot())
 			where set_id = p_set_id;
 
 	return p_set_id;
@@ -2080,7 +2080,7 @@
 	if v_set_row.set_locked isnull then
 		raise exception ''Slony-I: set % is not locked'', p_set_id;
 	end if;
-	if v_set_row.set_locked > "pg_catalog".txid_snapshot_xmin("public".txid_current_snapshot()) then
+	if v_set_row.set_locked > "pg_catalog".txid_snapshot_xmin("pg_catalog".txid_current_snapshot()) then
 		raise exception ''Slony-I: cannot move set % yet, transactions < % are still in progress'',
 				p_set_id, v_set_row.set_locked;
 	end if;
@@ -4382,16 +4382,22 @@
 p_con_timestamp, and raises an event to forward this confirmation.';
 
 -- ----------------------------------------------------------------------
--- FUNCTION cleanupEvent ()
+-- FUNCTION cleanupEvent (interval, deletelogs)
 --
 -- ----------------------------------------------------------------------
-create or replace function @NAMESPACE at .cleanupEvent ()
+create or replace function @NAMESPACE at .cleanupEvent (interval, boolean)
 returns int4
 as '
 declare
+	p_interval alias for $1;
+	p_deletelogs alias for $2;
 	v_max_row	record;
 	v_min_row	record;
 	v_max_sync	int8;
+	v_origin	int8;
+	v_seqno		int8;
+	v_xmin		bigint;
+	v_rc            int8;
 begin
 	-- ----
 	-- First remove all but the oldest confirm row per origin,receiver pair
@@ -4466,14 +4472,33 @@
 	-- ----
 	perform @NAMESPACE at .cleanupNodelock();
 
+	-- ----
+	-- Find the eldest event left, for each origin
+	-- ----
+        for v_origin, v_seqno, v_xmin in
+	  select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from @NAMESPACE at .sl_event
+          where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from @NAMESPACE at .sl_event where ev_type = ''SYNC'' group by ev_origin)
+	loop
+		if p_deletelogs then
+			delete from @NAMESPACE at .sl_log_1 where log_origin = v_origin and log_txid < v_xmin;		
+			delete from @NAMESPACE at .sl_log_2 where log_origin = v_origin and log_txid < v_xmin;		
+		end if;
+		delete from @NAMESPACE at .sl_seqlog where seql_origin = v_origin and seql_ev_seqno < v_seqno;
+        end loop;
+	
+	v_rc := @NAMESPACE at .logswitch_finish();
+	if v_rc = 0 then   -- no switch in progress
+		perform @NAMESPACE at .logswitch_start();
+	end if;
+
 	return 0;
 end;
 ' language plpgsql;
-comment on function @NAMESPACE at .cleanupEvent () is
+comment on function @NAMESPACE at .cleanupEvent (interval, boolean) is
 'cleaning old data out of sl_confirm, sl_event.  Removes all but the
 last sl_confirm row per (origin,receiver), and then removes all events
 that are confirmed by all nodes in the whole cluster up to the last
-SYNC.  ';
+SYNC.  Deletes now-orphaned entries from sl_log_* if delete_logs parameter is set';
 
 
 -- ----------------------------------------------------------------------
@@ -5080,6 +5105,10 @@
 DECLARE
 	v_current_status	int4;
 	v_dummy				record;
+	v_origin	int8;
+	v_seqno		int8;
+	v_xmin		bigint;
+	v_purgeable boolean;
 BEGIN
 	-- ----
 	-- Grab the central configuration lock to prevent race conditions
@@ -5103,18 +5132,29 @@
 	-- status = 2: sl_log_1 active, cleanup sl_log_2
 	-- ----
 	if v_current_status = 2 then
+		v_purgeable := ''true'';
 		-- ----
 		-- The cleanup thread calls us after it did the delete and
 		-- vacuum of both log tables. If sl_log_2 is empty now, we
 		-- can truncate it and the log switch is done.
 		-- ----
-		for v_dummy in select 1 from @NAMESPACE at .sl_log_2 loop
+		
+	        for v_origin, v_seqno, v_xmin in
+		  select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from @NAMESPACE at .sl_event
+	          where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from @NAMESPACE at .sl_event where ev_type = ''SYNC'' group by ev_origin)
+		loop
+			select 1 from @NAMESPACE at .sl_log_2 where log_origin = v_origin and log_txid < v_xmin limit 1;		
+			if exists then
+				v_purgeable := ''false'';
+			end if;
+	        end loop;
+		if not v_purgeable then
 			-- ----
 			-- Found a row ... log switch is still in progress.
 			-- ----
 			raise notice ''Slony-I: log switch to sl_log_1 still in progress - sl_log_2 not truncated'';
 			return -1;
-		end loop;
+		end if;
 
 		raise notice ''Slony-I: log switch to sl_log_1 complete - truncate sl_log_2'';
 		truncate @NAMESPACE at .sl_log_2;
@@ -5132,18 +5172,28 @@
 	-- status = 3: sl_log_2 active, cleanup sl_log_1
 	-- ----
 	if v_current_status = 3 then
+		v_purgeable := ''true'';
 		-- ----
 		-- The cleanup thread calls us after it did the delete and
 		-- vacuum of both log tables. If sl_log_2 is empty now, we
 		-- can truncate it and the log switch is done.
 		-- ----
-		for v_dummy in select 1 from @NAMESPACE at .sl_log_1 loop
+	        for v_origin, v_seqno, v_xmin in
+		  select ev_origin, ev_seqno, "pg_catalog".txid_snapshot_xmin(ev_snapshot) from @NAMESPACE at .sl_event
+	          where (ev_origin, ev_seqno) in (select ev_origin, min(ev_seqno) from @NAMESPACE at .sl_event where ev_type = ''SYNC'' group by ev_origin)
+		loop
+			select 1 from @NAMESPACE at .sl_log_1 where log_origin = v_origin and log_txid < v_xmin limit 1;		
+			if exists then
+				v_purgeable := ''false'';
+			end if;
+	        end loop;
+		if not v_purgeable then
 			-- ----
 			-- Found a row ... log switch is still in progress.
 			-- ----
 			raise notice ''Slony-I: log switch to sl_log_2 still in progress - sl_log_1 not truncated'';
 			return -1;
-		end loop;
+		end if;
 
 		raise notice ''Slony-I: log switch to sl_log_2 complete - truncate sl_log_1'';
 		truncate @NAMESPACE at .sl_log_1;
@@ -5160,7 +5210,13 @@
 comment on function @NAMESPACE at .logswitch_finish() is
 'logswitch_finish()
 
-Attempt to finalize a log table switch in progress';
+Attempt to finalize a log table switch in progress
+return values:
+  -1 if switch in progress, but not complete
+   0 if no switch in progress
+   1 if performed truncate on sl_log_2
+   2 if performed truncate on sl_log_1
+';
 
 
 -- ----------------------------------------------------------------------
Index: src/slon/cleanup_thread.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/cleanup_thread.c,v
retrieving revision 1.40
diff -c -u -r1.40 cleanup_thread.c
--- src/slon/cleanup_thread.c	19 Oct 2007 18:38:35 -0000	1.40
+++ src/slon/cleanup_thread.c	20 Dec 2007 19:42:14 -0000
@@ -30,6 +30,9 @@
  * ----------
  */
 int			vac_frequency = SLON_VACUUM_FREQUENCY;
+char *cleanup_interval;
+bool cleanup_deletelogs;
+
 static int	vac_bias = 0;
 static unsigned long earliest_xid = 0;
 static unsigned long get_earliest_xid(PGconn *dbconn);
@@ -45,17 +48,16 @@
 cleanupThread_main(/*@unused@*/ void *dummy)
 {
 	SlonConn   *conn;
-	SlonDString query1;
+	SlonDString query_baseclean;
 	SlonDString query2;
-	SlonDString query3;
+	SlonDString query_pertbl;
 
 	PGconn	   *dbconn;
 	PGresult   *res;
 	PGresult   *res2;
 	struct timeval tv_start;
 	struct timeval tv_end;
-	int			n,
-				t;
+	int			t;
 	int			vac_count = 0;
 	int			vac_enable = SLON_VACUUM_FREQUENCY;
 	char	   *vacuum_action;
@@ -91,8 +93,12 @@
 	/*
 	 * Build the query string for calling the cleanupEvent() stored procedure
 	 */
-	dstring_init(&query1);
-	slon_mkquery(&query1, "select %s.cleanupEvent(); ", rtcfg_namespace);
+	dstring_init(&query_baseclean);
+	slon_mkquery(&query_baseclean, "select %s.cleanupEvent('%s'::interval, '%s'::boolean); ", 
+		     rtcfg_namespace, 
+		     cleanup_interval,
+		     cleanup_deletelogs ? "true" : "false"
+		);
 	dstring_init(&query2);
 
 	/*
@@ -109,12 +115,12 @@
 		 * Call the stored procedure cleanupEvent()
 		 */
 		gettimeofday(&tv_start, NULL);
-		res = PQexec(dbconn, dstring_data(&query1));
+		res = PQexec(dbconn, dstring_data(&query_baseclean));
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
 			slon_log(SLON_FATAL,
 					 "cleanupThread: \"%s\" - %s",
-					 dstring_data(&query1), PQresultErrorMessage(res));
+					 dstring_data(&query_baseclean), PQresultErrorMessage(res));
 			PQclear(res);
 			slon_retry();
 			break;
@@ -125,87 +131,17 @@
 				 "cleanupThread: %8.3f seconds for cleanupEvent()\n",
 				 TIMEVAL_DIFF(&tv_start, &tv_end));
 
-		/*
-		 * Clean up the logs and eventually finish switching logs
-		 */
-		gettimeofday(&tv_start, NULL);
 		slon_mkquery(&query2,
-					 "select ev_origin, ev_seqno, "
-					 "  \"public\".txid_snapshot_xmin(ev_snapshot) "
-					 "from %s.sl_event "
-					 "where (ev_origin, ev_seqno) in "
-					 "    (select ev_origin, min(ev_seqno) "
-					 "     from %s.sl_event "
-					 "     where ev_type = 'SYNC' "
-					 "     group by ev_origin); ",
-					 rtcfg_namespace, rtcfg_namespace);
-		res = PQexec(dbconn, dstring_data(&query2));
-		if (PQresultStatus(res) != PGRES_TUPLES_OK)
-		{
-			slon_log(SLON_FATAL,
-					 "cleanupThread: \"%s\" - %s",
-					 dstring_data(&query2), PQresultErrorMessage(res));
-			PQclear(res);
-			slon_retry();
-			break;
-		}
-		n = PQntuples(res);
-		for (t = 0; t < n; t++)
+			     "select %s.logswitch_weekly(); ",
+			     rtcfg_namespace);
+		res2 = PQexec(dbconn, dstring_data(&query2));
+		if (PQresultStatus(res2) != PGRES_TUPLES_OK)
 		{
-			slon_mkquery(&query2,
-						 "delete from %s.sl_log_1 "
-						 "where log_origin = '%s' "
-						 "and log_txid < '%s'; "
-						 "delete from %s.sl_log_2 "
-						 "where log_origin = '%s' "
-						 "and log_txid < '%s'; "
-						 "delete from %s.sl_seqlog "
-						 "where seql_origin = '%s' "
-						 "and seql_ev_seqno < '%s'; "
-						 "select %s.logswitch_finish(); ",
-						 rtcfg_namespace, PQgetvalue(res, t, 0),
-						 PQgetvalue(res, t, 2),
-						 rtcfg_namespace, PQgetvalue(res, t, 0),
-						 PQgetvalue(res, t, 2),
-						 rtcfg_namespace, PQgetvalue(res, t, 0),
-						 PQgetvalue(res, t, 1),
-						 rtcfg_namespace);
-			res2 = PQexec(dbconn, dstring_data(&query2));
-			if (PQresultStatus(res2) != PGRES_TUPLES_OK)
-			{
-				slon_log(SLON_FATAL,
-						 "cleanupThread: \"%s\" - %s",
-						 dstring_data(&query2), PQresultErrorMessage(res2));
-				PQclear(res);
-				PQclear(res2);
-				slon_retry();
-				break;
-			}
-			PQclear(res2);
-
-			/*
-			 * Eventually kick off a logswitch. This might fail,
-			 * but this is not really slon's problem, so we just
-			 * shrug and move on if it does.
-			 */
-			slon_mkquery(&query2,
-						 "select %s.logswitch_weekly(); ",
-						 rtcfg_namespace);
-			res2 = PQexec(dbconn, dstring_data(&query2));
-			if (PQresultStatus(res2) != PGRES_TUPLES_OK)
-			{
-				slon_log(SLON_WARN,
-						 "cleanupThread: \"%s\" - %s",
-						 dstring_data(&query2), PQresultErrorMessage(res2));
-			}
-			PQclear(res2);
+			slon_log(SLON_WARN,
+				 "cleanupThread: \"%s\" - %s",
+				 dstring_data(&query2), PQresultErrorMessage(res2));
 		}
-		PQclear(res);
-		gettimeofday(&tv_end, NULL);
-		slon_log(SLON_INFO,
-				 "cleanupThread: %8.3f seconds for delete logs\n",
-				 TIMEVAL_DIFF(&tv_start, &tv_end));
-
+		PQclear(res2);
 		/*
 		 * Detain the usual suspects (vacuum event and log data)
 		 */
@@ -266,21 +202,21 @@
 
 				slon_log (SLON_DEBUG1, "cleanupThread: %s analyze \"%s\".%s;\n",
 					      vacuum_action, tab_nspname, tab_relname);
-				dstring_init(&query3);
-				slon_mkquery (&query3, "%s analyze \"%s\".%s;",
+				dstring_init(&query_pertbl);
+				slon_mkquery (&query_pertbl, "%s analyze \"%s\".%s;",
 					      vacuum_action, tab_nspname, tab_relname);
-				res2 = PQexec(dbconn, dstring_data(&query3));
+				res2 = PQexec(dbconn, dstring_data(&query_pertbl));
 				if (PQresultStatus(res) != PGRES_COMMAND_OK)  /* query error */
                                 {
                  	                slon_log(SLON_ERROR,
 	                                        "cleanupThread: \"%s\" - %s",
-                                                dstring_data(&query3), PQresultErrorMessage(res2));
+                                                dstring_data(&query_pertbl), PQresultErrorMessage(res2));
                                                 /*
                                                  * slon_retry(); break;
                                                  */                  
                                 }
 				PQclear(res2);
-				dstring_reset(&query3);
+				dstring_reset(&query_pertbl);
 			}
 			gettimeofday(&tv_end, NULL);
 			slon_log(SLON_INFO,
@@ -290,7 +226,7 @@
 			/*
 			 * Free Resources
 			 */
-			dstring_free(&query3);
+			dstring_free(&query_pertbl);
 
 		}
 	}
@@ -298,7 +234,7 @@
 	/*
 	 * Free Resources
 	 */
-	dstring_free(&query1);
+	dstring_free(&query_baseclean);
 	dstring_free(&query2);
 
 	/*
@@ -330,11 +266,11 @@
 {
 	long long	xid;
 	PGresult   *res;
-	SlonDString query1;
+	SlonDString query;
 
-	dstring_init(&query1);
-	(void) slon_mkquery(&query1, "select %s.getMinXid();", rtcfg_namespace);
-	res = PQexec(dbconn, dstring_data(&query1));
+	dstring_init(&query);
+	(void) slon_mkquery(&query, "select %s.getMinXid();", rtcfg_namespace);
+	res = PQexec(dbconn, dstring_data(&query));
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		slon_log(SLON_FATAL, "cleanupThread: could not getMinXid()!\n");
@@ -345,7 +281,7 @@
 	xid = strtoll(PQgetvalue(res, 0, 0), NULL, 10);
 	slon_log(SLON_DEBUG1, "cleanupThread: minxid: %d\n", xid);
 	PQclear(res);
-	dstring_free(&query1);
+	dstring_free(&query);
 	return (unsigned long)xid;
 }
 
Index: src/slon/confoptions.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/confoptions.c,v
retrieving revision 1.25
diff -c -u -r1.25 confoptions.c
--- src/slon/confoptions.c	30 Jul 2007 22:34:33 -0000	1.25
+++ src/slon/confoptions.c	20 Dec 2007 19:42:14 -0000
@@ -720,6 +720,17 @@
 		true
 	},
 
+	{
+		{
+			(const char *)"cleanup_deletelogs",
+			gettext_noop("Should the cleanup thread DELETE sl_log_? entries or not"),
+			gettext_noop("Should the cleanup thread DELETE sl_log_? entries or not"),
+			SLON_C_BOOL
+		},
+		&cleanup_deletelogs,
+		false
+	},
+
 	{{0}}
 };
 
@@ -859,6 +870,18 @@
 		"slon"
 	},
 #endif
+	{
+		{
+			(const char *)"cleanup_interval",
+			gettext_noop("A PostgreSQL value compatible with ::interval "
+						 "which indicates what aging interval should be used "
+						 "for deleting old events, and hence for purging sl_log_* tables."),
+			NULL,
+			SLON_C_STRING
+		},
+		&cleanup_interval,
+		"10 minutes"
+	},
 	{{0}}
 };
 
Index: src/slon/confoptions.h
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/confoptions.h,v
retrieving revision 1.35
diff -c -u -r1.35 confoptions.h
--- src/slon/confoptions.h	20 Apr 2007 20:53:18 -0000	1.35
+++ src/slon/confoptions.h	20 Dec 2007 19:42:14 -0000
@@ -13,7 +13,6 @@
 extern char *pid_file;
 extern char *archive_dir;
 
-extern int	vac_frequency;
 extern int	slon_log_level;
 extern int	sync_interval;
 extern int	sync_interval_timeout;
@@ -27,6 +26,16 @@
 extern int	quit_sync_provider;
 extern int	quit_sync_finalsync;
 
+/*
+ * ----------
+ * Global variables in cleanup_thread.c
+ * ----------
+ */
+
+extern int	vac_frequency;
+extern char *cleanup_interval;
+extern bool cleanup_deletelogs;
+
 char	   *Syslog_ident;
 char	   *Syslog_facility;
 int			Use_syslog;
Index: src/slon/slon.h
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.65
diff -c -u -r1.65 slon.h
--- src/slon/slon.h	19 Oct 2007 18:38:35 -0000	1.65
+++ src/slon/slon.h	20 Dec 2007 19:42:14 -0000
@@ -472,6 +472,8 @@
  */
 
 extern int	vac_frequency;
+extern char *cleanup_interval;
+extern bool cleanup_deletelogs;
 
 /* ----------
  * Functions in cleanup_thread.c
Index: tests/test1/README
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/tests/test1/README,v
retrieving revision 1.11
diff -c -u -r1.11 README
--- tests/test1/README	11 Dec 2007 20:37:39 -0000	1.11
+++ tests/test1/README	20 Dec 2007 19:42:14 -0000
@@ -29,3 +29,5 @@
 
 8.  It uses a slon.conf file for node #2 to make sure that the logic
 of the config file parser gets exercised.
+
+9.  It runs the cleanupEvent() function by hand
Index: tests/test1/generate_dml.sh
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/tests/test1/generate_dml.sh,v
retrieving revision 1.15
diff -c -u -r1.15 generate_dml.sh
--- tests/test1/generate_dml.sh	11 Dec 2007 20:37:39 -0000	1.15
+++ tests/test1/generate_dml.sh	20 Dec 2007 19:42:14 -0000
@@ -83,5 +83,14 @@
       warn 3 "generate_sync_event() failed - rc=${rc} see $mktmp/gensync.log* for details"
   fi
   status "completed generate_sync_event() test"
+
+  pint="1 second"
+  dellogs="true"
+  $pgbindir/psql -h $host -p $port -d $db -U $user -c "select \"_${CLUSTER1}\".cleanupEvent('${pint}'::interval,'${dellogs}'::boolean);" 1> $mktmp/gensync.log.1 2> $mktmp/gensync.log
+  rc=$?
+  if [ $rc -ne 0 ]; then
+      warn 3 "cleanupEvent() failed - rc=${rc} see $mktmp/gensync.log* for details"
+  fi
+  status "completed generate_sync_event(${pint},${dellogs}) test"
   status "done"
 }

-- 
let name="cbbrowne" and tld="ca.afilias.info" in String.concat "@" [name;tld];;
<http://dba2.int.libertyrms.com/>
Christopher Browne
(416) 673-4124 (land)


More information about the Slony1-hackers mailing list