Steve Singer ssinger at ca.afilias.info
Tue Jun 29 06:00:31 PDT 2010
Read the no_active column from sl_node when adding the
node structure to memory, instead of always marking a newly
stored node as inactive.  This allows slon to create
a remoteWorkerThread when a node is being added.
---
 src/slon/local_listen.c   |    6 +++---
 src/slon/remote_worker.c  |    8 +++++---
 src/slon/runtime_config.c |   41 +++++++++++++++++++++++++++++++++++------
 src/slon/slon.c           |    4 ++--
 src/slon/slon.h           |    4 ++--
 5 files changed, 47 insertions(+), 16 deletions(-)

diff --git a/src/slon/local_listen.c b/src/slon/local_listen.c
index e10efe7..8ea126b 100644
--- a/src/slon/local_listen.c
+++ b/src/slon/local_listen.c
@@ -229,7 +229,7 @@ localListenThread_main(/* @unused@ */ void *dummy)
 				no_comment = PQgetvalue(res, tupno, 7);
 
 				if (no_id != rtcfg_nodeid)
-					rtcfg_storeNode(no_id, no_comment);
+					rtcfg_storeNode(no_id, no_comment,dbconn);
 
 				rtcfg_reloadListen(dbconn);
 			}
@@ -295,7 +295,7 @@ localListenThread_main(/* @unused@ */ void *dummy)
 				no_provider = (int)strtol(PQgetvalue(res, tupno, 7), NULL, 10);
 				no_comment = PQgetvalue(res, tupno, 8);
 
-				rtcfg_storeNode(no_id, no_comment);
+				rtcfg_storeNode(no_id, no_comment,dbconn);
 			}
 			else if (strcmp(ev_type, "STORE_PATH") == 0)
 			{
@@ -313,7 +313,7 @@ localListenThread_main(/* @unused@ */ void *dummy)
 				pa_connretry = (int)strtol(PQgetvalue(res, tupno, 9), NULL, 10);
 
 				if (pa_client == rtcfg_nodeid)
-					rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
+					rtcfg_storePath(pa_server, pa_conninfo, pa_connretry,dbconn);
 
 				rtcfg_reloadListen(dbconn);
 			}
diff --git a/src/slon/remote_worker.c b/src/slon/remote_worker.c
index a3d58e5..1761487 100644
--- a/src/slon/remote_worker.c
+++ b/src/slon/remote_worker.c
@@ -753,7 +753,7 @@ remoteWorkerThread_main(void *cdata)
 				char	   *no_comment = event->ev_data2;
 
 				if (no_id != rtcfg_nodeid)
