Wed Oct 25 05:57:07 PDT 2006
- Previous message: [Slony1-commit] By cbbrowne: Bug #1591 - large tuples in sl_log_2 not being found - The
- Next message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Fixed archive log writing. Jan Tags: ---- REL_1_1_STABLE Modified Files: -------------- slony1-engine/src/slon: remote_worker.c (r1.86.2.14 -> r1.86.2.15) slon.h (r1.48.2.1 -> r1.48.2.2) -------------- next part -------------- Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.86.2.14 retrieving revision 1.86.2.15 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.86.2.14 -r1.86.2.15 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -240,21 +240,20 @@ WorkerGroupData * wd, SlonWorkMsg_event * event); static void *sync_helper(void *cdata); -static char archive_name[SLON_MAX_PATH]; -static char archive_tmp[SLON_MAX_PATH]; -static FILE *archive_fp = NULL; -static int open_log_archive (int node_id, char *seqbuf); -static int close_log_archive (); -static void terminate_log_archive (); -static int generate_archive_header (int node_id, const char *seqbuf); -static int submit_query_to_archive(SlonDString *ds); -static int submit_string_to_archive (const char *s); -static int submit_raw_data_to_archive (const char *s); -static int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, +static int archive_open (SlonNode *node, char *seqbuf); +static int archive_close (SlonNode *node); +static void archive_terminate (SlonNode *node); +static int archive_append_ds (SlonNode *node, SlonDString *ds); +static int archive_append_str (SlonNode *node, const char *s); +static int archive_append_data (SlonNode *node, const char *data, int len); + + +static int archive_tracking (SlonNode *node, const char *namespace, + int sub_set, const char *firstseq, const char *seqbuf, const char *timestamp); -static void write_void_log (int node_id, char *seqbuf, const char *message); -#define TERMINATE_QUERY_AND_ARCHIVE dstring_free(&query); terminate_log_archive(); +static void archive_void_log (SlonNode *node, char *seqbuf, const char *message); + /* * ---------- slon_remoteWorkerThread @@ -592,7 +591,7 @@ need_reloadListen = true; if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_NODE"); + archive_void_log (node, seqbuf, "-- STORE_NODE"); } else if (strcmp(event->ev_type, "ENABLE_NODE") == 0) @@ -610,7 +609,7 @@ need_reloadListen = true; if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- ENABLE_NODE"); + archive_void_log (node, seqbuf, "-- ENABLE_NODE"); } else if (strcmp(event->ev_type, "DROP_NODE") == 0) { @@ -661,7 +660,7 @@ need_reloadListen = true; if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_NODE"); + archive_void_log (node, seqbuf, "-- DROP_NODE"); } else if (strcmp(event->ev_type, "STORE_PATH") == 0) { @@ -680,7 +679,7 @@ need_reloadListen = true; if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_PATH"); + archive_void_log (node, seqbuf, "-- STORE_PATH"); } else if (strcmp(event->ev_type, "DROP_PATH") == 0) { @@ -697,7 +696,7 @@ need_reloadListen = true; if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_PATH"); + archive_void_log (node, seqbuf, "-- DROP_PATH"); } else if (strcmp(event->ev_type, "STORE_LISTEN") == 0) { @@ -713,7 +712,7 @@ rtcfg_namespace, li_origin, li_provider, li_receiver); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_LISTEN"); + archive_void_log (node, seqbuf, "-- STORE_LISTEN"); } else if (strcmp(event->ev_type, "DROP_LISTEN") == 0) { @@ -729,7 +728,7 @@ rtcfg_namespace, li_origin, li_provider, li_receiver); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_LISTEN"); + archive_void_log (node, seqbuf, "-- DROP_LISTEN"); } else if (strcmp(event->ev_type, "STORE_SET") == 0) { @@ -746,7 +745,7 @@ set_id, set_origin, set_comment); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_SET"); + archive_void_log (node, seqbuf, "-- STORE_SET"); } else if (strcmp(event->ev_type, "DROP_SET") == 0) { @@ -761,14 +760,17 @@ /* The table deleted needs to be * dropped from log shipping too */ if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); - rc = generate_archive_header(rtcfg_nodeid, seqbuf); slon_mkquery(&query1, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, set_id); - rc = submit_query_to_archive(&query1); - rc = close_log_archive(); + + if (archive_open(node, seqbuf) < 0 || + archive_append_ds(node, &query1) < 0 || + archive_close(node) < 0) + { + slon_abort(); + } } } else if (strcmp(event->ev_type, "MERGE_SET") == 0) @@ -787,14 +789,17 @@ * being merged from the set being * maintained. */ if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); - rc = generate_archive_header(rtcfg_nodeid, seqbuf); - rc = slon_mkquery(&query1, + slon_mkquery(&query1, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, add_id); - rc = submit_query_to_archive(&query1); - rc = close_log_archive(); + + if (archive_open(node, seqbuf) < 0 || + archive_append_ds(node, &query1) < 0 || + archive_close(node) < 0) + { + slon_abort(); + } } } else if (strcmp(event->ev_type, "SET_ADD_TABLE") == 0) @@ -805,7 +810,7 @@ * in the runtime configuration. */ if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_TABLE"); + archive_void_log (node, seqbuf, "-- SET_ADD_TABLE"); } else if (strcmp(event->ev_type, "SET_ADD_SEQUENCE") == 0) { @@ -815,7 +820,7 @@ * maintained in the runtime configuration. */ if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- SET_ADD_SEQUENCE"); + archive_void_log (node, seqbuf, "-- SET_ADD_SEQUENCE"); } else if (strcmp(event->ev_type, "SET_DROP_TABLE") == 0) { @@ -825,7 +830,7 @@ rtcfg_namespace, tab_id); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_TABLE"); + archive_void_log (node, seqbuf, "-- SET_DROP_TABLE"); } else if (strcmp(event->ev_type, "SET_DROP_SEQUENCE") == 0) { @@ -835,7 +840,7 @@ rtcfg_namespace, seq_id); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- SET_DROP_SEQUENCE"); + archive_void_log (node, seqbuf, "-- SET_DROP_SEQUENCE"); } else if (strcmp(event->ev_type, "SET_MOVE_TABLE") == 0) { @@ -846,7 +851,7 @@ rtcfg_namespace, tab_id, new_set_id); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_TABLE"); + archive_void_log (node, seqbuf, "-- SET_MOVE_TABLE"); } else if (strcmp(event->ev_type, "SET_MOVE_SEQUENCE") == 0) { @@ -857,7 +862,7 @@ rtcfg_namespace, seq_id, new_set_id); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- SET_MOVE_SEQUENCE"); + archive_void_log (node, seqbuf, "-- SET_MOVE_SEQUENCE"); } else if (strcmp(event->ev_type, "STORE_TRIGGER") == 0) { @@ -869,7 +874,7 @@ rtcfg_namespace, trig_tabid, trig_tgname); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- STORE_TRIGGER"); + archive_void_log (node, seqbuf, "-- STORE_TRIGGER"); } else if (strcmp(event->ev_type, "DROP_TRIGGER") == 0) { @@ -881,7 +886,7 @@ rtcfg_namespace, trig_tabid, trig_tgname); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- DROP_TRIGGER"); + archive_void_log (node, seqbuf, "-- DROP_TRIGGER"); } else if (strcmp(event->ev_type, "ACCEPT_SET") == 0) { @@ -1023,7 +1028,7 @@ failed_node, backup_node, set_id); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- FAILOVER_SET"); + archive_void_log (node, seqbuf, "-- FAILOVER_SET"); need_reloadListen = true; } else if (strcmp(event->ev_type, "SUBSCRIBE_SET") == 0) @@ -1041,7 +1046,7 @@ rtcfg_namespace, sub_set, sub_provider, sub_receiver, sub_forward); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- SUBSCRIBE_SET"); + archive_void_log (node, seqbuf, "-- SUBSCRIBE_SET"); need_reloadListen = true; } else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0) @@ -1169,14 +1174,17 @@ need_reloadListen = true; if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); - rc = generate_archive_header(rtcfg_nodeid, seqbuf); slon_mkquery(&query1, "delete from %s.sl_setsync_offline " " where ssy_setid= %d;", rtcfg_namespace, sub_set); - rc = submit_query_to_archive(&query1); - rc = close_log_archive(); + + if (archive_open(node, seqbuf) < 0 || + archive_append_ds(node, &query1) < 0 || + archive_close(node) < 0) + { + slon_abort(); + } } } else if (strcmp(event->ev_type, "DDL_SCRIPT") == 0) @@ -1194,46 +1202,19 @@ /* DDL_SCRIPT needs to be turned into a log shipping script */ if (archive_dir) { if ((ddl_only_on_node < 1) || (ddl_only_on_node == rtcfg_nodeid)) { - - rc = open_log_archive(node->no_id, seqbuf); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not open DDL archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_open(node, seqbuf) < 0) slon_abort(); - } - generate_archive_header(node->no_id, seqbuf); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not generate DDL archive header %s - %s", - node->no_id, archive_tmp, strerror(errno)); + rc = archive_tracking(node, rtcfg_namespace, ddl_setid, + seqbuf, seqbuf, event->ev_timestamp_c); + if (rc < 0) slon_abort(); - } - rc = logarchive_tracking(rtcfg_namespace, ddl_setid, seqbuf, seqbuf, event->ev_timestamp_c); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not generate DDL archive tracker %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_append_str(node, ddl_script) < 0) slon_abort(); - } - rc = submit_string_to_archive(ddl_script); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not submit DDL Script %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_abort(); - } - - rc = close_log_archive(); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not close DDL Script %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_close(node) < 0) slon_abort(); } } } - } else if (strcmp(event->ev_type, "RESET_CONFIG") == 0) { int reset_config_setid = (int)strtol(event->ev_data1, NULL, 10); @@ -1244,14 +1225,14 @@ rtcfg_namespace, reset_config_setid, reset_configonly_on_node); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- RESET_CONFIG"); + archive_void_log (node, seqbuf, "-- RESET_CONFIG"); } else { printf("TODO: ********** remoteWorkerThread: node %d - EVENT %d," INT64_FORMAT " %s - unknown event type\n", node->no_id, event->ev_origin, event->ev_seqno, event->ev_type); if (archive_dir) - write_void_log (rtcfg_nodeid, seqbuf, "-- UNHANDLED EVENT!!!"); + archive_void_log (node, seqbuf, "-- UNHANDLED EVENT!!!"); } /* @@ -2281,30 +2262,13 @@ Isn't it convenient that seqbuf was just populated??? :-) */ if (archive_dir) { - rc = open_log_archive(rtcfg_nodeid, seqbuf); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not open COPY SET archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); - slon_disconnectdb(pro_conn); - dstring_free(&query1); - dstring_free(&query2); - dstring_free(&query3); - dstring_free(&indexregenquery); - terminate_log_archive(); - return -1; - } - rc = generate_archive_header(rtcfg_nodeid, seqbuf); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not generate COPY SET archive header %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (archive_open(node, seqbuf) < 0) + { slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); return -1; } } @@ -2321,7 +2285,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -2354,7 +2318,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (*(PQgetvalue(res1, 0, 0)) == 't') @@ -2368,7 +2332,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } PQclear(res1); @@ -2385,7 +2349,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2428,7 +2392,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -2465,7 +2429,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2492,7 +2456,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2521,7 +2485,7 @@ slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query3); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2561,7 +2525,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -2587,7 +2551,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2625,7 +2589,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -2666,7 +2630,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2693,7 +2657,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } rc = *PQgetvalue(res2, 0, 0) == 't'; @@ -2717,7 +2681,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } slon_log(SLON_DEBUG3, "remoteWorkerThread_%d: " @@ -2756,7 +2720,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -2780,7 +2744,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples2 = PQntuples(res2); @@ -2798,7 +2762,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2828,7 +2792,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -2848,7 +2812,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -2876,25 +2840,21 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { slon_mkquery(&query1, "delete from %s;copy %s %s from stdin;", tab_fqname, tab_fqname, nodeon73 ? "" : PQgetvalue(res3, 0, 0)); - rc = submit_query_to_archive(&query1); + rc = archive_append_ds(node, &query1); if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_d: " - "Could not generate copy_set request for %s - %s", - node->no_id, tab_fqname, strerror(errno)); - slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -2925,7 +2885,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -2956,15 +2916,12 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { - rc = fwrite(copydata, 1, len, archive_fp); - if (rc != len) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "PQputCopyData() - log shipping - %s", - node->no_id, strerror(errno)); + rc = archive_append_data(node, copydata, len); + if (rc < 0) { #ifdef SLON_MEMDEBUG memset(copydata, 88, len); #endif @@ -2978,7 +2935,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3002,7 +2959,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } @@ -3026,7 +2983,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } PQclear(res3); @@ -3046,7 +3003,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } res2 = PQgetResult(loc_dbconn); @@ -3063,11 +3020,12 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { - rc = submit_string_to_archive("\\."); + if (archive_append_str(node, "\\.") < 0) + slon_abort(); } #else /* ! HAVE_PQPUTCOPYDATA */ copydone = false; @@ -3092,12 +3050,14 @@ PQputline(loc_dbconn, copybuf); PQputline(loc_dbconn, "\n"); if (archive_dir) - submit_string_to_archive(copybuf); + if (archive_append_str(node, copybuf) < 0) + slon_abort(); break; case 1: PQputline(loc_dbconn, copybuf); if (archive_dir) - submit_raw_data_to_archive(copybuf); + if (archive_append_data(node, copybuf, strlen(copybuf) < 0) + slon_abort(); break; } @@ -3105,7 +3065,8 @@ } PQputline(loc_dbconn, "\\.\n"); if (archive_dir) { - rc = submit_string_to_archive("\\."); + if (archive_append_str(node, "\\.") < 0) + slon_abort(); } /* * End the COPY to stdout on the provider @@ -3124,7 +3085,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } PQclear(res3); @@ -3149,7 +3110,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } #endif /* HAVE_PQPUTCOPYDATA */ @@ -3174,11 +3135,12 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { - submit_query_to_archive(&query1); + if (archive_append_ds(node, &query1) < 0) + slon_abort(); } gettimeofday(&tv_now, NULL); @@ -3224,7 +3186,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res1); @@ -3250,7 +3212,8 @@ seq_fqname, seql_last_value); if (archive_dir) { - submit_query_to_archive(&query1); + if (archive_append_ds(node, &query1) < 0) + slon_abort(); } } else @@ -3270,7 +3233,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3317,7 +3280,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQntuples(res1) != 1) @@ -3331,7 +3294,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQgetisnull(res1, 0, 0)) @@ -3381,7 +3344,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQntuples(res1) != 1) @@ -3395,7 +3358,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ssy_seqno = PQgetvalue(res1, 0, 0); @@ -3441,7 +3404,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } ntuples1 = PQntuples(res2); @@ -3485,7 +3448,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (PQntuples(res1) != 1) @@ -3499,7 +3462,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } dstring_init(&ssy_action_list); @@ -3531,7 +3494,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } if (archive_dir) { @@ -3539,17 +3502,14 @@ "insert into %s.sl_setsync_offline (ssy_setid, ssy_seqno) " "values ('%d', '%d');", rtcfg_namespace, set_id, ssy_seqno); - rc = submit_query_to_archive(&query1); + rc = archive_append_ds(node, &query1); if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - " could not insert to sl_setsync_offline", - node->no_id); slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3560,17 +3520,14 @@ TIMEVAL_DIFF(&tv_start2, &tv_now)); if (archive_dir) { - rc = close_log_archive(); + rc = archive_close(node); if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - " could not close archive log %s - %s", - node->no_id, archive_tmp, strerror(errno)); slon_disconnectdb(pro_conn); dstring_free(&query1); dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } } @@ -3588,7 +3545,7 @@ dstring_free(&query2); dstring_free(&query3); dstring_free(&indexregenquery); - terminate_log_archive(); + archive_terminate(node); return -1; } slon_disconnectdb(pro_conn); @@ -3646,21 +3603,12 @@ */ if (archive_dir) { - rc = open_log_archive(node->no_id, seqbuf); - if (rc == -1) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot open archive file %s - %s\n", - node->no_id, archive_tmp, strerror(errno)); + rc = archive_open(node, seqbuf); + if (rc < 0) + { dstring_free(&query); return 60; } - rc = generate_archive_header(node->no_id, seqbuf); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); - return 60; - } } /* @@ -3684,7 +3632,8 @@ slon_log(SLON_ERROR, "remoteWorkerThread_%d: " "No pa_conninfo for data provider %d\n", node->no_id, provider->no_id); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 10; } sprintf(conn_symname, "subscriber_%d_provider_%d", @@ -3697,7 +3646,8 @@ "cannot connect to data provider %d on '%s'\n", node->no_id, provider->no_id, provider->pa_conninfo); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return provider->pa_connretry; } @@ -3709,7 +3659,8 @@ rtcfg_namespace, rtcfg_nodeid); if (query_execute(node, provider->conn->dbconn, &query) < 0) { - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_disconnectdb(provider->conn); provider->conn = NULL; return provider->pa_connretry; @@ -3745,7 +3696,8 @@ "for ev_origin %d\n", node->no_id, provider->no_id, event->ev_origin); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 10; } if (prov_seqno < event->ev_seqno) @@ -3755,7 +3707,8 @@ "ev_seqno " INT64_FORMAT " for ev_origin %d\n", node->no_id, provider->no_id, prov_seqno, event->ev_origin); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 10; } slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " @@ -3822,7 +3775,8 @@ PQresultErrorMessage(res1)); PQclear(res1); dstring_free(&new_qual); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } @@ -3871,7 +3825,8 @@ PQclear(res2); PQclear(res1); dstring_free(&new_qual); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } ntuples2 = PQntuples(res2); @@ -3994,17 +3949,13 @@ slon_log(SLON_DEBUG2, "writing archive log...\n"); fflush(stderr); fflush(stdout); - rc = logarchive_tracking(rtcfg_namespace, sub_set, + rc = archive_tracking(node, rtcfg_namespace, sub_set, PQgetvalue(res1, tupno1, 1), seqbuf, event->ev_timestamp_c); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); + if (rc < 0) return 60; } } - } PQclear(res1); /* @@ -4034,14 +3985,10 @@ node->no_id); dstring_free(&query); if (archive_dir) { - rc = close_log_archive(); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Could not close out archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); + rc = archive_close(node); + if (rc < 0) return 60; } - } return 0; } @@ -4157,15 +4104,10 @@ * the archive log. */ if (archive_dir) { - rc = submit_string_to_archive(dstring_data(&(wgline->data))); - /* rc = fprintf(archive_fp, "%s", dstring_data(&(wgline->data))); */ - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); + rc = archive_append_ds(node, &(wgline->data)); + if (rc < 0) return 60; } - } break; case SLON_WGLC_DONE: @@ -4245,7 +4187,8 @@ */ if (num_errors != 0) { - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n", node->no_id); return 10; @@ -4282,7 +4225,8 @@ node->no_id, dstring_data(&query), PQresultErrorMessage(res1)); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_disconnectdb(provider->conn); provider->conn = NULL; return 20; @@ -4300,7 +4244,8 @@ if (query_execute(node, local_dbconn, &query) < 0) { PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } @@ -4313,15 +4258,11 @@ "select %s.sequenceSetValue_offline(%s,'%s');\n", rtcfg_namespace, seql_seqid, seql_last_value); - rc = submit_query_to_archive(&query); - if (rc < 0) { - slon_log(SLON_ERROR, "remoteWorkerThread_%d: " - "Cannot write to archive file %s - %s", - node->no_id, archive_tmp, strerror(errno)); + rc = archive_append_ds(node, &query); + if (rc < 0) return 60; } } - } PQclear(res1); } @@ -4361,7 +4302,8 @@ node->no_id, dstring_data(&query), PQresultErrorMessage(res1)); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); slon_log(SLON_ERROR, "remoteWorkerThread_%d: SYNC aborted\n", node->no_id); return 10; @@ -4387,7 +4329,8 @@ node->no_id, dstring_data(&query), PQresultErrorMessage(res1)); PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } if (PQntuples(res1) > 0) @@ -4401,7 +4344,8 @@ if (query_execute(node, local_dbconn, &query) < 0) { PQclear(res1); - TERMINATE_QUERY_AND_ARCHIVE; + dstring_free(&query); + archive_terminate(node); return 60; } slon_log(SLON_DEBUG2, "remoteWorkerThread_%d: " @@ -4416,7 +4360,8 @@ */ if (archive_dir) { - close_log_archive(); + if (archive_close(node) < 0) + slon_abort(); } /* @@ -4811,49 +4756,85 @@ /* Functions for processing log archives... - - First, you open the log archive using open_log_archive() - - - Second, you generate the header using generate_archive_header() + - First, you open the log archive using archive_open() - - Third, you need to set up the sync tracking function in the log + - Second, you need to set up the sync tracking function in the log using logarchive_tracking() ============= Here Ends The Header of the Log Shipping Archive ================== Then come the various queries (inserts/deletes/updates) that comprise the "body" of the SYNC. Probably submitted using - submit_query_to_archive(). + archive_append_*(). ============= Here Ends The Body of the Log Shipping Archive ================== Finally, the log ends, notably with a COMMIT statement, generated - using close_log_archive(), which closes the file and renames it + using archive_close(), which closes the file and renames it from ".tmp" form to the final name. */ /* Stores the archive name in archive_name (as .sql name) and archive_tmp (.tmp file) */ -int open_log_archive (int node_id, char *seqbuf) { +static int +archive_open (SlonNode *node, char *seqbuf) +{ int i; - sprintf(archive_name, "%s/slony1_log_%d_", archive_dir, node_id); + int rc; + + if (node->archive_name == NULL) + { + node->archive_name = malloc(SLON_MAX_PATH); + node->archive_temp = malloc(SLON_MAX_PATH); + if (node->archive_name == NULL || node->archive_temp == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Out of memory in archive_open()", node->no_id); + slon_abort(); + } + } + + sprintf(node->archive_name, "%s/slony1_log_%d_", + archive_dir, node->no_id); for (i = strlen(seqbuf); i < 20; i++) - strcat(archive_name, "0"); - strcat(archive_name, seqbuf); - strcat(archive_name, ".sql"); - strcpy(archive_tmp, archive_name); - strcat(archive_tmp, ".tmp"); - archive_fp = fopen(archive_tmp, "w"); - if (archive_fp == NULL) { + strcat(node->archive_name, "0"); + strcat(node->archive_name, seqbuf); + strcat(node->archive_name, ".sql"); + strcpy(node->archive_temp, node->archive_name); + strcat(node->archive_temp, ".tmp"); + + node->archive_fp = fopen(node->archive_temp, "w"); + if (node->archive_fp == NULL) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not open archive file %s - %s", + node->archive_temp, strerror(errno)); return -1; - } else { - return 0; } + rc = fprintf(node->archive_fp, + "-- Slony-I log shipping archive\n" + "-- Node %d, Event %s\n" + "start transaction;\n", + node->no_id, seqbuf); + if (rc < 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not open archive file %s - %s", + node->archive_temp, strerror(errno)); + return -1; } -int close_log_archive () { + return 0; +} + + +static int +archive_close (SlonNode *node) +{ int rc = 0; - if (archive_dir) { - rc = fprintf(archive_fp, + + if (node->archive_fp != NULL) { + rc = fprintf(node->archive_fp, "\n------------------------------------------------------------------\n" "-- End Of Archive Log\n" "------------------------------------------------------------------\n" @@ -4861,59 +4842,135 @@ "vacuum analyze %s.sl_setsync_offline;\n", rtcfg_namespace); if ( rc < 0 ) + { + archive_terminate(node); + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not write to archive file %s - %s", + node->archive_temp, strerror(errno)); return -1; - rc = fclose(archive_fp); - archive_fp = NULL; + } + + rc = fclose(node->archive_fp); + node->archive_fp = NULL; if ( rc < 0 ) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not close archive file %s - %s", + node->archive_temp, strerror(errno)); + return -1; + } + + rc = rename(node->archive_temp, node->archive_name); + if (rc < 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not rename archive file %s to %s - %s", + node->archive_temp, node->archive_name, strerror(errno)); return -1; - rc = rename(archive_tmp, archive_name); } - return rc; } -int logarchive_tracking (const char *namespace, int sub_set, const char *firstseq, - const char *seqbuf, const char *timestamp) { - return fprintf(archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n" - "-- end of log archiving header\n" - "------------------------------------------------------------------\n" - "-- start of Slony-I data\n" - "------------------------------------------------------------------\n", - namespace, sub_set, firstseq, seqbuf, timestamp); + return 0; } -int submit_query_to_archive(SlonDString *ds) { - return fprintf(archive_fp, "%s\n", ds->data); + +static void +archive_terminate (SlonNode *node) +{ + if (node->archive_fp != NULL) { + fclose(node->archive_fp); + node->archive_fp = NULL; + } } -int submit_string_to_archive (const char *s) { - return fprintf(archive_fp, "%s\n", s); + +static int +archive_append_ds (SlonNode *node, SlonDString *ds) +{ + int rc; + + rc = fprintf(node->archive_fp, "%s\n", dstring_data(ds)); + if (rc < 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not write to archive file %s - %s", + node->archive_temp, strerror(errno)); + return -1; } -/* Raw form used for COPY where we don't want any extra cr/lf output */ -int submit_raw_data_to_archive (const char *s) { - return fprintf(archive_fp, "%s", s); + return 0; } -void terminate_log_archive () { - if (archive_fp) { - fclose(archive_fp); + +static int +archive_append_str (SlonNode *node, const char *str) +{ + int rc; + + rc = fprintf(node->archive_fp, "%s\n", str); + if (rc < 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not write to archive file %s - %s", + node->archive_temp, strerror(errno)); + return -1; } + + return 0; } -int generate_archive_header (int node_id, const char *seqbuf) { - return fprintf(archive_fp, - "-- Slony-I log shipping archive\n" - "-- Node %d, Event %s\n" - "start transaction;\n", - node_id, seqbuf); + +static int +archive_append_data (SlonNode *node, const char *data, int len) { + int rc; + + rc = fwrite(data, len, 1, node->archive_fp); + if (rc != 1) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not write to archive file %s - %s", + node->archive_temp, strerror(errno)); + return -1; + } + + return 0; +} + + +static int +archive_tracking (SlonNode *node, const char *namespace, int sub_set, + const char *firstseq, const char *seqbuf, + const char *timestamp) +{ + int rc; + + rc = fprintf(node->archive_fp, "\nselect %s.setsyncTracking_offline(%d, '%s', '%s', '%s');\n" + "-- end of log archiving header\n" + "------------------------------------------------------------------\n" + "-- start of Slony-I data\n" + "------------------------------------------------------------------\n", + namespace, sub_set, firstseq, seqbuf, timestamp); + + if (rc < 0) + { + slon_log(SLON_ERROR, "remoteWorkerThread_%d: " + "Could not write to archive file %s - %s", + node->archive_temp, strerror(errno)); + return -1; } -/* write_void_log() writes out a "void" log consisting of the message + return 0; +} + + +/* archive_void_log() writes out a "void" log consisting of the message * which must either be a valid SQL query or a SQL comment. */ -void write_void_log (int node_id, char *seqbuf, const char *message) { - open_log_archive(node_id, seqbuf); - generate_archive_header(node_id, seqbuf); - submit_string_to_archive(message); - close_log_archive(); +static void +archive_void_log (SlonNode *node, char *seqbuf, const char *message) { + archive_open(node, seqbuf); + archive_append_str(node, message); + archive_close(node); } + + Index: slon.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v retrieving revision 1.48.2.1 retrieving revision 1.48.2.2 diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.48.2.1 -r1.48.2.2 --- src/slon/slon.h +++ src/slon/slon.h @@ -106,6 +106,10 @@ SlonWorkMsg *message_head; SlonWorkMsg *message_tail; + char *archive_name; + char *archive_temp; + FILE *archive_fp; + SlonNode *prev; SlonNode *next; };
- Previous message: [Slony1-commit] By cbbrowne: Bug #1591 - large tuples in sl_log_2 not being found - The
- Next message: [Slony1-commit] By wieck: Fixed archive log writing by moving global variables into the
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list