Chris Browne cbbrowne at lists.slony.info
Tue Oct 23 09:55:51 PDT 2007
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv12039

Modified Files:
	remote_worker.c 
Log Message:
- log shipping check - do a Restart Node if the request to update
  the shared archive file number fails with a serialization error
  (concurrent update)

- reshape the DDL handling code to pick up the changes recently made
  to fix v1.2


Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.157
retrieving revision 1.158
diff -C2 -d -r1.157 -r1.158
*** remote_worker.c	19 Oct 2007 18:38:35 -0000	1.157
--- remote_worker.c	23 Oct 2007 16:55:49 -0000	1.158
***************
*** 1250,1341 ****
  				int			ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10);
  				int num_statements = -1, stmtno;
  
  				PGresult *res;
  				ExecStatusType rstat;
  
  
! 				slon_appendquery(&query1,
! 						 "set session_replication_role to local; "
! 						 "select %s.ddlScript_prepare_int(%d, %d); ",
! 						 rtcfg_namespace,
! 						 ddl_setid, ddl_only_on_node);
  
! 				if (query_execute(node, local_dbconn, &query1) < 0) {
  						slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %d\n",
  							 node->no_id, ddl_setid, ddl_only_on_node);
  						slon_retry();
! 				}
  
! 				num_statements = scan_for_statements (ddl_script);
! 				slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n",
! 					 node->no_id, num_statements);
! 				if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) {
! 					slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n", 
  						 node->no_id, num_statements);
! 					slon_retry();
! 				}
! 				
! 				for (stmtno=0; stmtno < num_statements;  stmtno++) {
! 					int startpos, endpos;
! 					char *dest;
! 					if (stmtno == 0)
! 						startpos = 0;
! 					else
! 						startpos = STMTS[stmtno-1];
! 
! 					endpos = STMTS[stmtno];
! 					dest = (char *) malloc (endpos - startpos + 1);
! 					if (dest == 0) {
! 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n", 
! 							 node->no_id, endpos - startpos + 1);
  						slon_retry();
  					}
! 					strncpy(dest, ddl_script + startpos, endpos-startpos);
! 					dest[STMTS[stmtno]-startpos] = 0;
! 					(void) slon_mkquery(&query1, "%s", dest);
! 					slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n", 
! 						 node->no_id, stmtno, dest);						 
! 					free(dest);
  
! 					res = PQexec(local_dbconn, dstring_data(&query1));
  
! 					if (PQresultStatus(res) != PGRES_COMMAND_OK && 
! 					    PQresultStatus(res) != PGRES_TUPLES_OK &&
! 					    PQresultStatus(res) != PGRES_EMPTY_QUERY)
! 					{
  						rstat = PQresultStatus(res);
! 						slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat));
! 						dstring_free(&query1);
! 						slon_retry();
  					}
- 					rstat = PQresultStatus(res);
- 					slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat));
- 				}
  	
! 				(void) slon_mkquery(&query1,
! 						"select %s.ddlScript_complete_int(%d, %d); " 
! 						"set session_replication_role to replica; ",
! 					     rtcfg_namespace,
! 					     ddl_setid,
! 					     ddl_only_on_node);
  
! 				/* DDL_SCRIPT needs to be turned into a log shipping script */
! 				/* Note that the issue about parsing that mandates breaking 
! 				   up compound statements into
! 				   individually-processed statements does not apply to log
! 				   shipping as psql parses and processes each statement
! 				   individually */
  
! 				if (archive_dir)
! 				{
! 					if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
  					{
  
! 						if (archive_append_str(node, "set session_replication_role to local;\n") < 0)
! 							slon_retry();
! 						if (archive_append_str(node, ddl_script) < 0)
! 							slon_retry();
! 						if (archive_append_str(node, "set session_replication_role to replica;\n") < 0)
! 							slon_retry();
  					}
  				}
--- 1250,1362 ----
  				int			ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10);
  				int num_statements = -1, stmtno;
+ 				int  node_in_set;
  
  				PGresult *res;
  				ExecStatusType rstat;
  
+ 				/**
+ 				 * Check to make sure this node is part of the set
+ 				 */
+ 				slon_log(SLON_INFO, "Checking local node id\n");
+ 				localNodeId = db_getLocalNodeId(local_dbconn);
+ 				slon_log(SLON_INFO,"Found local node id\n");
+ 				node_in_set = check_set_subscriber(ddl_setid,localNodeId,local_dbconn);
+ 				
+ 				if(!node_in_set) {
+ 					/**
+ 					 *
+ 					 * Node is not part of the set.  
+ 					 * Do not forward the DDL to the node,
+ 					 * nor should it be included in the log for log-shipping.
+ 					 */
+ 					slon_log(SLON_INFO,"Not forwarding DDL to node %d for set %d\n",
+ 						 node->no_id,ddl_setid);
+ 					
+ 				} else {
  
! 					slon_appendquery(&query1,
! 							 "set session_replication_role to local; "
! 							 "select %s.ddlScript_prepare_int(%d, %d); ",
! 							 rtcfg_namespace,
! 							 ddl_setid, ddl_only_on_node);
  
! 					if (query_execute(node, local_dbconn, &query1) < 0) {
  						slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %d\n",
  							 node->no_id, ddl_setid, ddl_only_on_node);
  						slon_retry();
! 					}
  
