Tue Apr 3 14:55:06 PDT 2007
- Previous message: [Slony1-commit] slony1-engine RELEASE-1.2.10 configure
- Next message: [Slony1-commit] slony1-engine/src/slonik slonik.c
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Update of /home/cvsd/slony1/slony1-engine/src/slon In directory main.slony.info:/tmp/cvs-serv3668/src/slon Modified Files: Tag: REL_1_2_STABLE remote_worker.c Log Message: Update to DDL handling - when a script is specified with "EXECUTE ONLY ON" a specific node, it should be invoked, by slonik, only on that node. Index: remote_worker.c =================================================================== RCS file: /home/cvsd/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.124.2.12 retrieving revision 1.124.2.13 diff -C2 -d -r1.124.2.12 -r1.124.2.13 *** remote_worker.c 6 Mar 2007 18:47:45 -0000 1.124.2.12 --- remote_worker.c 3 Apr 2007 21:55:03 -0000 1.124.2.13 *************** *** 269,272 **** --- 269,275 ---- static void compress_actionseq(const char *ssy_actionseq, SlonDString * action_subquery); + static int process_ddl_script(SlonWorkMsg_event * event,SlonNode * node, + PGconn * local_dbconn, char * seqbuf ); + static int check_set_subscriber(int set_id, int node_id,PGconn * local_dbconn); /* ---------- *************** *** 1340,1438 **** else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) { ! int ddl_setid = (int)strtol(event->ev_data1, NULL, 10); ! char *ddl_script = event->ev_data2; ! int ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10); ! int num_statements = -1, stmtno; ! ! PGresult *res; ! ExecStatusType rstat; ! ! ! slon_appendquery(&query1, ! "select %s.ddlScript_prepare_int(%d, %d); ", ! rtcfg_namespace, ! ddl_setid, ddl_only_on_node); ! ! if (query_execute(node, local_dbconn, &query1) < 0) { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %\n", ! node->no_id, ddl_setid, ddl_only_on_node); ! slon_retry(); ! } ! ! num_statements = scan_for_statements (ddl_script); ! slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n", ! node->no_id, num_statements); ! if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n", ! node->no_id, num_statements); ! slon_retry(); ! } ! ! for (stmtno=0; stmtno < num_statements; stmtno++) { ! int startpos, endpos; ! char *dest; ! if (stmtno == 0) ! startpos = 0; ! else ! startpos = STMTS[stmtno-1]; ! ! endpos = STMTS[stmtno]; ! dest = (char *) malloc (endpos - startpos + 1); ! if (dest == 0) { ! slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n", ! node->no_id, endpos - startpos + 1); ! slon_retry(); ! } ! strncpy(dest, ddl_script + startpos, endpos-startpos); ! dest[STMTS[stmtno]-startpos] = 0; ! slon_mkquery(&query1, dest); ! slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n", ! node->no_id, stmtno, dest); ! free(dest); ! ! res = PQexec(local_dbconn, dstring_data(&query1)); ! ! if (PQresultStatus(res) != PGRES_COMMAND_OK && ! PQresultStatus(res) != PGRES_TUPLES_OK && ! PQresultStatus(res) != PGRES_EMPTY_QUERY) ! { ! rstat = PQresultStatus(res); ! slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat)); ! dstring_free(&query1); ! slon_retry(); ! } ! rstat = PQresultStatus(res); ! slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat)); ! } ! ! slon_mkquery(&query1, "select %s.ddlScript_complete_int(%d, %d); ", ! rtcfg_namespace, ! ddl_setid, ! ddl_only_on_node); ! ! /* DDL_SCRIPT needs to be turned into a log shipping script */ ! /* Note that the issue about parsing that mandates breaking ! up compound statements into ! individually-processed statements does not apply to log ! shipping as psql parses and processes each statement ! individually */ ! ! if (archive_dir) ! { ! if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) ! { ! ! if (archive_open(node, seqbuf) < 0) ! slon_retry(); ! if (archive_tracking(node, rtcfg_namespace, ! ddl_setid, seqbuf, seqbuf, ! event->ev_timestamp_c) < 0) ! slon_retry(); ! if (archive_append_str(node, ddl_script) < 0) ! slon_retry(); ! if (archive_close(node) < 0) ! slon_retry(); ! } ! } } else if (strcmp(event->ev_type, "RESET_CONFIG") == 0) --- 1343,1347 ---- else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) { ! process_ddl_script(event,node,local_dbconn,seqbuf); } else if (strcmp(event->ev_type, "RESET_CONFIG") == 0) *************** *** 6098,6099 **** --- 6007,6181 ---- slon_log(SLON_DEBUG4, " compressed actionseq subquery... %s\n", dstring_data(action_subquery)); } + + + /** + * + * Process a ddl_script command. + */ + static int process_ddl_script(SlonWorkMsg_event * event,SlonNode * node, + PGconn * local_dbconn, + char * seqbuf) + { + int ddl_setid = (int)strtol(event->ev_data1, NULL, 10); + char *ddl_script = event->ev_data2; + int ddl_only_on_node = (int)strtol(event->ev_data3, NULL, 10); + int num_statements = -1, stmtno; + int node_in_set; + int localNodeId; + PGresult *res; + ExecStatusType rstat; + SlonDString query1; + + + + dstring_init(&query1); + /** + * Check to make sure this node is part of the set + */ + slon_log(SLON_INFO, "Checking local node id\n"); + localNodeId = db_getLocalNodeId(local_dbconn); + slon_log(SLON_INFO,"Found local node id\n"); + node_in_set = check_set_subscriber(ddl_setid,localNodeId,local_dbconn); + + if(!node_in_set) { + /** + * + * Node is not part of the set. + * Do not forward teh DDL to the node, + * nor should it be included in the log for log-shipping. + */ + slon_log(SLON_INFO,"Not forwarding DDL to node %d for set %d\n", + node->no_id,ddl_setid); + + } + else + { + slon_appendquery(&query1, + "select %s.ddlScript_prepare_int(%d, %d); ", + rtcfg_namespace, + ddl_setid, ddl_only_on_node); + + if (query_execute(node, local_dbconn, &query1) < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL preparation failed - set %d - only on node %\n", + node->no_id, ddl_setid, ddl_only_on_node); + slon_retry(); + } + + num_statements = scan_for_statements (ddl_script); + slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL request with %d statements\n", + node->no_id, num_statements); + if ((num_statements < 0) || (num_statements >= MAXSTATEMENTS)) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL had invalid number of statements - %d\n", + node->no_id, num_statements); + slon_retry(); + } + + for (stmtno=0; stmtno < num_statements; stmtno++) { + int startpos, endpos; + char *dest; + if (stmtno == 0) + startpos = 0; + else + startpos = STMTS[stmtno-1]; + + endpos = STMTS[stmtno]; + dest = (char *) malloc (endpos - startpos + 1); + if (dest == 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: malloc() failure in DDL_SCRIPT - could not allocate %d bytes of memory\n", + node->no_id, endpos - startpos + 1); + slon_retry(); + } + strncpy(dest, ddl_script + startpos, endpos-startpos); + dest[STMTS[stmtno]-startpos] = 0; + slon_mkquery(&query1, dest); + slon_log(SLON_CONFIG, "remoteWorkerThread_%d: DDL Statement %d: [%s]\n", + node->no_id, stmtno, dest); + free(dest); + + res = PQexec(local_dbconn, dstring_data(&query1)); + + if (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK && + PQresultStatus(res) != PGRES_EMPTY_QUERY) + { + rstat = PQresultStatus(res); + slon_log(SLON_ERROR, "DDL Statement failed - %s\n", PQresStatus(rstat)); + dstring_free(&query1); + slon_retry(); + } + rstat = PQresultStatus(res); + slon_log (SLON_CONFIG, "DDL success - %s\n", PQresStatus(rstat)); + } + + slon_mkquery(&query1, "select %s.ddlScript_complete_int(%d, %d); ", + rtcfg_namespace, + ddl_setid, + ddl_only_on_node); + + if (query_execute(node, local_dbconn, &query1) < 0) { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: DDL completion failed - set %d - only on node %\n", + node->no_id, ddl_setid, ddl_only_on_node); + slon_retry(); + } + + /* DDL_SCRIPT needs to be turned into a log shipping script */ + /* Note that the issue about parsing that mandates breaking + up compound statements into + individually-processed statements does not apply to log + shipping as psql parses and processes each statement + individually */ + + if (archive_dir) + { + if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) + { + + if (archive_open(node, seqbuf) < 0) + slon_retry(); + if (archive_tracking(node, rtcfg_namespace, + ddl_setid, seqbuf, seqbuf, + event->ev_timestamp_c) < 0) + slon_retry(); + if (archive_append_str(node, ddl_script) < 0) + slon_retry(); + if (archive_close(node) < 0) + slon_retry(); + } + } + }/*else node a subscriber */ + + dstring_free(&query1); + + } + + /** + * Checks to see if the node specified is a member of the set. + * + */ + static int check_set_subscriber(int set_id, int node_id,PGconn * local_dbconn) + { + + + SlonDString query1; + PGresult* res; + dstring_init(&query1); + + slon_appendquery(&query1,"select 1 from %s.sl_subscribe WHERE sub_set=%d AND sub_receiver=%d for update" + ,rtcfg_namespace,set_id,node_id); + res = PQexec(local_dbconn,dstring_data(&query1)); + if(PQresultStatus(res)!=PGRES_TUPLES_OK) { + slon_log(SLON_ERROR,"remoteWorkerThread_%d: DDL preperation can not check set membership" + ,node_id); + dstring_free(&query1); + slon_retry(); + } + dstring_free(&query1); + if(PQntuples(res)==0) { + PQclear(res); + return 0; + } + PQclear(res); + return 1; + + + }
- Previous message: [Slony1-commit] slony1-engine RELEASE-1.2.10 configure
- Next message: [Slony1-commit] slony1-engine/src/slonik slonik.c
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list