CVS User Account cvsuser
Tue Dec 6 20:59:23 PST 2005
Log Message:
-----------
Restructuring of the sync_helper thread in preparation for fixing
the out-of-memory problems on large rows.

Jan

Modified Files:
--------------
    slony1-engine/src/slon:
        remote_worker.c (r1.101 -> r1.102)
        slon.h (r1.55 -> r1.56)

-------------- next part --------------
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.101
retrieving revision 1.102
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.101 -r1.102
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -4782,18 +4782,25 @@
 	SlonNode   *node = wd->node;
 	PGconn	   *dbconn;
 	WorkerGroupLine *line = NULL;
-	int			line_no;
 	SlonDString query;
-	PGresult   *res;
-	int			ntuples;
-	int			tupno;
 	int			errors;
-	WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE];
 	int			alloc_lines = 0;
 	struct timeval tv_start;
 	struct timeval tv_now;
 	int			first_fetch;
 
+	WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE];
+	int			data_line_alloc;
+	int			data_line_first;
+	int			data_line_last;
+
+	PGresult   *res;
+	int			ntuples;
+	int			tupno;
+
+	int			line_no;
+	int			line_ncmds;
+
 	dstring_init(&query);
 
 	for (;;)
@@ -4827,6 +4834,10 @@
 		dbconn = provider->conn->dbconn;
 		pthread_mutex_unlock(&(provider->helper_lock));
 
+		slon_log(SLON_DEBUG4,
+				 "remoteHelperThread_%d_%d: got work to do\n",
+				 node->no_id, provider->no_id);
+
 		errors = 0;
 		do
 		{
@@ -4858,6 +4869,7 @@
 
 			gettimeofday(&tv_start, NULL);
 			first_fetch = true;
+			res = NULL;
 
 			if (query_execute(node, dbconn, &query) < 0)
 			{
@@ -4865,74 +4877,137 @@
 				break;
 			}
 
+			slon_mkquery(&query, "fetch %d from LOG; ",
+						 SLON_DATA_FETCH_SIZE * SLON_COMMANDS_PER_LINE);
+			data_line_alloc = 0;
+			data_line_first = 0;
+			data_line_last = 0;
+
+			res = NULL;
+			ntuples = 0;
+			tupno = 0;
+
+			while (!errors)
+			{
 			/*
-			 * Now fetch the log data and forward it via the line pool to the
-			 * main worker who pushes it into the local database.
+				 * Deliver filled line buffers to the worker process.
 			 */
-			alloc_lines = 0;
-			while (errors == 0)
+				if (data_line_last > data_line_first)
 			{
+					slon_log(SLON_DEBUG4,
+							 "remoteHelperThread_%d_%d: deliver %d lines to worker\n",
+							 node->no_id, provider->no_id,
+							 data_line_last - data_line_first);
+
+					pthread_mutex_lock(&(wd->workdata_lock));
+					while (data_line_first < data_line_last)
+					{
+						DLLIST_ADD_TAIL(wd->repldata_head, wd->repldata_tail,
+										data_line[data_line_first]);
+						data_line_first++;
+					}
+					pthread_cond_signal(&(wd->repldata_cond));
+					pthread_mutex_unlock(&(wd->workdata_lock));
+				}
+
 				/*
-				 * Allocate at least some lines - ideally the whole fetch
-				 * size.
+				 * If we cycled through all the allocated line buffers,
+				 * reset the indexes.
 				 */
-				while (alloc_lines == 0 && !errors)
+				if (data_line_first == data_line_alloc)
 				{
-					slon_log(SLON_DEBUG4,
-							 "remoteHelperThread_%d_%d: allocate lines\n",
-							 node->no_id, provider->no_id);
+					data_line_alloc = 0;
+					data_line_first = 0;
+					data_line_last = 0;
+				}
 
 					/*
-					 * Wait until there are lines available in the pool.
+				 * Make sure we are inside memory limits and that we
+				 * have available line buffers.
 					 */
 					pthread_mutex_lock(&(wd->workdata_lock));
-					while (wd->linepool_head == NULL &&
+				if (data_line_alloc == 0 /* || oversize */)
+				{
+					/*
+					 * First make sure that the overall memory usage is
+					 * inside bounds.
+					 */
+					if (0 /* oversize */)
+					{
+						slon_log(SLON_DEBUG4,
+								 "remoteHelperThread_%d_%d: wait for oversize memory to free\n",
+								 node->no_id, provider->no_id);
+
+						while (/* oversize && */
 						   wd->workgroup_status == SLON_WG_BUSY)
 					{
 						pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock));
 					}
+						if (wd->workgroup_status != SLON_WG_BUSY)
+						{
+							slon_log(SLON_DEBUG4,
+								   "remoteHelperThread_%d_%d: abort operation\n",
+									 node->no_id, provider->no_id);
+							errors++;
+							break;
+						}
+					}
 
 					/*
-					 * If any error occured somewhere in the group, the main
-					 * worker will set the status to ABORT.
+					 * Second make sure that we have at least 1 line
+					 * buffer.
 					 */
+					if (data_line_alloc == 0)
+					{
+						slon_log(SLON_DEBUG4,
+							   "remoteHelperThread_%d_%d: allocate line buffers\n",
+								 node->no_id, provider->no_id);
+						while (data_line_alloc == 0 && !errors)
+						{
+							while (wd->linepool_head == NULL &&
+								   wd->workgroup_status == SLON_WG_BUSY)
+							{
+								pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock));
+							}
 					if (wd->workgroup_status != SLON_WG_BUSY)
 					{
 						slon_log(SLON_DEBUG4,
 							   "remoteHelperThread_%d_%d: abort operation\n",
 								 node->no_id, provider->no_id);
-						pthread_mutex_unlock(&(wd->workdata_lock));
 						errors++;
 						break;
 					}
 
 					/*
-					 * So far so good. Fill our array of lines from the pool.
+							 * While we are at it, we can as well allocate
+							 * up to FETCH_SIZE buffers.
 					 */
-					while (alloc_lines < SLON_DATA_FETCH_SIZE &&
+							while (data_line_alloc < SLON_DATA_FETCH_SIZE &&
 						   wd->linepool_head != NULL)
 					{
-						data_line[alloc_lines] = wd->linepool_head;
+								data_line[data_line_alloc] = wd->linepool_head;
 						DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail,
-									  data_line[alloc_lines]);
-						alloc_lines++;
+											  data_line[data_line_alloc]);
+								data_line_alloc++;
 					}
-					pthread_mutex_unlock(&(wd->workdata_lock));
 				}
