Chris Browne cbbrowne at lists.slony.info
Tue Apr 3 14:55:06 PDT 2007
Update of /home/cvsd/slony1/slony1-engine/src/slon
In directory main.slony.info:/tmp/cvs-serv3668/src/slon

Modified Files:
      Tag: REL_1_2_STABLE
	remote_worker.c 
Log Message:
Update to DDL handling - when a script is specified with "EXECUTE ONLY ON"
a specific node, it should be invoked, by slonik, only on that node.


Index: remote_worker.c
===================================================================
RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v
retrieving revision 1.124.2.12
retrieving revision 1.124.2.13
diff -C2 -d -r1.124.2.12 -r1.124.2.13
*** remote_worker.c	6 Mar 2007 18:47:45 -0000	1.124.2.12
--- remote_worker.c	3 Apr 2007 21:55:03 -0000	1.124.2.13
***************
*** 269,272 ****
--- 269,275 ----
  static void compress_actionseq(const char *ssy_actionseq, SlonDString * action_subquery);
  
+ static int process_ddl_script(SlonWorkMsg_event * event,SlonNode * node,
+ 							  PGconn * local_dbconn, char * seqbuf );
+ static int check_set_subscriber(int set_id, int node_id,PGconn * local_dbconn);
  
  /* ----------
***************
*** 1340,1438 ****
  			else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
  			{
! 				int			ddl_setid = (int)strtol(event->ev_data1, NULL, 10);
! 				char	   *ddl_script = event->ev_data2;
! 				int			ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10);
! 				int num_statements = -1, stmtno;
! 
! 				PGresult *res;
! 				ExecStatusType rstat;
! 
! 
! 				slon_appendquery(&query1,
! 						 "select %s.ddlScript_prepare_int(%d, %d); ",
! 						 rtcfg_namespace,
! 						 ddl_setid, ddl_only_on_node);
! 
! 				if (query_execute(node, local_dbconn, &query1) < 0) {
! 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %\n",
! 							 node->no_id, ddl_setid, ddl_only_on_node);
! 						slon_retry();
! 				}
! 
! 				num_statements = scan_for_statements (ddl_script);
! 				slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n",
! 					 node->no_id, num_statements);
! 				if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) {
! 					slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n", 
! 						 node->no_id, num_statements);
! 					slon_retry();
! 				}
! 				
! 				for (stmtno=0; stmtno < num_statements;  stmtno++) {
! 					int startpos, endpos;
! 					char *dest;
! 					if (stmtno == 0)
! 						startpos = 0;
! 					else
! 						startpos = STMTS[stmtno-1];
! 
! 					endpos = STMTS[stmtno];
! 					dest = (char *) malloc (endpos - startpos + 1);
! 					if (dest == 0) {
! 						slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n", 
! 							 node->no_id, endpos - startpos + 1);
! 						slon_retry();
! 					}
! 					strncpy(dest, ddl_script + startpos, endpos-startpos);
! 					dest[STMTS[stmtno]-startpos] = 0;
! 					slon_mkquery(&query1, dest);
! 					slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n", 
! 						 node->no_id, stmtno, dest);						 
! 					free(dest);
! 
! 					res = PQexec(local_dbconn, dstring_data(&query1));
! 
! 					if (PQresultStatus(res) != PGRES_COMMAND_OK && 
! 					    PQresultStatus(res) != PGRES_TUPLES_OK &&
! 					    PQresultStatus(res) != PGRES_EMPTY_QUERY)
! 					{
! 						rstat = PQresultStatus(res);
! 						slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat));
! 						dstring_free(&query1);
! 						slon_retry();
! 					}
! 					rstat = PQresultStatus(res);
! 					slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat));
! 				}
! 	
! 				slon_mkquery(&query1, "select %s.ddlScript_complete_int(%d, %d); ", 
! 					     rtcfg_namespace,
! 					     ddl_setid,
! 					     ddl_only_on_node);
! 
! 				/* DDL_SCRIPT needs to be turned into a log shipping script */
! 				/* Note that the issue about parsing that mandates breaking 
! 				   up compound statements into
! 				   individually-processed statements does not apply to log
! 				   shipping as psql parses and processes each statement
! 				   individually */
! 
! 				if (archive_dir)
! 				{
! 					if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
! 					{
! 
! 						if (archive_open(node, seqbuf) < 0)
! 							slon_retry();
! 						if (archive_tracking(node, rtcfg_namespace, 
! 								ddl_setid, seqbuf, seqbuf, 
! 								event->ev_timestamp_c) < 0)
! 							slon_retry();
! 						if (archive_append_str(node, ddl_script) < 0)
! 							slon_retry();
! 						if (archive_close(node) < 0)
! 							slon_retry();
! 					}
! 				}
  			}
  			else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