! 					num_statements = scan_for_statements (ddl_script);
! 					slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n",
  						 node->no_id, num_statements);
! 					if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) {
! 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n", 
! 							 node->no_id, num_statements);
  						slon_retry();
  					}
! 				
! 					for (stmtno=0; stmtno < num_statements;  stmtno++) {
! 						int startpos, endpos;
! 						char *dest;
! 						if (stmtno == 0)
! 							startpos = 0;
! 						else
! 							startpos = STMTS[stmtno-1];
  
! 						endpos = STMTS[stmtno];
! 						dest = (char *) malloc (endpos - startpos + 1);
! 						if (dest == 0) {
! 							slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n", 
! 								 node->no_id, endpos - startpos + 1);
! 							slon_retry();
! 						}
! 						strncpy(dest, ddl_script + startpos, endpos-startpos);
! 						dest[STMTS[stmtno]-startpos] = 0;
! 						(void) slon_mkquery(&query1, "%s", dest);
! 						slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n", 
! 							 node->no_id, stmtno, dest);						 
! 						free(dest);
  
! 						res = PQexec(local_dbconn, dstring_data(&query1));
! 
! 						if (PQresultStatus(res) != PGRES_COMMAND_OK && 
! 						    PQresultStatus(res) != PGRES_TUPLES_OK &&
! 						    PQresultStatus(res) != PGRES_EMPTY_QUERY)
! 						{
! 							rstat = PQresultStatus(res);
! 							slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat));
! 							dstring_free(&query1);
! 							slon_retry();
! 						}
  						rstat = PQresultStatus(res);
! 						slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat));
  					}
  	
! 					(void) slon_mkquery(&query1,
! 							    "select %s.ddlScript_complete_int(%d, %d); " 
! 							    "set session_replication_role to replica; ",
! 							    rtcfg_namespace,
! 							    ddl_setid,
! 							    ddl_only_on_node);
  
! 					/* DDL_SCRIPT needs to be turned into a log shipping script */
! 					/* Note that the issue about parsing that mandates breaking 
! 					   up compound statements into
! 					   individually-processed statements does not apply to log
! 					   shipping as psql parses and processes each statement
! 					   individually */
  
! 					if (archive_dir)
  					{
+ 						if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
+ 						{
  
! 							if (archive_append_str(node, "set session_replication_role to local;\n") < 0)
! 								slon_retry();
! 							if (archive_append_str(node, ddl_script) < 0)
! 								slon_retry();
! 							if (archive_append_str(node, "set session_replication_role to replica;\n") < 0)
! 								slon_retry();
! 						}
  					}
  				}
***************
*** 4992,5000 ****
  	if ((rc = PQresultStatus(res)) != PGRES_TUPLES_OK)
  	{
! 		slon_log(SLON_ERROR,
! 				 "remoteWorkerThread_%d: \"%s\" %s %s\n",
  				 node->no_id, dstring_data(&query),
  				 PQresStatus(rc),
! 				 PQresultErrorMessage(res));
  		PQclear(res);
  		dstring_free(&query);
--- 5013,5033 ----
  	if ((rc = PQresultStatus(res)) != PGRES_TUPLES_OK)
  	{
! 		/* see what kind of error it is... */
! #define CONCUPDATEMSG "ERROR:  could not serialize access due to concurrent update"
! 		if (strncmp(CONCUPDATEMSG, PQresultErrorMessage(res), strlen(CONCUPDATEMSG)) == 0) {
! 			slon_log(SLON_WARN, "serialization problem updating sl_archive_counter: restarting slon\n");
! 			slon_mkquery(&query,
! 				     "notify \"_%s_Restart\"; ",
! 				     rtcfg_cluster_name);
! 			PQexec(dbconn, dstring_data(&query));
! 		} else {
! 
! 			slon_log(SLON_WARN, "error message was [%s]\n", PQresultErrorMessage(res));
! 			slon_log(SLON_ERROR,
! 					 "remoteWorkerThread_%d: \"%s\" %s %s\n",
  				 node->no_id, dstring_data(&query),
  				 PQresStatus(rc),
! 					 PQresultErrorMessage(res));
! 		}
  		PQclear(res);
  		dstring_free(&query);



More information about the Slony1-commit mailing list