CVS User Account cvsuser
Wed Oct 25 08:44:50 PDT 2006
Log Message:
-----------
Fixed archive log writing by moving global variables into the
per-node structure.

Corrected node ids in the test_1_handover ducttape test and added
handover scripts to tests 2 and 3.

Modified Files:
--------------
    slony1-engine/src/ducttape:
        test_1_handover_to_1 (r1.1 -> r1.2)
        test_1_handover_to_2 (r1.1 -> r1.2)
    slony1-engine/src/slon:
        remote_worker.c (r1.126 -> r1.127)
        slon.h (r1.59 -> r1.60)

Added Files:
-----------
    slony1-engine/src/ducttape:
        test_2_handover_to_1 (r1.2)
        test_2_handover_to_2 (r1.2)
        test_3_handover_to_1 (r1.2)
        test_3_handover_to_2 (r1.2)

-------------- next part --------------
Index: test_1_handover_to_2
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_1_handover_to_2,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/ducttape/test_1_handover_to_2 -Lsrc/ducttape/test_1_handover_to_2 -u -w -r1.1 -r1.2
--- src/ducttape/test_1_handover_to_2
+++ src/ducttape/test_1_handover_to_2
@@ -3,8 +3,8 @@
 # **********
 # test_1_handover_to_2
 #
-#	Script to change the origin of set 1 from node 1 to node 2.
-#	This still requires that node 1 is alive. This is called
+#	Script to change the origin of set 1 from node 11 to node 22.
+#	This still requires that node 11 is alive. This is called
 #	handover or move, not failover.
 # **********
 
@@ -20,10 +20,10 @@
 echo "**** Move set 1 to node 2"
 slonik <<_EOF_
 	cluster name = T1;
-	node 1 admin conninfo = 'dbname=$DB1';
-	node 2 admin conninfo = 'dbname=$DB2';
+	node 11 admin conninfo = 'dbname=$DB1';
+	node 22 admin conninfo = 'dbname=$DB2';
 
-	lock set (id = 1, origin = 1);
-	move set (id = 1, old origin = 1, new origin = 2);
+	lock set (id = 1, origin = 11);
+	move set (id = 1, old origin = 11, new origin = 22);
 _EOF_
 
Index: test_1_handover_to_1
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/ducttape/test_1_handover_to_1,v
retrieving revision 1.1
retrieving revision 1.2
diff -Lsrc/ducttape/test_1_handover_to_1 -Lsrc/ducttape/test_1_handover_to_1 -u -w -r1.1 -r1.2
--- src/ducttape/test_1_handover_to_1
+++ src/ducttape/test_1_handover_to_1
@@ -3,8 +3,8 @@
 # **********
 # test_1_handover_to_2
 #
-#	Script to change the origin of set 1 from node 1 to node 2.
-#	This still requires that node 1 is alive. This is called
+#	Script to change the origin of set 1 from node 22 back to node 11.
+#	This still requires that both nodes are alive. This is called
 #	handover or move, not failover.
 # **********
 
@@ -20,10 +20,10 @@
 echo "**** Move set 1 to node 2"
 slonik <<_EOF_
 	cluster name = T1;
-	node 1 admin conninfo = 'dbname=$DB1';
-	node 2 admin conninfo = 'dbname=$DB2';
+	node 11 admin conninfo = 'dbname=$DB1';
+	node 22 admin conninfo = 'dbname=$DB2';
 
-	lock set (id = 1, origin = 2);
-	move set (id = 1, old origin = 2, new origin = 1);
+	lock set (id = 1, origin = 22);
+	move set (id = 1, old origin = 22, new origin = 11);
 _EOF_
 
Index: remote_worker.c
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.126
retrieving revision 1.127
diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.126 -r1.127
--- src/slon/remote_worker.c
+++ src/slon/remote_worker.c
@@ -254,25 +254,21 @@
 		   WorkerGroupData * wd, SlonWorkMsg_event * event);
 static void *sync_helper(void *cdata);
 
