CVS User Account cvsuser
Wed Dec 7 03:52:44 PST 2005
Log Message:
-----------
Fix for the excessive memory allocation problem when replicating
data with large attributes.

New config options:
    sync_max_rowsize   (default 8k)
    sync_max_largemem  (default 5M)

Slon will try to keep the per-provider memory allocation between
500 x sync_max_rowsize and 500 x sync_max_rowsize + sync_max_largemem.
With the default settings, this means roughly 4 to 9 MB.
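
Spelled out with the defaults:

    500 x 8192 bytes             = ~4 MB   (pool of small-row line buffers)
    500 x 8192 + 5242880 bytes   = ~9 MB   (buffers plus large-row budget)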

Jan

Modified Files:
--------------
    slony1-engine/share:
        slon.conf-sample (r1.3 -> r1.4)
    slony1-engine/src/slon:
        confoptions.h (r1.26 -> r1.27)
        remote_worker.c (r1.102 -> r1.103)
        slon.h (r1.56 -> r1.57)

-------------- next part --------------
Index: slon.conf-sample
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/share/slon.conf-sample,v
retrieving revision 1.3
retrieving revision 1.4
diff -Lshare/slon.conf-sample -Lshare/slon.conf-sample -u -w -r1.3 -r1.4
--- share/slon.conf-sample
+++ share/slon.conf-sample
@@ -31,6 +31,20 @@
 # Range:  [0,100], default: 6
 #sync_group_maxsize=6
 
+# Size above which an sl_log_? row's log_cmddata is considered large.
+# Up to 500 rows of this size are allowed in memory at once. Larger rows
+# count against the sync_max_largemem budget, which is allocated and
+# freed on demand.
+# Range:  [1024,32768], default: 8192
+#sync_max_rowsize=8192
+
+# Maximum amount of memory allowed for large rows. Note that the
+# algorithm will stop fetching rows AFTER this amount is exceeded,
+# not BEFORE. This ensures that a single row larger than the limit
+# cannot stall replication on its own.
+# Range:  [1048576,1073741824], default: 5242880
+#sync_max_largemem=5242880
+
 # If this parameter is 1, messages go both to syslog and the standard 
 # output. A value of 2 sends output only to syslog (some messages will 
 # still go to the standard output/error).  The default is 0, which means 
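
For illustration only, a hypothetical slon.conf for a cluster that
replicates many wide rows could raise both limits (the values below are
invented for this example, but stay within the documented ranges):

    sync_max_rowsize=16384      # rows above 16 kB are fetched separately
    sync_max_largemem=16777216  # allow ~16 MB of large-row data in flight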
Index: confoptions.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/confoptions.h,v
retrieving revision 1.26
retrieving revision 1.27
diff -Lsrc/slon/confoptions.h -Lsrc/slon/confoptions.h -u -w -r1.26 -r1.27
--- src/slon/confoptions.h
+++ src/slon/confoptions.h
@@ -21,6 +21,8 @@
 extern int	slon_log_level;
 extern int	sync_interval;
 extern int	sync_interval_timeout;
+extern int	sync_max_rowsize;
+extern int	sync_max_largemem;
 
 extern int	sync_group_maxsize;
 extern int	desired_sync_time;
@@ -218,6 +220,30 @@
 		0,
 		2147483647
 	},
+	{
+		{
+			(const char *)"sync_max_rowsize",		/* conf name */
+			gettext_noop("sl_log_? rows larger than this are read separately"),		/* short desc */
+			gettext_noop("sl_log_? rows larger than this are read separately"),		/* long desc */
+			SLON_C_INT			/* config type */
+		},
+		&sync_max_rowsize,			/* var name */
+		8192,						/* default val */
+		1024,						/* min val */
+		32768						/* max val */
+	},
+	{
+		{
+			(const char *)"sync_max_largemem",		/* conf name */
+			gettext_noop("How much memory to allow for sl_log_? rows exceeding sync_max_rowsize"),		/* short desc */
+			gettext_noop("How much memory to allow for sl_log_? rows exceeding sync_max_rowsize"),		/* long desc */
+			SLON_C_INT			/* config type */
+		},
+		&sync_max_largemem,			/* var name */
+		5242880,					/* default val */
+		1048576,					/* min val */
+		1073741824					/* max val */
+	},
 	{0}
 };
 
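The confoptions.h entries above follow slon's table-driven config
pattern: a name/description header plus a variable pointer, default,
minimum, and maximum. Below is a minimal self-contained sketch of that
pattern; OptEntry and set_int_option are names invented for the
example, not part of the slon source.

#include <string.h>

typedef struct
{
    const char *name;        /* conf name */
    int        *var;         /* variable receiving the value */
    int         default_val;
    int         min_val;
    int         max_val;
} OptEntry;

static int demo_max_rowsize;
static int demo_max_largemem;

static OptEntry demo_table[] = {
    {"sync_max_rowsize",  &demo_max_rowsize,  8192,    1024,    32768},
    {"sync_max_largemem", &demo_max_largemem, 5242880, 1048576, 1073741824},
    {NULL}
};

/* Store a value for the named option, rejecting out-of-range input. */
static int
set_int_option(const char *name, int value)
{
    OptEntry *opt;

    for (opt = demo_table; opt->name != NULL; opt++)
    {
        if (strcmp(opt->name, name) != 0)
            continue;
        if (value < opt->min_val || value > opt->max_val)
            return -1;
        *(opt->var) = value;
        return 0;
    }
    return -1;
}
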
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.102
retrieving revision 1.103
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.102 -r1.103
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -171,6 +171,7 @@
 
 	pthread_mutex_t workdata_lock;
 	WorkGroupStatus workgroup_status;
+	int				workdata_largemem;
 
 	pthread_cond_t repldata_cond;
 	WorkerGroupLine *repldata_head;
@@ -188,6 +189,7 @@
 	ProviderInfo *provider;
 	SlonDString data;
 	SlonDString log;
+	int			line_largemem;
 
 	WorkerGroupLine *prev;
 	WorkerGroupLine *next;
@@ -212,6 +214,8 @@
 pthread_mutex_t node_confirm_lock = PTHREAD_MUTEX_INITIALIZER;
 
 int			sync_group_maxsize;
+int			sync_max_rowsize;
+int			sync_max_largemem;
 
 int			last_sync_group_size;
 int			next_sync_group_size;
@@ -304,6 +308,7 @@
 	pthread_mutex_lock(&(wd->workdata_lock));
 	wd->workgroup_status = SLON_WG_IDLE;
 	wd->node = node;
+	wd->workdata_largemem = 0;
 
 	wd->tab_fqname_size = SLON_MAX_PATH;
 	wd->tab_fqname = (char **)malloc(sizeof(char *) * wd->tab_fqname_size);
@@ -1700,6 +1705,7 @@
 
 						line = (WorkerGroupLine *) malloc(sizeof(WorkerGroupLine));
 						memset(line, 0, sizeof(WorkerGroupLine));
+						line->line_largemem = 0;
 						dstring_init(&(line->data));
 						dstring_init(&(line->log));
 						DLLIST_ADD_TAIL(wd->linepool_head, wd->linepool_tail,
@@ -4533,8 +4539,26 @@
 		for (wgline = lines_head; wgline; wgline = wgnext)
 		{
 			wgnext = wgline->next;
+			if (wgline->line_largemem > 0)
+			{
+				/*
+				 * Really free the lines that contained large rows
+				 */
+				dstring_free(&(wgline->data));
+				dstring_free(&(wgline->log));
+				dstring_init(&(wgline->data));
+				dstring_init(&(wgline->log));
+				wd->workdata_largemem -= wgline->line_largemem;
+				wgline->line_largemem = 0;
+			}
+			else
+			{
+				/*
+				 * Just reset (and allow them to grow further) the small ones
+				 */
 			dstring_reset(&(wgline->data));
 			dstring_reset(&(wgline->log));
+			}
 			DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, wgline);
 		}
 		if (num_errors == 1)
@@ -4783,6 +4807,7 @@
 	PGconn	   *dbconn;
 	WorkerGroupLine *line = NULL;
 	SlonDString query;
+	SlonDString query2;
 	int			errors;
 	int			alloc_lines = 0;
 	struct timeval tv_start;
@@ -4795,6 +4820,7 @@
 	int			data_line_last;
 
 	PGresult   *res;
+	PGresult   *res2;
 	int			ntuples;
 	int			tupno;
 
@@ -4802,6 +4828,7 @@
 	int			line_ncmds;
 
 	dstring_init(&query);
+	dstring_init(&query2);
 
 	for (;;)
 	{
@@ -4862,8 +4889,13 @@
 			slon_mkquery(&query,
 						 "declare LOG cursor for select "
 						 "    log_origin, log_xid, log_tableid, "
-						 "    log_actionseq, log_cmdtype, log_cmddata "
+						 "    log_actionseq, log_cmdtype, "
+						 "    octet_length(log_cmddata), "
+						 "    case when octet_length(log_cmddata) <= %d "
+						 "        then log_cmddata "
+						 "        else null end "
 						 "from %s.sl_log_1 %s order by log_actionseq; ",
+						 sync_max_rowsize,
 						 rtcfg_namespace,
 						 dstring_data(&(provider->helper_qualification)));
 
@@ -4926,19 +4958,20 @@
 				 * have available line buffers.
 				 */
 				pthread_mutex_lock(&(wd->workdata_lock));
-				if (data_line_alloc == 0 /* || oversize */)
+				if (data_line_alloc == 0 || 
+						wd->workdata_largemem > sync_max_largemem)
 				{
 					/*
 					 * First make sure that the overall memory usage is
 					 * inside bounds.
 					 */
-					if (0 /* oversize */)
+					if (wd->workdata_largemem > sync_max_largemem)
 					{
 						slon_log(SLON_DEBUG4,
 								 "remoteHelperThread_%d_%d: wait for oversize memory to free\n",
 								 node->no_id, provider->no_id);
 
-						while (/* oversize && */
+						while (wd->workdata_largemem > sync_max_largemem &&
 								wd->workgroup_status == SLON_WG_BUSY)
 						{
 							pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock));
@@ -5064,10 +5097,49 @@
 													 NULL, 10);
 					char	   *log_actionseq = PQgetvalue(res, tupno, 3);
 					char	   *log_cmdtype = PQgetvalue(res, tupno, 4);
-					char	   *log_cmddata = PQgetvalue(res, tupno, 5);
+					int			log_cmdsize = strtol(PQgetvalue(res, tupno, 5),
+													 NULL, 10);
+					char	   *log_cmddata = PQgetvalue(res, tupno, 6);
+					int			largemem = 0;
 
 					tupno++;
 
+					if (log_cmdsize > sync_max_rowsize)
+					{
+						slon_mkquery(&query2,
+								"select log_cmddata "
+								"from %s.sl_log_1 "
+								"where log_origin = '%s' "
+								"  and log_xid = '%s' "
+								"  and log_actionseq = '%s'",
+								rtcfg_namespace,
+								log_origin, log_xid, log_actionseq);
+						res2 = PQexec(dbconn, dstring_data(&query2));
+						if (PQresultStatus(res2) != PGRES_TUPLES_OK)
+						{
+							slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\" %s",
+									 node->no_id, provider->no_id,
+									 dstring_data(&query2),
+									 PQresultErrorMessage(res2));
+							PQclear(res2);
+							errors++;
+							break;
+						}
+						if (PQntuples(res2) != 1)
+						{
+							slon_log(SLON_ERROR, "remoteHelperThread_%d_%d: \"%s\": large log_cmddata for actionseq %s not found\n",
+									 node->no_id, provider->no_id,
+									 dstring_data(&query2),
+									 log_actionseq);
+							PQclear(res2);
+							errors++;
+							break;
+						}
+
+						log_cmddata = PQgetvalue(res2, 0, 0);
+						largemem = log_cmdsize;
+					}
+
 					/*
 					 * This can happen if the table belongs to a set that
 					 * already has a better sync status than the event we're
@@ -5093,11 +5165,13 @@
 										 rtcfg_namespace,
 										 log_origin, log_xid, log_tableid,
 									log_actionseq, log_cmdtype, log_cmddata);
+						largemem *= 2;
 					}
 
 					/*
 					 * Add the actual replicating command to the line buffer
 					 */
+					line->line_largemem += largemem;
 					switch (*log_cmdtype)
 					{
 						case 'I':
@@ -5137,6 +5211,30 @@
 
 						line_ncmds = 0;
 					}
+
+					/*
+					 * If this was a large log_cmddata entry 
+					 * (> sync_max_rowsize), add this to the memory
+					 * usage of the workgroup and check if we are
+					 * exceeding limits.
+					 */
+					if (largemem > 0)
+					{
+						PQclear(res2);
+						pthread_mutex_lock(&(wd->workdata_lock));
+						wd->workdata_largemem += largemem;
+						if (wd->workdata_largemem >= sync_max_largemem)
+						{
+							/*
+							 * This is it ... we exit the loop here
+							 * and wait for the worker to apply enough
+							 * of the large rows first.
+							 */
+							pthread_mutex_unlock(&(wd->workdata_lock));
+							break;
+						}
+						pthread_mutex_unlock(&(wd->workdata_lock));
+					}
 				}
 
 				/*
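
The heart of the remote_worker.c change is the two-step fetch: the LOG
cursor ships octet_length(log_cmddata) for every row, but the data
itself only when it fits within sync_max_rowsize; oversized rows come
back as NULL and are re-fetched individually, with their size charged
against workdata_largemem until the worker has applied them. A
stripped-down sketch of the same pattern follows, using a hypothetical
big_rows(id, payload) table in place of sl_log_? (demo code under those
assumptions, not the slon source):

#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

#define DEMO_MAX_ROWSIZE 8192       /* stands in for sync_max_rowsize */

static void
fetch_with_size_limit(PGconn *dbconn)
{
    char        query[256];
    PGresult   *res;
    int         tupno;

    /* Always fetch the size; fetch the payload only when it is small. */
    snprintf(query, sizeof(query),
             "select id, octet_length(payload), "
             "    case when octet_length(payload) <= %d "
             "        then payload else null end "
             "from big_rows order by id",
             DEMO_MAX_ROWSIZE);
    res = PQexec(dbconn, query);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, "%s", PQresultErrorMessage(res));
        PQclear(res);
        return;
    }

    for (tupno = 0; tupno < PQntuples(res); tupno++)
    {
        char   *id = PQgetvalue(res, tupno, 0);
        int     size = atoi(PQgetvalue(res, tupno, 1));

        if (size > DEMO_MAX_ROWSIZE)
        {
            /*
             * Large row: fetch the payload separately so it can be
             * freed right after it is applied, instead of sitting in
             * the cursor result for the whole batch.
             */
            char        query2[128];
            PGresult   *res2;

            snprintf(query2, sizeof(query2),
                     "select payload from big_rows where id = %s", id);
            res2 = PQexec(dbconn, query2);
            if (PQresultStatus(res2) == PGRES_TUPLES_OK &&
                PQntuples(res2) == 1)
            {
                /* ... apply PQgetvalue(res2, 0, 0) and charge `size`
                 * against the large-row budget ... */
            }
            PQclear(res2);
        }
        else
        {
            /* ... apply the inline payload, PQgetvalue(res, tupno, 2) ... */
        }
    }
    PQclear(res);
}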
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.56
retrieving revision 1.57
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.56 -r1.57
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -509,6 +509,8 @@
  * ----------
  */
 extern int	sync_group_maxsize;
+extern int	sync_max_rowsize;
+extern int	sync_max_largemem;
 
 
 /* ----------

