Tue Dec 6 20:59:23 PST 2005
- Previous message: [Slony1-commit] By cbbrowne: Bug #1471 - petere - slon documentation "The reference
- Next message: [Slony1-commit] By cbbrowne: Remove pg_listener activity for event confirmations - this
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- Restructuring of the sync_helper thread in preparation for fixing the out of memory problems on large rows. Jan Modified Files: -------------- slony1-engine/src/slon: remote_worker.c (r1.101 -> r1.102) slon.h (r1.55 -> r1.56) -------------- next part -------------- Index: remote_worker.c =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/remote_worker.c,v retrieving revision 1.101 retrieving revision 1.102 diff -Lsrc/slon/remote_worker.c -Lsrc/slon/remote_worker.c -u -w -r1.101 -r1.102 --- src/slon/remote_worker.c +++ src/slon/remote_worker.c @@ -4782,18 +4782,25 @@ SlonNode *node = wd->node; PGconn *dbconn; WorkerGroupLine *line = NULL; - int line_no; SlonDString query; - PGresult *res; - int ntuples; - int tupno; int errors; - WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE]; int alloc_lines = 0; struct timeval tv_start; struct timeval tv_now; int first_fetch; + WorkerGroupLine *data_line[SLON_DATA_FETCH_SIZE]; + int data_line_alloc; + int data_line_first; + int data_line_last; + + PGresult *res; + int ntuples; + int tupno; + + int line_no; + int line_ncmds; + dstring_init(&query); for (;;) @@ -4827,6 +4834,10 @@ dbconn = provider->conn->dbconn; pthread_mutex_unlock(&(provider->helper_lock)); + slon_log(SLON_DEBUG4, + "remoteHelperThread_%d_%d: got work to do\n", + node->no_id, provider->no_id); + errors = 0; do { @@ -4858,6 +4869,7 @@ gettimeofday(&tv_start, NULL); first_fetch = true; + res = NULL; if (query_execute(node, dbconn, &query) < 0) { @@ -4865,74 +4877,137 @@ break; } + slon_mkquery(&query, "fetch %d from LOG; ", + SLON_DATA_FETCH_SIZE * SLON_COMMANDS_PER_LINE); + data_line_alloc = 0; + data_line_first = 0; + data_line_last = 0; + + res = NULL; + ntuples = 0; + tupno = 0; + + while (!errors) + { /* - * Now fetch the log data and forward it via the line pool to the - * main worker who pushes it into the local database. + * Deliver filled line buffers to the worker process. */ - alloc_lines = 0; - while (errors == 0) + if (data_line_last > data_line_first) { + slon_log(SLON_DEBUG4, + "remoteHelperThread_%d_%d: deliver %d lines to worker\n", + node->no_id, provider->no_id, + data_line_last - data_line_first); + + pthread_mutex_lock(&(wd->workdata_lock)); + while (data_line_first < data_line_last) + { + DLLIST_ADD_TAIL(wd->repldata_head, wd->repldata_tail, + data_line[data_line_first]); + data_line_first++; + } + pthread_cond_signal(&(wd->repldata_cond)); + pthread_mutex_unlock(&(wd->workdata_lock)); + } + /* - * Allocate at least some lines - ideally the whole fetch - * size. + * If we cycled through all the allocated line buffers, + * reset the indexes. */ - while (alloc_lines == 0 && !errors) + if (data_line_first == data_line_alloc) { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: allocate lines\n", - node->no_id, provider->no_id); + data_line_alloc = 0; + data_line_first = 0; + data_line_last = 0; + } /* - * Wait until there are lines available in the pool. + * Make sure we are inside memory limits and that we + * have available line buffers. */ pthread_mutex_lock(&(wd->workdata_lock)); - while (wd->linepool_head == NULL && + if (data_line_alloc == 0 /* || oversize */) + { + /* + * First make sure that the overall memory usage is + * inside bouds. + */ + if (0 /* oversize */) + { + slon_log(SLON_DEBUG4, + "remoteHelperThread_%d_%d: wait for oversize memory to free\n", + node->no_id, provider->no_id); + + while (/* oversize && */ wd->workgroup_status == SLON_WG_BUSY) { pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock)); } + if (wd->workgroup_status != SLON_WG_BUSY) + { + slon_log(SLON_DEBUG4, + "remoteHelperThread_%d_%d: abort operation\n", + node->no_id, provider->no_id); + errors++; + break; + } + } /* - * If any error occured somewhere in the group, the main - * worker will set the status to ABORT. + * Second make sure that we have at least 1 line + * buffer. */ + if (data_line_alloc == 0) + { + slon_log(SLON_DEBUG4, + "remoteHelperThread_%d_%d: allocate line buffers\n", + node->no_id, provider->no_id); + while (data_line_alloc == 0 && !errors) + { + while (wd->linepool_head == NULL && + wd->workgroup_status == SLON_WG_BUSY) + { + pthread_cond_wait(&(wd->linepool_cond), &(wd->workdata_lock)); + } if (wd->workgroup_status != SLON_WG_BUSY) { slon_log(SLON_DEBUG4, "remoteHelperThread_%d_%d: abort operation\n", node->no_id, provider->no_id); - pthread_mutex_unlock(&(wd->workdata_lock)); errors++; break; } /* - * So far so good. Fill our array of lines from the pool. + * While we are at it, we can as well allocate + * up to FETCH_SIZE buffers. */ - while (alloc_lines < SLON_DATA_FETCH_SIZE && + while (data_line_alloc < SLON_DATA_FETCH_SIZE && wd->linepool_head != NULL) { - data_line[alloc_lines] = wd->linepool_head; + data_line[data_line_alloc] = wd->linepool_head; DLLIST_REMOVE(wd->linepool_head, wd->linepool_tail, - data_line[alloc_lines]); - alloc_lines++; + data_line[data_line_alloc]); + data_line_alloc++; } - pthread_mutex_unlock(&(wd->workdata_lock)); } - - if (errors) - break; - - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: have %d line buffers\n", - node->no_id, provider->no_id, alloc_lines); + } + } + pthread_mutex_unlock(&(wd->workdata_lock)); /* - * Now that we have allocated some buffer space, try to fetch - * that many rows from the cursor. + * We are within memory limits and have allocated + * line buffers. Make sure that we have log lines + * fetched. */ - slon_mkquery(&query, "fetch %d from LOG; ", - alloc_lines * SLON_COMMANDS_PER_LINE); + if (tupno >= ntuples) + { + slon_log(SLON_DEBUG4, + "remoteHelperThread_%d_%d: fetch from cursor\n", + node->no_id, provider->no_id); + if (res != NULL) + PQclear(res); + res = PQexec(dbconn, dstring_data(&query)); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -4940,7 +5015,6 @@ node->no_id, provider->no_id, dstring_data(&query), PQresultErrorMessage(res)); - PQclear(res); errors++; break; } @@ -4955,16 +5029,34 @@ first_fetch = false; } - /* - * Fill the line buffers with queries from the retrieved log - * rows. - */ - line_no = 0; ntuples = PQntuples(res); - slon_log(SLON_DEBUG3, - "remoteHelperThread_%d_%d: got %d log rows\n", + tupno = 0; + + slon_log(SLON_DEBUG4, + "remoteHelperThread_%d_%d: fetched %d log rows\n", node->no_id, provider->no_id, ntuples); - for (tupno = 0; tupno < ntuples; tupno++) + } + + /* + * If there are no more tuples, we're done + */ + if (ntuples == 0) + break; + + /* + * Now move tuples from the fetch result into the + * line buffers. + */ + line_no = data_line_last++; + line_ncmds = 0; + + line = data_line[line_no]; + line->code = SLON_WGLC_ACTION; + line->provider = provider; + dstring_reset(&(line->data)); + dstring_reset(&(line->log)); + + while (tupno < ntuples && line_no < data_line_alloc) { char *log_origin = PQgetvalue(res, tupno, 0); char *log_xid = PQgetvalue(res, tupno, 1); @@ -4974,14 +5066,7 @@ char *log_cmdtype = PQgetvalue(res, tupno, 4); char *log_cmddata = PQgetvalue(res, tupno, 5); - if (tupno % SLON_COMMANDS_PER_LINE == 0) - { - line = data_line[line_no++]; - line->code = SLON_WGLC_ACTION; - line->provider = provider; - dstring_reset(&(line->data)); - dstring_reset(&(line->log)); - } + tupno++; /* * This can happen if the table belongs to a set that @@ -4993,6 +5078,10 @@ wd->tab_fqname[log_tableid] == NULL) continue; + /* + * If we are forwarding this set, add the insert + * into sl_log_? + */ if (wd->tab_forward[log_tableid]) { slon_appendquery(&(line->log), @@ -5005,6 +5094,10 @@ log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata); } + + /* + * Add the actual replicating command to the line buffer + */ switch (*log_cmdtype) { case 'I': @@ -5028,62 +5121,63 @@ log_cmddata); break; } - } - PQclear(res); + line_ncmds++; - /* - * Now put all the line buffers back. Filled ones into the - * repldata, unused ones into the pool. - */ - pthread_mutex_lock(&(wd->workdata_lock)); - for (tupno = 0; tupno < alloc_lines; tupno++) + if (line_ncmds >= SLON_COMMANDS_PER_LINE) { - if (tupno < line_no) - DLLIST_ADD_TAIL(wd->repldata_head, wd->repldata_tail, - data_line[tupno]); - else - DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, - data_line[tupno]); - } - if (line_no > 0) - pthread_cond_signal(&(wd->repldata_cond)); - if (line_no < alloc_lines) - pthread_cond_broadcast(&(wd->linepool_cond)); - pthread_mutex_unlock(&(wd->workdata_lock)); + if (data_line_last >= data_line_alloc) + break; + line_no = data_line_last++; + + line = data_line[line_no]; + line->code = SLON_WGLC_ACTION; + line->provider = provider; + dstring_reset(&(line->data)); + dstring_reset(&(line->log)); - slon_log(SLON_DEBUG3, - "remoteHelperThread_%d_%d: %d log buffers delivered\n", - node->no_id, provider->no_id, line_no); + line_ncmds = 0; + } + } - if (line_no < alloc_lines) + /* + * Move one line back if we actually ran out of + * tuples on an exact SLON_COMMANDS_PER_LINE boundary. + */ + if (line_ncmds == 0) { - slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: no more log rows\n", - node->no_id, provider->no_id); - alloc_lines = 0; - break; - } - alloc_lines = 0; + data_line_last--; } + } /* Cursor returned EOF */ } while (0); /* * if there are still line buffers allocated, give them back. */ - if (alloc_lines > 0) + if (data_line_first < data_line_alloc) { slon_log(SLON_DEBUG4, - "remoteHelperThread_%d_%d: return unused line buffers\n", - node->no_id, provider->no_id); + "remoteHelperThread_%d_%d: return %d unused line buffers\n", + node->no_id, provider->no_id, + data_line_alloc - data_line_first); pthread_mutex_lock(&(wd->workdata_lock)); - while (alloc_lines > 0) + while (data_line_first < data_line_alloc) { - alloc_lines--; + data_line_alloc--; DLLIST_ADD_HEAD(wd->linepool_head, wd->linepool_tail, - data_line[alloc_lines]); + data_line[data_line_alloc]); } pthread_cond_broadcast(&(wd->linepool_cond)); pthread_mutex_unlock(&(wd->workdata_lock)); + + data_line_alloc = 0; + data_line_first = 0; + data_line_last = 0; + } + + if(res != NULL) + { + PQclear(res); + res = NULL; } /* Index: slon.h =================================================================== RCS file: /usr/local/cvsroot/slony1/slony1-engine/src/slon/slon.h,v retrieving revision 1.55 retrieving revision 1.56 diff -Lsrc/slon/slon.h -Lsrc/slon/slon.h -u -w -r1.55 -r1.56 --- src/slon/slon.h +++ src/slon/slon.h @@ -37,7 +37,7 @@ #else #define SLON_COMMANDS_PER_LINE 10 #define SLON_DATA_FETCH_SIZE 10 -#define SLON_WORKLINES_PER_HELPER (SLON_DATA_FETCH_SIZE * 50) +#define SLON_WORKLINES_PER_HELPER (SLON_DATA_FETCH_SIZE * 5) #endif #define SLON_MAX_PATH 1024
- Previous message: [Slony1-commit] By cbbrowne: Bug #1471 - petere - slon documentation "The reference
- Next message: [Slony1-commit] By cbbrowne: Remove pg_listener activity for event confirmations - this
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list