-static char archive_name[SLON_MAX_PATH];
-static char archive_tmp[SLON_MAX_PATH];
-static FILE *archive_fp = NULL;
-static int	open_log_archive(int node_id, char *seqbuf);
-static int	close_log_archive();
-static void terminate_log_archive();
-static int	generate_archive_header(int node_id, const char *seqbuf);
-static int	submit_query_to_archive(SlonDString * ds);
-static int	submit_string_to_archive(const char *s);
-#ifndef HAVE_PQPUTCOPYDATA
-static int	submit_raw_data_to_archive(const char *s);
-#endif
-static int logarchive_tracking(const char *namespace, int sub_set, const char *firstseq,
+
+static int	archive_open(SlonNode *node, char *seqbuf);
+static int	archive_close(SlonNode *node);
+static void archive_terminate(SlonNode *node);
+static int	archive_append_ds(SlonNode *node, SlonDString * ds);
+static int	archive_append_str(SlonNode *node, const char *s);
+static int	archive_append_data(SlonNode *node, const char *s, int len);
+static int archive_tracking(SlonNode *node, const char *namespace, 
+					int sub_set, const char *firstseq,
 					const char *seqbuf, const char *timestamp);
-static int	write_void_log(int node_id, char *seqbuf, const char *message);
+static int	archive_void_log(SlonNode *node, char *seqbuf, const char *message);
+
 
 static void compress_actionseq(const char *ssy_actionseq, SlonDString * action_subquery);
 
-#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive();
 
 /* ----------
  * slon_remoteWorkerThread
@@ -645,14 +641,10 @@
 				need_reloadListen = true;
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_NODE");
+					rc = archive_void_log(node, seqbuf, "-- STORE_NODE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
-				}
 
 			}
 			else if (strcmp(event->ev_type, "ENABLE_NODE") == 0)
@@ -671,15 +663,11 @@
 
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- ENABLE_NODE");
+					rc = archive_void_log(node, seqbuf, "-- ENABLE_NODE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "DROP_NODE") == 0)
 			{
 				int			no_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -730,15 +718,11 @@
 				need_reloadListen = true;
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_NODE");
+					rc = archive_void_log(node, seqbuf, "-- DROP_NODE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "STORE_PATH") == 0)
 			{
 				int			pa_server = (int)strtol(event->ev_data1, NULL, 10);
@@ -757,15 +741,11 @@
 				need_reloadListen = true;
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_PATH");
+					rc = archive_void_log(node, seqbuf, "-- STORE_PATH");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "DROP_PATH") == 0)
 			{
 				int			pa_server = (int)strtol(event->ev_data1, NULL, 10);
@@ -782,16 +762,11 @@
 				need_reloadListen = true;
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_PATH");
+					rc = archive_void_log(node, seqbuf, "-- DROP_PATH");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "STORE_LISTEN") == 0)
 			{
 				int			li_origin = (int)strtol(event->ev_data1, NULL, 10);
@@ -807,15 +782,11 @@
 								 li_origin, li_provider, li_receiver);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_LISTEN");
+					rc = archive_void_log(node, seqbuf, "-- STORE_LISTEN");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "DROP_LISTEN") == 0)
 			{
 				int			li_origin = (int)strtol(event->ev_data1, NULL, 10);
@@ -831,14 +802,10 @@
 								 li_origin, li_provider, li_receiver);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_LISTEN");
+					rc = archive_void_log(node, seqbuf, "-- DROP_LISTEN");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
-				}
 
 			}
 			else if (strcmp(event->ev_type, "STORE_SET") == 0)
@@ -857,15 +824,11 @@
 
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_SET");
+					rc = archive_void_log(node, seqbuf, "-- STORE_SET");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "DROP_SET") == 0)
 			{
 				int			set_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -881,39 +844,15 @@
 				 */
 				if (archive_dir)
 				{
-					rc = open_log_archive(rtcfg_nodeid, seqbuf);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
-					rc = generate_archive_header(rtcfg_nodeid, seqbuf);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
 					slon_mkquery(&lsquery,
 								 "delete from %s.sl_setsync_offline "
 								 "  where ssy_setid= %d;",
 								 rtcfg_namespace, set_id);
-					rc = submit_query_to_archive(&lsquery);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
+					if (archive_open(node, seqbuf) < 0 ||
+						archive_append_ds(node, &lsquery) < 0 ||
+						archive_close(node) < 0)
 						slon_retry();
 					}
-					rc = close_log_archive();
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
-				}
 			}
 			else if (strcmp(event->ev_type, "MERGE_SET") == 0)
 			{
@@ -933,40 +872,16 @@
 				 */
 				if (archive_dir)
 				{
-					rc = open_log_archive(rtcfg_nodeid, seqbuf);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
-					rc = generate_archive_header(rtcfg_nodeid, seqbuf);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
 					rc = slon_mkquery(&lsquery,
 							  "delete from %s.sl_setsync_offline "
 							  "  where ssy_setid= %d;",
 							  rtcfg_namespace, add_id);
-					rc = submit_query_to_archive(&lsquery);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
-					rc = close_log_archive();
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
+					if (archive_open(node, seqbuf) < 0 ||
+						archive_append_ds(node, &lsquery) < 0 ||
+						archive_close(node) < 0)
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0)
 			{
 				/*
@@ -976,15 +891,11 @@
 				 */
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE");
+					rc = archive_void_log(node, seqbuf, "-- SET_ADD_TABLE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0)
 			{
 				/*
@@ -994,15 +905,11 @@
 				 */
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE");
+					rc = archive_void_log(node, seqbuf, "-- SET_ADD_SEQUENCE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0)
 			{
 				int			tab_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1012,15 +919,11 @@
 								 tab_id);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE");
+					rc = archive_void_log(node, seqbuf, "-- SET_DROP_TABLE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0)
 			{
 				int			seq_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1030,15 +933,11 @@
 								 seq_id);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE");
+					rc = archive_void_log(node, seqbuf, "-- SET_DROP_SEQUENCE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0)
 			{
 				int			tab_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1049,15 +948,11 @@
 								 tab_id, new_set_id);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE");
+					rc = archive_void_log(node, seqbuf, "-- SET_MOVE_TABLE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0)
 			{
 				int			seq_id = (int)strtol(event->ev_data1, NULL, 10);
@@ -1068,15 +963,11 @@
 								 seq_id, new_set_id);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE");
+					rc = archive_void_log(node, seqbuf, "-- SET_MOVE_SEQUENCE");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0)
 			{
 				int			trig_tabid = (int)strtol(event->ev_data1, NULL, 10);
@@ -1088,15 +979,11 @@
 								 trig_tabid, trig_tgname);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER");
+					rc = archive_void_log(node, seqbuf, "-- STORE_TRIGGER");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0)
 			{
 				int			trig_tabid = (int)strtol(event->ev_data1, NULL, 10);
@@ -1108,15 +995,11 @@
 								 trig_tabid, trig_tgname);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER");
+					rc = archive_void_log(node, seqbuf, "-- DROP_TRIGGER");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "ACCEPT_SET") == 0)
 			{
 				int			set_id,
@@ -1289,14 +1172,10 @@
 
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- FAILOVER_SET");
+					rc = archive_void_log(node, seqbuf, "-- FAILOVER_SET");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
-				}
 				need_reloadListen = true;
 			}
 			else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0)
@@ -1315,14 +1194,10 @@
 						   sub_set, sub_provider, sub_receiver, sub_forward);
 				if (archive_dir)
 				{
-					rc = write_void_log(rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET");
+					rc = archive_void_log(node, seqbuf, "-- SUBSCRIBE_SET");
 					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
 						slon_retry();
 					}
-				}
 				need_reloadListen = true;
 			}
 			else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)