-					rtcfg_storeNode(no_id, no_comment);
+					rtcfg_storeNode(no_id, no_comment,local_dbconn);
 
 				slon_appendquery(&query1,
 								 "select %s.storeNode_int(%d, '%q'); ",
@@ -831,12 +831,14 @@ remoteWorkerThread_main(void *cdata)
 				int			no_provider = (int) strtol(event->ev_data2, NULL, 10);
 				char	   *no_comment = event->ev_data3;
 
-				rtcfg_storeNode(no_id, no_comment);
 
 				slon_appendquery(&query1,
 							"select %s.cloneNodePrepare_int(%d, %d, '%q'); ",
 								 rtcfg_namespace,
 								 no_id, no_provider, no_comment);
+				query_execute(node,local_dbconn,&query1);
+				dstring_reset(&query1);
+				rtcfg_storeNode(no_id, no_comment,local_dbconn);
 			}
 			else if (strcmp(event->ev_type, "STORE_PATH") == 0)
 			{
@@ -846,7 +848,7 @@ remoteWorkerThread_main(void *cdata)
 				int			pa_connretry = (int) strtol(event->ev_data4, NULL, 10);
 
 				if (pa_client == rtcfg_nodeid)
-					rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
+				  rtcfg_storePath(pa_server, pa_conninfo, pa_connretry,local_dbconn);
 
 				slon_appendquery(&query1,
 							   "select %s.storePath_int(%d, %d, '%q', %d); ",
diff --git a/src/slon/runtime_config.c b/src/slon/runtime_config.c
index cfdb5c1..c046f97 100644
--- a/src/slon/runtime_config.c
+++ b/src/slon/runtime_config.c
@@ -104,9 +104,11 @@ rtcfg_unlock(void)
  * ----------
  */
 void
-rtcfg_storeNode(int no_id, char *no_comment)
+rtcfg_storeNode(int no_id, char *no_comment,PGconn * dbconn)
 {
 	SlonNode   *node;
+	SlonDString query;
+	PGresult   *res;
 
 	rtcfg_lock();
 
@@ -134,6 +136,7 @@ rtcfg_storeNode(int no_id, char *no_comment)
 			 "storeNode: no_id=%d no_comment='%s'\n",
 			 no_id, no_comment);
 
+
 	node = (SlonNode *) malloc(sizeof(SlonNode));
 	if (node == NULL)
 	{
@@ -145,6 +148,35 @@ rtcfg_storeNode(int no_id, char *no_comment)
 	node->no_id = no_id;
 	node->no_active = false;
 	node->no_comment = strdup(no_comment);
+
+	if(dbconn != NULL)
+	{
+		/**
+		 * Get the active status of the node from sl_node.
+		 * dbconn can be NULL if the caller knows that
+		 * no node entry exists yet.
+		 */
+		dstring_init(&query);
+		slon_mkquery(&query,"select no_active from %s.sl_node where"
+					 " no_id=%d",rtcfg_namespace,no_id);
+		res = PQexec(dbconn,dstring_data(&query));
+
+		if(PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			slon_log(SLON_FATAL,"Cannot get no_active from sl_node - no_id=%d",
+					 no_id);
+			PQclear(res);
+			dstring_free(&query);
+			slon_retry();
+		}
+		if(PQntuples(res) > 0)
+		{
+			node->no_active=(*PQgetvalue(res, 0, 0) == 't') ? 1 : 0;
+		}
+		PQclear(res);
+		dstring_free(&query);
+	}/*dbconn*/
+
 	pthread_mutex_init(&(node->message_lock), NULL);
 	pthread_cond_init(&(node->message_cond), NULL);
 
@@ -318,7 +350,7 @@ rtcfg_findNode(int no_id)
  * ----------
  */
 void
-rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry)
+rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry,PGconn * dbconn)
 {
 	SlonNode   *node;
 
@@ -332,7 +364,7 @@ rtcfg_storePath(int pa_server, char *pa_conninfo, int pa_connretry)
 
 		slon_log(SLON_WARN,
 			   "storePath: unknown node ID %d - event pending\n", pa_server);
-		rtcfg_storeNode(pa_server, "<event pending>");
+		rtcfg_storeNode(pa_server, "<event pending>",dbconn);
 
 		rtcfg_lock();
 		node = rtcfg_findNode(pa_server);
@@ -468,11 +500,9 @@ rtcfg_reloadListen(PGconn *db)
 	{
 		int			li_origin = (int)strtol(PQgetvalue(res, i, 0), NULL, 10);
 		int			li_provider = (int)strtol(PQgetvalue(res, i, 1), NULL, 10);
-
 		rtcfg_storeListen(li_origin, li_provider);
 	}
 	PQclear(res);
-
 	dstring_free(&query);
 
 	for (node = rtcfg_node_list_head; node; node = node->next)
@@ -915,7 +945,6 @@ rtcfg_startStopNodeThread(SlonNode * node)
 	int			need_wakeup = false;
 
 	rtcfg_lock();
-
 	if (sched_get_status() == SCHED_STATUS_OK && node->no_active)
 	{
 		/*
diff --git a/src/slon/slon.c b/src/slon/slon.c
index 7d11126..61c6c39 100644
--- a/src/slon/slon.c
+++ b/src/slon/slon.c
@@ -532,7 +532,7 @@ SlonMain(void)
 			 * Add a remote node
 			 */
 			slon_scanint64(PQgetvalue(res, i, 3), &last_event);
-			rtcfg_storeNode(no_id, no_comment);
+			rtcfg_storeNode(no_id, no_comment,NULL);
 			rtcfg_setNodeLastEvent(no_id, last_event);
 
 			/*
@@ -567,7 +567,7 @@ SlonMain(void)
 		char	   *pa_conninfo = PQgetvalue(res, i, 1);
 		int			pa_connretry = (int)strtol(PQgetvalue(res, i, 2), NULL, 10);
 
-		rtcfg_storePath(pa_server, pa_conninfo, pa_connretry);
+		rtcfg_storePath(pa_server, pa_conninfo, pa_connretry,NULL);
 	}
 	PQclear(res);
 
diff --git a/src/slon/slon.h b/src/slon/slon.h
index b6e509a..b7f4323 100644
--- a/src/slon/slon.h
+++ b/src/slon/slon.h
@@ -428,7 +428,7 @@ extern pthread_cond_t slon_wait_listen_cond;
 extern void rtcfg_lock(void);
 extern void rtcfg_unlock(void);
 
-extern void rtcfg_storeNode(int no_id, char *no_comment);
+extern void rtcfg_storeNode(int no_id, char *no_comment,PGconn * dbconn);
 extern void rtcfg_enableNode(int no_id);
 extern void rtcfg_disableNode(int no_id);
 extern SlonNode *rtcfg_findNode(int no_id);
@@ -436,7 +436,7 @@ extern int64 rtcfg_setNodeLastEvent(int no_id, int64 event_seq);
 extern int64 rtcfg_getNodeLastEvent(int no_id);
 
 extern void rtcfg_storePath(int pa_server, char *pa_conninfo,
-				int pa_connretry);
+							int pa_connretry,PGconn * dbconn);
 extern void rtcfg_dropPath(int pa_server);
 
 extern void rtcfg_reloadListen(PGconn *db);
-- 
1.6.3.3


--------------050007080808000208000800--


More information about the Slony1-patches mailing list