-
-				if (errors)
-					break;
-
-				slon_log(SLON_DEBUG4,
-						 "remoteHelperThread_%d_%d: have %d line buffers\n",
-						 node->no_id, provider->no_id, alloc_lines);
+					}
+				}
+				pthread_mutex_unlock(&(wd->workdata_lock));
 
 				/*
-				 * Now that we have allocated some buffer space, try to fetch
-				 * that many rows from the cursor.
+				 * We are within memory limits and have allocated
+				 * line buffers. Make sure that we have log lines
+				 * fetched.
 				 */
-				slon_mkquery(&query, "fetch %d from LOG; ",
-							 alloc_lines * SLON_COMMANDS_PER_LINE);
+				if (tupno >= ntuples)
+				{
+					slon_log(SLON_DEBUG4,
+						   "remoteHelperThread_%d_%d: fetch from cursor\n",
+							 node->no_id, provider->no_id);
+					if (res != NULL)
+						PQclear(res);
+
 				res = PQexec(dbconn, dstring_data(&query));
 				if (PQresultStatus(res) != PGRES_TUPLES_OK)
 				{
@@ -4940,7 +5015,6 @@
 							 node->no_id, provider->no_id,
 							 dstring_data(&query),
 							 PQresultErrorMessage(res));
-					PQclear(res);
 					errors++;
 					break;
 				}
@@ -4955,16 +5029,34 @@
 					first_fetch = false;
 				}
 
-				/*
-				 * Fill the line buffers with queries from the retrieved log
-				 * rows.
-				 */
-				line_no = 0;
 				ntuples = PQntuples(res);
-				slon_log(SLON_DEBUG3,
-						 "remoteHelperThread_%d_%d: got %d log rows\n",
+					tupno = 0;
+
+					slon_log(SLON_DEBUG4,
+						   "remoteHelperThread_%d_%d: fetched %d log rows\n",
 						 node->no_id, provider->no_id, ntuples);
-				for (tupno = 0; tupno < ntuples; tupno++)
+				}
+
+				/*
+				 * If there are no more tuples, we're done
+				 */
+				if (ntuples == 0)
+					break;
+
+				/*
+				 * Now move tuples from the fetch result into the
+				 * line buffers.
+				 */
+				line_no = data_line_last++;
+				line_ncmds = 0;
+
+				line = data_line[line_no];
+				line->code = SLON_WGLC_ACTION;
+				line->provider = provider;
+				dstring_reset(&(line->data));
+				dstring_reset(&(line->log));
+
+				while (tupno < ntuples && line_no < data_line_alloc)
 				{
 					char	   *log_origin = PQgetvalue(res, tupno, 0);
 					char	   *log_xid = PQgetvalue(res, tupno, 1);
@@ -4974,14 +5066,7 @@
 					char	   *log_cmdtype = PQgetvalue(res, tupno, 4);
 					char	   *log_cmddata = PQgetvalue(res, tupno, 5);
 
-					if (tupno % SLON_COMMANDS_PER_LINE == 0)
-					{
-						line = data_line[line_no++];
-						line->code = SLON_WGLC_ACTION;
-						line->provider = provider;
-						dstring_reset(&(line->data));
-						dstring_reset(&(line->log));
-					}
+					tupno++;
 
 					/*
 					 * This can happen if the table belongs to a set that
@@ -4993,6 +5078,10 @@
 						wd->tab_fqname[log_tableid] == NULL)
 						continue;
 
+					/*
+					 * If we are forwarding this set, add the insert
+					 * into sl_log_?
+					 */
 					if (wd->tab_forward[log_tableid])
 					{
 						slon_appendquery(&(line->log),
@@ -5005,6 +5094,10 @@
 										 log_origin, log_xid, log_tableid,
 									log_actionseq, log_cmdtype, log_cmddata);
 					}
+
+					/*
+					 * Add the actual replicating command to the line buffer
+					 */
 					switch (*log_cmdtype)
 					{
 						case 'I':
@@ -5028,62 +5121,63 @@
 											 log_cmddata);
 							break;
 					}