@@ -1453,39 +1328,15 @@
 				need_reloadListen = true;
 				if (archive_dir)
 				{
-					rc = open_log_archive(rtcfg_nodeid, seqbuf);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
-					rc = generate_archive_header(rtcfg_nodeid, seqbuf);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
 					slon_mkquery(&lsquery,
 								 "delete from %s.sl_setsync_offline "
 								 "  where ssy_setid= %d;",
 								 rtcfg_namespace, sub_set);
-					rc = submit_query_to_archive(&lsquery);
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
+					if (archive_open(node, seqbuf) < 0 ||
+						archive_append_ds(node, &lsquery) < 0 ||
+						archive_close(node) < 0)
 						slon_retry();
 					}
-					rc = close_log_archive();
-					if (rc < 0)
-					{
-						slon_log(SLON_ERROR, "remoteWorkerThread_%d: log archive failed %s - %s",
-								 node->no_id, archive_tmp, strerror(errno));
-						slon_retry();
-					}
-				}
 			}
 			else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
 			{
@@ -1572,50 +1423,19 @@
 					if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
 					{
 
-						rc = open_log_archive(node->no_id, seqbuf);
-						if (rc < 0)
-						{
-							slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-								   "Could not open DDL archive file %s - %s",
-								  node->no_id, archive_tmp, strerror(errno));
+						if (archive_open(node, seqbuf) < 0)
 							slon_retry();
-						}
-						generate_archive_header(node->no_id, seqbuf);
-						if (rc < 0)
-						{
-							slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-							 "Could not generate DDL archive header %s - %s",
-								  node->no_id, archive_tmp, strerror(errno));
+						if (archive_tracking(node, rtcfg_namespace, 
+								ddl_setid, seqbuf, seqbuf, 
+								event->ev_timestamp_c) < 0)
 							slon_retry();
-						}
-						rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c);
-						if (rc < 0)
-						{
-							slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-							"Could not generate DDL archive tracker %s - %s",
-								  node->no_id, archive_tmp, strerror(errno));
-							slon_retry();
-						}
-						rc = submit_string_to_archive(ddl_script);
-						if (rc < 0)
-						{
-							slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-									 "Could not submit DDL Script %s - %s",
-								  node->no_id, archive_tmp, strerror(errno));
+						if (archive_append_str(node, ddl_script) < 0)
 							slon_retry();
-						}
-
-						rc = close_log_archive();
-						if (rc < 0)
-						{
-							slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-									 "Could not close DDL Script %s - %s",
-								  node->no_id, archive_tmp, strerror(errno));
+						if (archive_close(node) < 0)
 							slon_retry();
 						}
 					}
 				}
-			}
 			else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
 			{
 				int			reset_config_setid = (int)strtol(event->ev_data1, NULL, 10);
@@ -1626,14 +1446,16 @@
 								 rtcfg_namespace,
 							   reset_config_setid, reset_configonly_on_node);
 				if (archive_dir)
-					write_void_log(rtcfg_nodeid, seqbuf, "-- RESET_CONFIG");
+					if (archive_void_log(node, seqbuf, "-- RESET_CONFIG") < 0)
+						slon_retry();
 			}
 			else
 			{
 				printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n",
 					   node->no_id, event->ev_origin, event->ev_seqno, event->ev_type);
 				if (archive_dir)
-					write_void_log(rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!");
+					if (archive_void_log(node, seqbuf, "-- UNHANDLED EVENT!!!") < 0)
+						slon_retry();
 			}
 
 			/*
@@ -2681,34 +2503,16 @@
 	 */
 	if (archive_dir)
 	{
-		rc = open_log_archive(rtcfg_nodeid, seqbuf);
+		rc = archive_open(node, seqbuf);
 		if (rc < 0)
 		{
-			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-					 "Could not open COPY SET archive file %s - %s",
-					 node->no_id, archive_tmp, strerror(errno));
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
 			dstring_free(&query2);
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
-			return -1;
-		}
-		rc = generate_archive_header(rtcfg_nodeid, seqbuf);
-		if (rc < 0)
-		{
-			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-					 "Could not generate COPY SET archive header %s - %s",
-					 node->no_id, archive_tmp, strerror(errno));
-			slon_disconnectdb(pro_conn);
-			dstring_free(&query1);
-			dstring_free(&query2);
-			dstring_free(&query3);
-			dstring_free(&lsquery);
-			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 	}
@@ -2727,7 +2531,7 @@
 		dstring_free(&query3);
 		dstring_free(&lsquery);
 		dstring_free(&indexregenquery);
-		terminate_log_archive();
+		archive_terminate(node);
 		return -1;
 	}
 
@@ -2761,7 +2565,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		if (*(PQgetvalue(res1, 0, 0)) == 't')
@@ -2776,7 +2580,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		PQclear(res1);
@@ -2794,7 +2598,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 	}
@@ -2842,7 +2646,7 @@
 		dstring_free(&query3);
 		dstring_free(&lsquery);
 		dstring_free(&indexregenquery);
-		terminate_log_archive();
+		archive_terminate(node);
 		return -1;
 	}
 	ntuples1 = PQntuples(res1);
@@ -2880,7 +2684,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2908,7 +2712,7 @@
 				dstring_free(&query3);
 				dstring_free(&lsquery);
 				dstring_free(&indexregenquery);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 			rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -2938,7 +2742,7 @@
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
 				dstring_free(&query3);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 		}