--- 1343,1347 ----
  			else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0)
  			{
! 				process_ddl_script(event,node,local_dbconn,seqbuf);
  			}
  			else if (strcmp(event->ev_type, "RESET_CONFIG") == 0)
***************
*** 6098,6099 ****
--- 6007,6181 ----
  	slon_log(SLON_DEBUG4, " compressed actionseq subquery... %s\n", dstring_data(action_subquery));
  }
+ 
+ 
+ /**
+  * Handle a DDL_SCRIPT event: if the local node has a subscription row for the set named in event->ev_data1, run the script in event->ev_data2 on local_dbconn one statement at a time and, when archive_dir is set, append it to the log shipping archive; otherwise only log that it is skipped.
+  * NOTE(review): declared to return int, but the body contains no return statement.
+  */
+ static int process_ddl_script(SlonWorkMsg_event * event,SlonNode * node,
+ 							  PGconn * local_dbconn,
+ 							  char * seqbuf) 
+ {
+ 	int			ddl_setid = (int)strtol(event->ev_data1, NULL, 10);
+ 	char	   *ddl_script = event->ev_data2;
+ 	int			ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10);
+ 	int num_statements = -1, stmtno;
+ 	int node_in_set;
+ 	int localNodeId;
+ 	PGresult *res;
+ 	ExecStatusType rstat;
+ 	SlonDString query1;
+ 
+ 	
+ 
+ 	dstring_init(&query1);
+ 	/**
+ 	 * Look up the local node id, then check whether that node is a subscriber of set ddl_setid.
+ 	 */
+ 	slon_log(SLON_INFO, "Checking local node id\n");
+ 	localNodeId = db_getLocalNodeId(local_dbconn);
+ 	slon_log(SLON_INFO,"Found local node id\n");
+ 	node_in_set = check_set_subscriber(ddl_setid,localNodeId,local_dbconn);
+ 	
+ 	if(!node_in_set) {
+ 		/**
+ 		 * The local node is not part of the set.
+ 		 * Do not run the DDL on this node,
+ 		 * and do not include it in the log for log-shipping.
+ 		 * NOTE(review): the message below prints node->no_id, not localNodeId.
+ 		 */
+ 		slon_log(SLON_INFO,"Not forwarding DDL to node %d for set %d\n",
+ 				 node->no_id,ddl_setid);
+ 		
+ 	}
+ 	else 
+ 	{
+ 		slon_appendquery(&query1,
+ 						 "select %s.ddlScript_prepare_int(%d, %d); ",
+ 						 rtcfg_namespace,
+ 						 ddl_setid, ddl_only_on_node);
+ 		
+ 		if (query_execute(node, local_dbconn, &query1) < 0) {
+ 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %\n",
+ 					 node->no_id, ddl_setid, ddl_only_on_node);			/* NOTE(review): the format ends in a bare '%' while three values are passed -- confirm */
+ 			slon_retry();
+ 		}
+ 		
+ 		num_statements = scan_for_statements (ddl_script);
+ 		slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n",
+ 				 node->no_id, num_statements);
+ 		if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) {
+ 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n", 
+ 					 node->no_id, num_statements);
+ 			slon_retry();
+ 		}
+ 		
+ 		for (stmtno=0; stmtno < num_statements;  stmtno++) {
+ 			int startpos, endpos;
+ 			char *dest;
+ 			if (stmtno == 0)
+ 				startpos = 0;
+ 			else
+ 				startpos = STMTS[stmtno-1];
+ 			
+ 			endpos = STMTS[stmtno]; /* presumably the end offset of statement stmtno in ddl_script; STMTS is defined elsewhere -- TODO confirm */
+ 			dest = (char *) malloc (endpos - startpos + 1);
+ 			if (dest == 0) {
+ 				slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n", 
+ 						 node->no_id, endpos - startpos + 1);
+ 				slon_retry();
+ 			}
+ 			strncpy(dest, ddl_script + startpos, endpos-startpos);
+ 			dest[STMTS[stmtno]-startpos] = 0; /* NUL-terminate the copied statement text */
+ 			slon_mkquery(&query1, dest);
+ 			slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n", 
+ 					 node->no_id, stmtno, dest);						 
+ 			free(dest);
+ 			
+ 			res = PQexec(local_dbconn, dstring_data(&query1)); /* NOTE(review): res is never passed to PQclear in this function */
+ 			
+ 			if (PQresultStatus(res) != PGRES_COMMAND_OK && 
+ 				PQresultStatus(res) != PGRES_TUPLES_OK &&
+ 				PQresultStatus(res) != PGRES_EMPTY_QUERY)
+ 				{
+ 					rstat = PQresultStatus(res);
+ 					slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat));
+ 					dstring_free(&query1);
+ 					slon_retry();
+ 				}
+ 			rstat = PQresultStatus(res);
+ 			slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat));
+ 		}
+ 		
+ 		slon_mkquery(&query1, "select %s.ddlScript_complete_int(%d, %d); ", 
+ 					 rtcfg_namespace,
+ 					 ddl_setid,
+ 					 ddl_only_on_node);
+ 
+ 		if (query_execute(node, local_dbconn, &query1) < 0) {
+ 			slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL completion failed - set %d - only on node %\n",
+ 					 node->no_id, ddl_setid, ddl_only_on_node);			/* NOTE(review): same bare '%' in the format as in the preparation message above -- confirm */
+ 			slon_retry();
+ 		}
+ 		
+ 		/* When archive_dir is set, the DDL script is also written to a log shipping archive. */
+ 		/* The whole script is appended unsplit: the parsing issue that
+ 		   mandates breaking up compound statements into
+ 		   individually-processed statements does not apply to log
+ 		   shipping, as psql parses and processes each statement
+ 		   individually. */
+ 		
+ 		if (archive_dir)
+ 			{
+ 				if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid))
+ 					{
+ 						
+ 						if (archive_open(node, seqbuf) < 0)
+ 							slon_retry();
+ 						if (archive_tracking(node, rtcfg_namespace, 
+ 											 ddl_setid, seqbuf, seqbuf, 
+ 											 event->ev_timestamp_c) < 0)
+ 							slon_retry();
+ 						if (archive_append_str(node, ddl_script) < 0)
+ 							slon_retry();
+ 						if (archive_close(node) < 0)
+ 							slon_retry();
+ 					}
+ 			}
+ 	}/* end else: the local node is a subscriber of the set */
+ 	
+ 	dstring_free(&query1);
+ 
+ }
+ 
+ /**
+  * Checks whether sl_subscribe holds a row with sub_set = set_id and sub_receiver = node_id, selecting it "for update" on local_dbconn.
+  * Returns 1 when at least one row is found and 0 when none is; calls slon_retry() if the query status is not PGRES_TUPLES_OK.
+  */
+ static int check_set_subscriber(int set_id, int node_id,PGconn * local_dbconn) 
+ {
+   
+   
+   SlonDString query1;
+   PGresult* res;
+   dstring_init(&query1);
+ 
+   slon_appendquery(&query1,"select 1 from %s.sl_subscribe WHERE sub_set=%d AND sub_receiver=%d for update"
+ 	       ,rtcfg_namespace,set_id,node_id);
+   res = PQexec(local_dbconn,dstring_data(&query1));
+   if(PQresultStatus(res)!=PGRES_TUPLES_OK) {
+     slon_log(SLON_ERROR,"remoteWorkerThread_%d: DDL preperation can not check set membership"
+ 	     ,node_id); /* NOTE(review): unlike the other slon_log calls in this patch, this format has no trailing newline */
+ 	dstring_free(&query1);
+     slon_retry(); /* NOTE(review): res is not passed to PQclear on this path */
+   }
+   dstring_free(&query1);
+   if(PQntuples(res)==0) {
+     PQclear(res);
+     return 0;
+   }
+   PQclear(res);
+   return 1;
+ 
+ 
+ }



More information about the Slony1-commit mailing list