-				}
-				PQclear(res);
+					line_ncmds++;
 
-				/*
-				 * Now put all the line buffers back. Filled ones into the
-				 * repldata, unused ones into the pool.
-				 */
-				pthread_mutex_lock(&(wd->workdata_lock));
-				for (tupno = 0; tupno < alloc_lines; tupno++)
+					if (line_ncmds >= SLON_COMMANDS_PER_LINE)
 				{
-					if (tupno < line_no)
-						DLLIST_ADD_TAIL(wd->repldata_head, wd->repldata_tail,
-										data_line[tupno]);
-					else
-						DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail,
-										data_line[tupno]);
-				}
-				if (line_no > 0)
-					pthread_cond_signal(&(wd->repldata_cond));
-				if (line_no < alloc_lines)
-					pthread_cond_broadcast(&(wd->linepool_cond));
-				pthread_mutex_unlock(&(wd->workdata_lock));
+						if (data_line_last >= data_line_alloc)
+							break;
+						line_no = data_line_last++;
+
+						line = data_line[line_no];
+						line->code = SLON_WGLC_ACTION;
+						line->provider = provider;
+						dstring_reset(&(line->data));
+						dstring_reset(&(line->log));
 
-				slon_log(SLON_DEBUG3,
-					  "remoteHelperThread_%d_%d: %d log buffers delivered\n",
-						 node->no_id, provider->no_id, line_no);
+						line_ncmds = 0;
+					}
+				}
 
-				if (line_no < alloc_lines)
+				/*
+				 * Move one line back if we actually ran out of
+				 * tuples on an exact SLON_COMMANDS_PER_LINE boundary.
+				 */
+				if (line_ncmds == 0)
 				{
-					slon_log(SLON_DEBUG4,
-							 "remoteHelperThread_%d_%d: no more log rows\n",
-							 node->no_id, provider->no_id);
-					alloc_lines = 0;
-					break;
-				}
-				alloc_lines = 0;
+					data_line_last--;
 			}
+			} /* Cursor returned EOF */
 		} while (0);
 
 		/*
 		 * if there are still line buffers allocated, give them back.
 		 */
-		if (alloc_lines > 0)
+		if (data_line_first < data_line_alloc)
 		{
 			slon_log(SLON_DEBUG4,
-					 "remoteHelperThread_%d_%d: return unused line buffers\n",
-					 node->no_id, provider->no_id);
+					 "remoteHelperThread_%d_%d: return %d unused line buffers\n",
+					 node->no_id, provider->no_id,
+					 data_line_alloc - data_line_first);
 			pthread_mutex_lock(&(wd->workdata_lock));
-			while (alloc_lines > 0)
+			while (data_line_first < data_line_alloc)
 			{
-				alloc_lines--;
+				data_line_alloc--;
 				DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail,
-								data_line[alloc_lines]);
+								data_line[data_line_alloc]);
 			}
 			pthread_cond_broadcast(&(wd->linepool_cond));
 			pthread_mutex_unlock(&(wd->workdata_lock));
+
+			data_line_alloc = 0;
+			data_line_first = 0;
+			data_line_last = 0;
+		}
+
+		if(res != NULL)
+		{
+			PQclear(res);
+			res = NULL;
 		}
 
 		/*
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.55
retrieving revision 1.56
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.55 -r1.56
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -37,7 +37,7 @@
 #else
 #define SLON_COMMANDS_PER_LINE		10
 #define SLON_DATA_FETCH_SIZE		10
-#define SLON_WORKLINES_PER_HELPER	(SLON_DATA_FETCH_SIZE * 50)
+#define SLON_WORKLINES_PER_HELPER	(SLON_DATA_FETCH_SIZE * 5)
 #endif
 
 #define SLON_MAX_PATH 1024


More information about the Slony1-commit mailing list