@@ -2959,7 +2763,7 @@
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
 			dstring_free(&query3);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 	}
@@ -3000,7 +2804,7 @@
 		dstring_free(&query3);
 		dstring_free(&lsquery);
 		dstring_free(&indexregenquery);
-		terminate_log_archive();
+		archive_terminate(node);
 		return -1;
 	}
 	ntuples1 = PQntuples(res1);
@@ -3027,7 +2831,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 	}
@@ -3066,7 +2870,7 @@
 		dstring_free(&query3);
 		dstring_free(&lsquery);
 		dstring_free(&indexregenquery);
-		terminate_log_archive();
+		archive_terminate(node);
 		return -1;
 	}
 	ntuples1 = PQntuples(res1);
@@ -3108,7 +2912,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -3136,7 +2940,7 @@
 				dstring_free(&query3);
 				dstring_free(&lsquery);
 				dstring_free(&indexregenquery);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 			rc = *PQgetvalue(res2, 0, 0) == 't';
@@ -3161,7 +2965,7 @@
 					dstring_free(&query3);
 					dstring_free(&lsquery);
 					dstring_free(&indexregenquery);
-					terminate_log_archive();
+					archive_terminate(node);
 					return -1;
 				}
 				slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: "
@@ -3201,7 +3005,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 
@@ -3226,7 +3030,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		ntuples2 = PQntuples(res2);
@@ -3245,7 +3049,7 @@
 				dstring_free(&query3);
 				dstring_free(&lsquery);
 				dstring_free(&indexregenquery);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 		}
@@ -3277,7 +3081,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 
@@ -3299,7 +3103,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 
@@ -3332,7 +3136,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		if (archive_dir)
@@ -3340,20 +3144,16 @@
 			slon_mkquery(&query1,
 			 "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname,
 						 nodeon73 ? "" : PQgetvalue(res3, 0, 0));
-			rc = submit_query_to_archive(&query1);
+			rc = archive_append_ds(node, &query1);
 			if (rc < 0)
 			{
-				slon_log(SLON_ERROR, "remoteWorkerThread_d: "
-						 "Could not generate copy_set request for %s - %s",
-						 node->no_id, tab_fqname, strerror(errno));
-
 				slon_disconnectdb(pro_conn);
 				dstring_free(&query1);
 				dstring_free(&query2);
 				dstring_free(&query3);
 				dstring_free(&lsquery);
 				dstring_free(&indexregenquery);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 		}
@@ -3386,7 +3186,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 
@@ -3418,17 +3218,14 @@
 				dstring_free(&query3);
 				dstring_free(&lsquery);
 				dstring_free(&indexregenquery);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 			if (archive_dir)
 			{
-				rc = fwrite(copydata, 1, len, archive_fp);
-				if (rc != len)
+				rc = archive_append_data(node, copydata, len);
+				if (rc < 0)
 				{
-					slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-							 "PQputCopyData() - log shipping - %s",
-							 node->no_id, strerror(errno));
 #ifdef SLON_MEMDEBUG
 					memset(copydata, 88, len);
 #endif
@@ -3443,7 +3240,7 @@
 					dstring_free(&query3);
 					dstring_free(&lsquery);
 					dstring_free(&indexregenquery);
-					terminate_log_archive();
+					archive_terminate(node);
 					return -1;
 
 				}
@@ -3468,7 +3265,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 
@@ -3493,7 +3290,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		PQclear(res3);
@@ -3514,7 +3311,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		res2 = PQgetResult(loc_dbconn);
@@ -3532,12 +3329,12 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		if (archive_dir)
 		{
-			rc = submit_string_to_archive("\\.");
+			rc = archive_append_str(node, "\\.");
 			if (rc < 0) {
 			  PQclear(res2);
 			  PQclear(res1);
@@ -3547,7 +3344,7 @@
 			  dstring_free(&query3);
 			  dstring_free(&lsquery);
 			  dstring_free(&indexregenquery);
-			  terminate_log_archive();
+			  archive_terminate(node);
 			  return -1;
 			}
 		}
@@ -3574,7 +3371,7 @@
 				  PQputline(loc_dbconn, copybuf);
 				  PQputline(loc_dbconn, "\n");
 				  if (archive_dir) {
-				    rc = submit_string_to_archive(copybuf);
+				    rc = archive_append_str(node, copybuf);
 				    if (rc < 0) {
 				      PQclear(res2);
 				      PQclear(res1);
@@ -3584,7 +3381,7 @@
 				      dstring_free(&query3);
 				      dstring_free(&lsquery);
 				      dstring_free(&indexregenquery);
-				      terminate_log_archive();
+				      archive_terminate(node);
 				      return -1;
 				    }
 				  }
@@ -3592,7 +3389,7 @@
 				case 1:
 				  PQputline(loc_dbconn, copybuf);
 				  if (archive_dir) {
-				    rc = submit_raw_data_to_archive(copybuf);
+				    rc = archive_append_data(node, copybuf, strlen(copybuf));
 				    if (rc < 0) {
 				      PQclear(res2);
 				      PQclear(res1);
@@ -3602,7 +3399,7 @@
 				      dstring_free(&query3);
 				      dstring_free(&lsquery);
 				      dstring_free(&indexregenquery);
-				      terminate_log_archive();
+				      archive_terminate(node);
 				      return -1;
 				    }
 
@@ -3615,7 +3412,7 @@
 		PQputline(loc_dbconn, "\\.\n");
 		if (archive_dir)
 		{
-			rc = submit_string_to_archive("\\.");
+			rc = archive_append_str(node, "\\.");
 			if (rc < 0) {
 			  PQclear(res2);
 			  PQclear(res1);
@@ -3625,7 +3422,7 @@
 			  dstring_free(&query3);
 			  dstring_free(&lsquery);
 			  dstring_free(&indexregenquery);
-			  terminate_log_archive();
+			  archive_terminate(node);
 			  return -1;
 			}
 		}
@@ -3648,7 +3445,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		PQclear(res3);
@@ -3674,7 +3471,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 #endif   /* HAVE_PQPUTCOPYDATA */
@@ -3700,12 +3497,12 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		if (archive_dir)
 		{
-		  rc = submit_query_to_archive(&query1);
+		  rc = archive_append_ds(node, &query1);
 		  if (rc < 0) {
 		    return -1;
 		  }
@@ -3755,7 +3552,7 @@
 		dstring_free(&query3);
 		dstring_free(&lsquery);
 		dstring_free(&indexregenquery);
-		terminate_log_archive();
+		archive_terminate(node);
 		return -1;
 	}
 	ntuples1 = PQntuples(res1);
@@ -3782,7 +3579,7 @@
 
 			if (archive_dir)
 			{
-				rc = submit_query_to_archive(&query1);
+				rc = archive_append_ds(node, &query1);
 				if (rc < 0) {
 				  PQclear(res1);
 				  slon_disconnectdb(pro_conn);
@@ -3791,7 +3588,7 @@
 				  dstring_free(&query3);
 				  dstring_free(&lsquery);
 				  dstring_free(&indexregenquery);
-				  terminate_log_archive();
+				  archive_terminate(node);
 				  return -1;
 				}
 			}
@@ -3814,7 +3611,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 	}
@@ -3862,7 +3659,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		if (PQntuples(res1) != 1)
@@ -3877,7 +3674,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		if (PQgetisnull(res1, 0, 0))
@@ -3928,7 +3725,7 @@
 				dstring_free(&query3);
 				dstring_free(&lsquery);
 				dstring_free(&indexregenquery);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 			if (PQntuples(res1) != 1)
@@ -3943,7 +3740,7 @@
 				dstring_free(&query3);
 				dstring_free(&lsquery);
 				dstring_free(&indexregenquery);
-				terminate_log_archive();
+				archive_terminate(node);
 				return -1;
 			}
 			ssy_seqno = PQgetvalue(res1, 0, 0);
@@ -3990,7 +3787,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		ntuples1 = PQntuples(res2);
@@ -4035,7 +3832,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		if (PQntuples(res1) != 1)
@@ -4050,7 +3847,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 		dstring_init(&ssy_action_list);
@@ -4083,7 +3880,7 @@
 		dstring_free(&query3);
 		dstring_free(&lsquery);
 		dstring_free(&indexregenquery);
-		terminate_log_archive();
+		archive_terminate(node);
 		return -1;
 	}
 	if (archive_dir)
@@ -4092,7 +3889,7 @@
 			     "insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) "
 			     "values ('%d', '%d');",
 			     rtcfg_namespace, set_id, ssy_seqno);
-		rc = submit_query_to_archive(&lsquery);
+		rc = archive_append_ds(node, &lsquery);
 		if (rc < 0)
 		{
 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
@@ -4104,7 +3901,7 @@
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 	}
@@ -4116,19 +3913,16 @@
 
 	if (archive_dir)
 	{
-		rc = close_log_archive();
+		rc = archive_close(node);
 		if (rc < 0)
 		{
-			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-					 " could not close archive log %s - %s",
-					 node->no_id, archive_tmp, strerror(errno));
 			slon_disconnectdb(pro_conn);
 			dstring_free(&query1);
 			dstring_free(&query2);
 			dstring_free(&query3);
 			dstring_free(&lsquery);
 			dstring_free(&indexregenquery);
-			terminate_log_archive();
+			archive_terminate(node);
 			return -1;
 		}
 	}
@@ -4147,7 +3941,7 @@
 		dstring_free(&query3);
 		dstring_free(&lsquery);
 		dstring_free(&indexregenquery);
-		terminate_log_archive();
+		archive_terminate(node);
 		return -1;
 	}
 	slon_disconnectdb(pro_conn);
@@ -4217,21 +4011,10 @@
 	 */
 	if (archive_dir)
 	{
-		rc = open_log_archive(node->no_id, seqbuf);
-		if (rc == -1)
-		{
-			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-					 "Cannot open archive file %s - %s\n",
-					 node->no_id, archive_tmp, strerror(errno));
-			dstring_free(&query);
-			return 60;
-		}
-		rc = generate_archive_header(node->no_id, seqbuf);
+		rc = archive_open(node, seqbuf);
 		if (rc < 0)
 		{
-			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-					 "Cannot write to archive file %s - %s\n",
-					 node->no_id, archive_tmp, strerror(errno));
+			dstring_free(&query);
 			return 60;
 		}
 	}
@@ -4257,7 +4040,8 @@
 				slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
 						 "No pa_conninfo for data provider %d\n",
 						 node->no_id, provider->no_id);
-				TERMINATE_QUERY_AND_ARCHIVE;
+				dstring_free(&query);
+				archive_terminate(node);
 				return 10;
 			}
 			sprintf(conn_symname, "subscriber_%d_provider_%d",
@@ -4270,7 +4054,8 @@
 						 "cannot connect to data provider %d on '%s'\n",
 						 node->no_id, provider->no_id,
 						 provider->pa_conninfo);
-				TERMINATE_QUERY_AND_ARCHIVE;
+				dstring_free(&query);
+				archive_terminate(node);
 				return provider->pa_connretry;
 			}
 
@@ -4282,7 +4067,8 @@
 						 rtcfg_namespace, rtcfg_nodeid);
 			if (query_execute(node, provider->conn->dbconn, &query) < 0)
 			{
-				TERMINATE_QUERY_AND_ARCHIVE;
+				dstring_free(&query);
+				archive_terminate(node);
 				slon_disconnectdb(provider->conn);
 				provider->conn = NULL;
 				return provider->pa_connretry;
@@ -4318,7 +4104,8 @@
 						 "for ev_origin %d\n",
 						 node->no_id, provider->no_id,
 						 event->ev_origin);
-				TERMINATE_QUERY_AND_ARCHIVE;
+				dstring_free(&query);
+				archive_terminate(node);
 				return 10;
 			}
 			if (prov_seqno < event->ev_seqno)
@@ -4328,7 +4115,8 @@
 						 "ev_seqno " INT64_FORMAT " for ev_origin %d\n",
 						 node->no_id, provider->no_id,
 						 prov_seqno, event->ev_origin);
-				TERMINATE_QUERY_AND_ARCHIVE;
+				dstring_free(&query);
+				archive_terminate(node);
 				return 10;
 			}
 			slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -4395,7 +4183,8 @@
 					 PQresultErrorMessage(res1));
 			PQclear(res1);
 			dstring_free(&new_qual);
-			TERMINATE_QUERY_AND_ARCHIVE;
+			dstring_free(&query);
+			archive_terminate(node);
 			return 60;
 		}
 
@@ -4444,7 +4233,8 @@
 				PQclear(res2);
 				PQclear(res1);
 				dstring_free(&new_qual);
-				TERMINATE_QUERY_AND_ARCHIVE;
+				dstring_free(&query);
+				archive_terminate(node);
 				return 60;
 			}
 			ntuples2 = PQntuples(res2);
@@ -4578,16 +4368,11 @@
 				slon_log(SLON_DEBUG2, "writing archive log...\n");
 				fflush(stderr);
 				fflush(stdout);
-				rc = logarchive_tracking(rtcfg_namespace, sub_set,
+				rc = archive_tracking(node, rtcfg_namespace, sub_set,
 										 PQgetvalue(res1, tupno1, 1), seqbuf,
 										 event->ev_timestamp_c);
 				if (rc < 0)
-				{
-					slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-							 "Cannot write to archive file %s - %s\n",
-							 node->no_id, archive_tmp, strerror(errno));
-					return 60;
-				}
+					slon_retry();
 			}
 		}
 		PQclear(res1);
@@ -4620,14 +4405,9 @@
 		dstring_free(&query);
 		if (archive_dir)
 		{
-			rc = close_log_archive();
+			rc = archive_close(node);
 			if (rc < 0)
-			{
-				slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-						 "Could not close out archive file %s - %s\n",
-						 node->no_id, archive_tmp, strerror(errno));
-				return 60;
-			}
+				slon_retry();
 		}
 		return 0;
 	}
@@ -4644,7 +4424,8 @@
 				 node->no_id, dstring_data(&query),
 				 PQresultErrorMessage(res1));
 		PQclear(res1);
-		TERMINATE_QUERY_AND_ARCHIVE;
+		dstring_free(&query);
+		archive_terminate(node);
 		slon_disconnectdb(provider->conn);
 		provider->conn = NULL;
 		return 20;
@@ -4655,7 +4436,8 @@
 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: cannot determine current log status\n",
 				 node->no_id);
 		PQclear(res1);
-		TERMINATE_QUERY_AND_ARCHIVE;
+		dstring_free(&query);
+		archive_terminate(node);
 		slon_disconnectdb(provider->conn);
 		provider->conn = NULL;
 		return 20;
@@ -4778,19 +4560,9 @@
 					 */
 					if (archive_dir)
 					{
-						rc = submit_string_to_archive(dstring_data(&(wgline->data)));
-
-						/*
-						 * rc = fprintf(archive_fp, "%s",
-						 * dstring_data(&(wgline->data)));
-						 */
+						rc = archive_append_ds(node, &(wgline->data));
 						if (rc < 0)
-						{
-							slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-									 "Cannot write to archive file %s - %s\n",
-								  node->no_id, archive_tmp, strerror(errno));
-							return 60;
-						}
+							slon_retry();
 					}
 					break;
 
@@ -4889,7 +4661,8 @@
 	 */
 	if (num_errors != 0)
 	{
-		TERMINATE_QUERY_AND_ARCHIVE;
+		dstring_free(&query);
+		archive_terminate(node);
 		slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
 				 node->no_id);
 		return 10;
@@ -4926,7 +4699,8 @@
 					 node->no_id, dstring_data(&query),
 					 PQresultErrorMessage(res1));
 			PQclear(res1);
-			TERMINATE_QUERY_AND_ARCHIVE;
+			dstring_free(&query);
+			archive_terminate(node);
 			slon_disconnectdb(provider->conn);
 			provider->conn = NULL;
 			return 20;
@@ -4944,7 +4718,8 @@
 			if (query_execute(node, local_dbconn, &query) < 0)
 			{
 				PQclear(res1);
-				TERMINATE_QUERY_AND_ARCHIVE;
+				dstring_free(&query);
+				archive_terminate(node);
 				return 60;
 			}
 
@@ -4957,14 +4732,9 @@
 							 "select %s.sequenceSetValue_offline(%s,'%s');\n",
 							 rtcfg_namespace,
 							 seql_seqid, seql_last_value);
-				rc = submit_query_to_archive(&lsquery);
+				rc = archive_append_ds(node, &lsquery);
 				if (rc < 0)
-				{
-					slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-							 "Cannot write to archive file %s - %s\n",
-							 node->no_id, archive_tmp, strerror(errno));
-					return 60;
-				}
+					slon_retry();
 			}
 		}
 		PQclear(res1);
@@ -5006,7 +4776,8 @@
 					 node->no_id, dstring_data(&query),
 					 PQresultErrorMessage(res1));
 			PQclear(res1);
-			TERMINATE_QUERY_AND_ARCHIVE;
+			dstring_free(&query);
+			archive_terminate(node);
 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n",
 					 node->no_id);
 			return 10;
@@ -5032,7 +4803,8 @@
 				 node->no_id, dstring_data(&query),
 				 PQresultErrorMessage(res1));
 		PQclear(res1);
-		TERMINATE_QUERY_AND_ARCHIVE;
+		dstring_free(&query);
+		archive_terminate(node);
 		return 60;
 	}
 	if (PQntuples(res1) > 0)
@@ -5046,7 +4818,8 @@
 		if (query_execute(node, local_dbconn, &query) < 0)
 		{
 			PQclear(res1);
-			TERMINATE_QUERY_AND_ARCHIVE;
+			dstring_free(&query);
+			archive_terminate(node);
 			return 60;
 		}
 		slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: "
@@ -5061,18 +4834,13 @@
 	 */
 	if (archive_dir)
 	{
-		rc = close_log_archive();
+		rc = archive_close(node);
 		if (rc < 0)
-		{
-			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
-				 "Could not close out archive file %s - %s\n",
-				 node->no_id, archive_tmp, strerror(errno));
-			return 60;
+			slon_retry();
 			
-		}
 		if (command_on_logarchive) {
 			char command[512];
-			sprintf(command, "%s %s", command_on_logarchive, archive_name);
+			sprintf(command, "%s %s", command_on_logarchive, node->archive_name);
 			slon_log(SLON_INFO, "remoteWorkerThread_%d: Run Archive Command %s\n",
 				 node->no_id, command);
 			system(command);
@@ -5760,7 +5528,7 @@
 /* ----------
  * Functions for processing log archives...
  *
- * - First, you open the log archive using open_log_archive()
+ * - First, you open the log archive using archive_open()
  *
  * - Second, you generate the header using generate_archive_header()
  *
@@ -5783,47 +5551,76 @@
 
 
 /* ----------
- * open_log_archive
+ * archive_open
  *
  * Stores the archive name in archive_name (as .sql name) and 
  * archive_tmp (.tmp file)
  * ----------
  */
-int
-open_log_archive(int node_id, char *seqbuf)
+static int
+archive_open(SlonNode *node, char *seqbuf)
 {
 	int			i;
+	int		rc;
 
-	sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id);
+	if (node->archive_name == NULL)
+	{
+		node->archive_name = malloc(SLON_MAX_PATH);
+		node->archive_temp = malloc(SLON_MAX_PATH);
+		if (node->archive_name == NULL || node->archive_temp == NULL)
+		{
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+					"Out of memory in archive_open()\n",
+					node->no_id);
+			return -1;
+		}
+	}
+
+	sprintf(node->archive_name, "%s/slony1_log_%d_", archive_dir, 
+			node->no_id);
 	for (i = strlen(seqbuf); i < 20; i++)
-		strcat(archive_name, "0");
-	strcat(archive_name, seqbuf);
-	strcat(archive_name, ".sql");
-	strcpy(archive_tmp, archive_name);
-	strcat(archive_tmp, ".tmp");
-	archive_fp = fopen(archive_tmp, "w");
-	if (archive_fp == NULL)
+		strcat(node->archive_name, "0");
+	strcat(node->archive_name, seqbuf);
+	strcat(node->archive_name, ".sql");
+	strcpy(node->archive_temp, node->archive_name);
+	strcat(node->archive_temp, ".tmp");
+	node->archive_fp = fopen(node->archive_temp, "w");
+	if (node->archive_fp == NULL)
 	{
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				"Cannot open archive file %s - %s\n",
+				node->no_id, node->archive_temp, strerror(errno));
 		return -1;
 	}
-	else
+
+	rc = fprintf(node->archive_fp,
+				   "-- Slony-I log shipping archive\n"
+				   "-- Node %d, Event %s\n"
+				   "start transaction;\n",
+				   node->no_id, seqbuf);
+	if (rc < 0)
 	{
-		return 0;
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				"Cannot write to archive file %s - %s\n",
+				node->no_id, node->archive_temp, strerror(errno));
+		return -1;
 	}
+
+	return 0;
 }
 
 /* ----------
- * close_log_archive
+ * archive_close
  * ----------
  */
-int
-close_log_archive()
+static int
+archive_close(SlonNode *node)
 {
 	int			rc = 0;
 
 	if (archive_dir)
 	{
-		rc = fprintf(archive_fp,
+		rc = fprintf(node->archive_fp,
 			"\n------------------------------------------------------------------\n"
 			"-- End Of Archive Log\n"
 			"------------------------------------------------------------------\n"
@@ -5831,116 +5628,166 @@
 			"vacuum analyze %s.sl_setsync_offline;\n",
 			rtcfg_namespace);
 		if (rc < 0)
+		{
+			archive_terminate(node);
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+					"Cannot write to archive file %s - %s\n",
+					node->no_id, node->archive_temp, strerror(errno));
 			return -1;
-		rc = fclose(archive_fp);
-		archive_fp = NULL;
+		}
+
+		rc = fclose(node->archive_fp);
+		node->archive_fp = NULL;
 		if (rc != 0)
+		{
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+					"Cannot close archive file %s - %s\n",
+					node->no_id, node->archive_temp, strerror(errno));
 			return -1;
-		rc = rename(archive_tmp, archive_name);
 	}
-	return rc;
+
+		rc = rename(node->archive_temp, node->archive_name);
+		if (rc != 0)
+		{
+			slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+					"Cannot rename archive file %s to %s - %s\n",
+					node->no_id, node->archive_temp, node->archive_name, 
+					strerror(errno));
+			return -1;
+		}
+	}
+
+	return 0;
 }
 
 /* ----------
- * logarchive_tracking
+ * archive_terminate
  * ----------
  */
-int
-logarchive_tracking(const char *namespace, int sub_set, const char *firstseq,
-					const char *seqbuf, const char *timestamp)
+static void
+archive_terminate(SlonNode *node)
+{
+	if (node->archive_fp != NULL)
 {
-	return fprintf(archive_fp, 
+		fclose(node->archive_fp);
+		node->archive_fp = NULL;
+	}
+}
+
+/* ----------
+ * archive_tracking
+ * ----------
+ */
+static int
+archive_tracking(SlonNode *node, const char *namespace, int sub_set, 
+					const char *firstseq, const char *seqbuf, 
+					const char *timestamp)
+{
+	int		rc;
+
+	rc = fprintf(node->archive_fp, 
 		"\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n"
 		"-- end of log archiving header\n"
 		"------------------------------------------------------------------\n"
 		"-- start of Slony-I data\n"
 		"------------------------------------------------------------------\n",
 		namespace, sub_set, firstseq, seqbuf, timestamp);
+	if (rc < 0)
+	{
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				"Cannot write to archive file %s - %s\n",
+				node->no_id, node->archive_temp, strerror(errno));
+		return -1;
 }
 
-/* ----------
- * submit_query_to_archive
- * ----------
- */
-int
-submit_query_to_archive(SlonDString * ds)
-{
-	return fprintf(archive_fp, "%s\n", ds->data);
+	return 0;
 }
 
 /* ----------
- * submit_string_to_archive
+ * archive_append_ds
  * ----------
  */
-int
-submit_string_to_archive(const char *s)
+static int
+archive_append_ds(SlonNode *node, SlonDString * ds)
 {
-	return fprintf(archive_fp, "%s\n", s);
-}
+	int		rc;
 
-#ifndef HAVE_PQPUTCOPYDATA
-/* ----------
- * submit_raw_data_to_archive
- *
- * Raw form used for COPY where we don't want any extra cr/lf output
- * ----------
- */
-int
-submit_raw_data_to_archive(const char *s)
+	rc = fprintf(node->archive_fp, "%s\n", dstring_data(ds));
+	if (rc < 0)
 {
-	return fprintf(archive_fp, "%s", s);
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				"Cannot write to archive file %s - %s\n",
+				node->no_id, node->archive_temp, strerror(errno));
+		return -1;
+	}
+
+	return 0;
 }
-#endif
 
 /* ----------
- * terminate_log_archive
+ * archive_append_str
  * ----------
  */
-void
-terminate_log_archive()
+static int
+archive_append_str(SlonNode *node, const char *s)
 {
-	if (archive_fp)
+	int		rc;
+
+	rc = fprintf(node->archive_fp, "%s\n", s);
+	if (rc < 0)
 	{
-		fclose(archive_fp);
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				"Cannot write to archive file %s - %s\n",
+				node->no_id, node->archive_temp, strerror(errno));
+		return -1;
 	}
+
+	return 0;
 }
 
 /* ----------
- * generate_archive_header
+ * archive_append_data
+ *
+ * Raw form used for COPY where we don't want any extra cr/lf output
  * ----------
  */
-int
-generate_archive_header(int node_id, const char *seqbuf)
+static int
+archive_append_data(SlonNode *node, const char *s, int len)
 {
-	return fprintf(archive_fp,
-				   "-- Slony-I log shipping archive\n"
-				   "-- Node %d, Event %s\n"
-				   "start transaction;\n",
-				   node_id, seqbuf);
+	int		rc;
+
+	rc = fwrite(s, len, 1, node->archive_fp);
+	if (rc != 1)
+	{
+		slon_log(SLON_ERROR, "remoteWorkerThread_%d: "
+				"Cannot write to archive file %s - %s\n",
+				node->no_id, node->archive_temp, strerror(errno));
+		return -1;
+	}
+
+	return 0;
 }
 
 /* ----------
- * write_void_log
+ * archive_void_log
  *
  * writes out a "void" log consisting of the message which must either 
  * be a valid SQL query or a SQL comment.
  * ----------
  */
-int
-write_void_log(int node_id, char *seqbuf, const char *message)
+static int
+archive_void_log(SlonNode *node, char *seqbuf, const char *message)
 {
 	int			rc;
 
-	rc = open_log_archive(node_id, seqbuf);
-	if (rc < 0)
-		return rc;
-	rc = generate_archive_header(node_id, seqbuf);
+	rc = archive_open(node, seqbuf);
 	if (rc < 0)
 		return rc;
-	rc = submit_string_to_archive(message);
+	rc = archive_append_str(node, message);
 	if (rc < 0)
 		return rc;
-	rc = close_log_archive();
+	rc = archive_close(node);
+
 	return rc;
 }
 
Index: slon.h
===================================================================
RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v
retrieving revision 1.59
retrieving revision 1.60
diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.59 -r1.60
--- src/slon/slon.h
+++ src/slon/slon.h
@@ -106,6 +106,10 @@
 	SlonWorkMsg *message_head;
 	SlonWorkMsg *message_tail;
 
+	char	   *archive_name;
+	char	   *archive_temp;
+	FILE	   *archive_fp;
+
 	SlonNode   *prev;
 	SlonNode   *next;
 };



More information about the Slony1-commit mailing list