Tue Nov 30 23:38:52 PST 2004
- Previous message: [Slony1-commit] By cbbrowne: Added in some code to process syncs; still very rudimentary
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
Log Message: ----------- The ongoing effort to understand log spooling... Added Files: ----------- slony1-engine/src/slonspool: gen_output.pm (r1.1) init.pm (r1.1) slonspool.pl (r1.1) subscribe.pm (r1.1) Removed Files: ------------- slony1-engine/src/slonspool: slonspool -------------- next part -------------- --- /dev/null +++ src/slonspool/subscribe.pm @@ -0,0 +1,92 @@ +#!/usr/bin/perl +# $Id: subscribe.pm,v 1.1 2004/11/30 23:38:41 cbbrowne Exp $ +# Author: Christopher Browne +# Copyright 2004 Afilias Canada + +sub subscribe_to_node { + open(SUBSCRIBE, ">$spoolpath/subscription.log"); + foreach $set (@SETS) { + + # Create namespaces + print SUBSCRIBE "-- Subscribing node $node to set $set on $host - $port/$database/$user\n"; + my $sth = $dbh->exec("select distinct nspname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;"); + while ( @row = $sth->fetchrow ) { + my ($namespace) = @row; + print SUBSCRIBE "create schema $namespace;\n"; + } + close(SUBSCRIBE); + + # Create tables + $sth = $dbh->exec("select nspname || '.' || relname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;"); + while ( @row = $sth->fetchrow ) { + my ($table) = @row; + `$pgbins/pg_dump -p $port -h $host -U $user -t $table -s $database >> $spoolpath/subscription.log`; + } + open(SUBSCRIBE, ">>$spoolpath/subscription.log"); + + # Pull data, as in copy_set (remote_worker.c) + my $query = "begin transaction; set transaction isolation level serializable;"; + $sth = $dbh->exec($query); + my $tquery = qq{ + select T.tab_id, + "pg_catalog".quote_ident(PGN.nspname) || '.' || + "pg_catalog".quote_ident(PGC.relname) as tab_fqname, + T.tab_idxname, T.tab_comment + from "$cluster".sl_table T, + "pg_catalog".pg_class PGC, + "pg_catalog".pg_namespace PGN + where T.tab_set = $set + and T.tab_reloid = PGC.oid + and PGC.relnamespace = PGN.oid + order by tab_id; + }; + } + $sth=$dbh->exec($tquery); + while (@row=$sth->fetchrow) { + my ($table) = @row; + print SUBSCRIBE qq{copy "$table" from stdin;\n}; + my $query = qq{copy "$table" to stdout;}; + $res = $dbh->exec($query); + my $line = "*" x 16384; + $ret = $dbh->getline($line, 16384); + while ($line ne "\\.") { + print SUBSCRIBE line, "\n"; + $ret = $dbh->getline($line, 16384); + } + print SUBSCRIBE "\.\n"; + } + close SUBSCRIBE; + my $seqquery = qq{ + select n.nspname, c.relname + from "pg_catalog".pg_class c, "pg_catalog".pg_namespace, "$cluster".sl_sequence s + where + n.oid = c.relnamespace and + c.oid = s.seq_reloid and + s.seq_set = $set;}; + $sth=$dbh->exec($seqquery); + while (my @row=$sth->fetchrow) { + my ($nsp, $seqname) = @row; + `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`; + } + # Next, populate Sync information + # Use the last SYNC's snapshot information and set + # the action sequence list to all actions after + # that. + + my $squery = qq{ + select ssy_seqno, ssy_minxid, ssy_maxxid, + ssy_xip, ssy_action_list + from "$cluster".sl_setsync + where ssy_setid = $set; }; + $sth=$dbh->exec($squery); + while (my @row=$sth->fetchrow) { + my ($seqno, $minxid, $maxxid, $xip, $actionlist) = @row; + } + my $createsync = qq{ + insert into "_$cluster".sl_setsync + (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list) + values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');}; + print SUBSCRIBE $createsync, "\n"; +} + +1; --- /dev/null +++ src/slonspool/slonspool.pl @@ -0,0 +1,185 @@ +#!/usr/bin/perl +# $Id: slonspool.pl,v 1.1 2004/11/30 23:38:41 cbbrowne Exp $ +# Author: Christopher Browne +# Copyright 2004 Afilias Canada + +use Pg; +use Getopt::Long; +require "subscribe.pm"; +require "gen_output.pm"; +require "init.pm"; + +my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, + $maxsize, $maxage, $subnode, $node); + +my @SETS; +my $dbh; +process_options(); +initialize_configuration(); +#subscribe_to_node(); + +while (1) { + listen_to_node(); +} + +sub listen_to_node { + while (1) { + process_event(); + die -1; + } +} + +sub process_event { + my $dsn = "dbname=$database host=$host port=$port user=$user"; + if ($password) { + $dsn .= " password=$password"; + } + print "DSN: $dsn\n"; + my $dbh = Pg::connectdb($dsn); + print "Last err:", $dbh->errorMessage, "\n"; + my $sync_event; + my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm + where con_origin = $node order by con_seqno desc limit 1;}; + print $last_seq, "\n"; + my $res = $dbh->exec($last_seq); + while (my @row = $res->fetchrow) { + ($sync_event) = @row; + print "Last sync: $sync_event\n"; + } + print "Last err:", $dbh->errorMessage, "\n"; + $sync_event++; + print "Next sync: $sync_event\n"; + + my @ORIGINS; + my $origin_query = qq{ select set_origin from "_$cluster".sl_set where set_id in ($opt_sets); }; + $res = $dbh->exec($origin_query); + while (my @row = $res->fetchrow) { + my ($origin) = @row; + push @ORIGINS, $origin; + } + $origin_qualification = " (log_origin in (" . join(',', @ORIGINS) . ")) "; + + + my $table_qualification = " (log_tableid in (" . join(',', @TABLES) . ")) "; + + print "Table qualification: $table_qualification\n"; + my $qualification .= " $origin_qualification and $table_qualification "; + + my $cursor_query = qq{ + declare LOG cursor for + select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata + from "_$cluster".sl_log_1 + where $origin_qualification and $table_qualification + order by log_xid, log_actionseq;}; + + print "Cursor query: $cursor_query\n"; + + my $lastxid = ""; + my $syncname=sprintf("log-%08d", $sync); + open(LOGOUTPUT, ">$spoolpath/$syncname"); + print LOGOUTPUT "-- Data for sync $sync_event\n"; + print LOGOUTPUT "-- ", `date`; + my $begin = $dbh->exec("begin;"); + my $cursorexec = $dbh->exec($cursor_query); + print "Last err:", $dbh->errorMessage, "\n"; + my $foundsome = "YES"; + while ($foundsome eq "YES") { + $foundsome = "NO"; + my $res = $dbh->exec("fetch forward 100 in LOG;"); + while (my @row = $res->fetchrow) { + $foundsome = "YES"; + my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row; + if ($xid ne $lastxid) { # changed xid - report that... + if ($lastxid ne "") { # Do nothing first time around... + printf LOGOUTPUT "COMMIT; -- Done xid $lastxid\n"; + } + print LOGOUTPUT "BEGIN;\nselect fail_if_xid_applied($xid);\n"; + $lastxid = $xid; + } + if ($cmdtype eq "I") { + printf LOGOUTPUT "insert into %s %s;\n", $TABLENAME[$tableid], $cmddata; + } elsif ($cmdtype eq "U") { + printf LOGOUTPUT "update only %s set %s;\n", $TABLENAME[$tableid], $cmddata; + } elsif ($cmdtype eq "D") { + printf LOGOUTPUT "delete from only %s where %s;\n", $TABLENAME[$tableid], $cmddata; + } else { + print LOGOUTPUT "problem: cmddata not in (I,U,D) = [$cmdtype]\n"; + } + } + } + if ($lastxid ne "") { + print LOGOUTPUT "COMMIT; -- Done xid $lastxid\n"; + } + close LOGOUTPUT; + $dbh->exec("rollback;"); + my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp) + values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); }; + print "Confirm: $confirmation\n"; + my $cursorexec = $dbh->exec($confirmation); +} + +sub connect_to_node { + my $dsn = "dbname=$database host=$host port=$port user=$user"; + if ($password) { + $dsn .= " password=$password"; + } + $dbh = Pg::connectdb($dsn); +} + +sub process_options { + + $goodopts = GetOptions("help", "database=s", "host=s", "user=s", + "cluster=s", "password=s", "port=s", "sets=s", + "spoolpath=s", "spoolname=s", "pgbins=s", + "maxsize=i", "maxage=i", "node=i", "subnode=i"); + + if (defined ($opt_help)) { + show_usage(); + } + + $cluster=$opt_cluster if (defined($opt_cluster)); + $subnode = $opt_subnode if (defined ($opt_subnode)); + $node = $opt_node if (defined($opt_node)); + $database=$opt_database if (defined ($opt_database)); + $user = $opt_user if (defined ($opt_user)); + $host = $opt_host if (defined($opt_host)); + $password = $opt_password if (defined($opt_password)); + $port = $opt_port if (defined($opt_port)); + $pgbins = $opt_pgbins if (defined($opt_pgbins)); + $spoolpath = $opt_spoolpath if (defined($opt_spoolpath)); + $spoolname = $opt_spoolname if (defined($opt_spoolname)); + if (defined($opt_sets)) { + @SETS=split (/,/, $opt_sets); + } + if (defined($opt_maxsize)){ + $maxsize = $opt_maxsize; + } else { + $maxsize = 10000; + } + if (defined($opt_maxage)){ + $maxsize = $opt_maxage; + } else { + $maxage = 300; + } +} + +sub show_usage { + print qq{slonspool: + --help get help + --cluster=s Slony-I cluster name + --subnode=s Node number subscribed through + --node=i Node number to use to request + --pgbins=s Location of PostgreSQL binaries including slonik and pg_dump + --database=s database to connect to + --host=s host for database + --user=s user for database + --password=s password for database (you should probably use .pgpass instead) + --port=i port number to connect to + --sets=s Sets to replicate (comma-delimited) - e.g --sets=1,2,4 + --spoolpath=s directory in which to spool output + --spoolname=s naming convention for spoolfiles + --maxsize=i maximum size of spool files, in kB - default =10000KB + --maxage=i maximum age of spool files in seconds - default 300 +}; + die -1; +} --- /dev/null +++ src/slonspool/gen_output.pm @@ -0,0 +1,3 @@ +#!/usr/bin/perl + +1; --- src/slonspool/slonspool +++ /dev/null @@ -1,263 +0,0 @@ -#!perl - -use Pg; -use Getopt::Long; -my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $subnode, $node); -my @SETS; -my $dbh; -process_options(); -connect_to_node(); -subscribe_to_node(); - -while (1) { - listen_to_node(); -} - -sub subscribe_to_node { - open(SUBSCRIBE, ">$spoolpath/subscription.log"); - foreach $set (@SETS) { - - # Create namespaces - print SUBSCRIBE "-- Subscribing node $node to set $set on $host - $port/$database/$user\n"; - my $sth = $dbh->exec("select distinct nspname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;"); - while ( @row = $sth->fetchrow ) { - my ($namespace) = @row; - print SUBSCRIBE "create schema $namespace;\n"; - } - close(SUBSCRIBE); - - # Create tables - $sth = $dbh->exec("select nspname || '.' || relname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;"); - while ( @row = $sth->fetchrow ) { - my ($table) = @row; - `$pgbins/pg_dump -p $port -h $host -U $user -t $table -s $database >> $spoolpath/subscription.log`; - } - open(SUBSCRIBE, ">>$spoolpath/subscription.log"); - - # Pull data, as in copy_set (remote_worker.c) - my $query = "start transaction; set transaction isolation level serializable; "; - $sth = $dbh->exec($query); - my $tquery = qq{ - select T.tab_id, - "pg_catalog".quote_ident(PGN.nspname) || '.' || - "pg_catalog".quote_ident(PGC.relname) as tab_fqname, - T.tab_idxname, T.tab_comment - from "$cluster".sl_table T, - "pg_catalog".pg_class PGC, - "pg_catalog".pg_namespace PGN - where T.tab_set = $set - and T.tab_reloid = PGC.oid - and PGC.relnamespace = PGN.oid - order by tab_id; - }; - } - $sth=$dbh->exec($tquery); - while (@row=$sth->fetchrow) { - my ($table) = @row; - print SUBSCRIBE qq{copy "$table" from stdin;\n}; - my $query = qq{copy "$table" to stdout;}; - $res = $dbh->exec($query); - my $line = "*" x 16384; - $ret = $dbh->getline($line, 16384); - while ($line ne "\\.") { - print SUBSCRIBE line, "\n"; - $ret = $dbh->getline($line, 16384); - } - print SUBSCRIBE "\.\n"; - } - close SUBSCRIBE; - my $seqquery = qq{ - select n.nspname, c.relname - from "pg_catalog".pg_class c, "pg_catalog".pg_namespace, "$cluster".sl_sequence s - where - n.oid = c.relnamespace and - c.oid = s.seq_reloid and - s.seq_set = $set;}; - $sth=$dbh->exec($seqquery); - while (my @row=$sth->fetchrow) { - my ($nsp, $seqname) = @row; - `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`; - } - # Next, populate Sync information - # Use the last SYNC's snapshot information and set - # the action sequence list to all actions after - # that. - - my $squery = qq{ - select ssy_seqno, ssy_minxid, ssy_maxxid, - ssy_xip, ssy_action_list - from "$cluster".sl_setsync - where ssy_setid = $set; }; - $sth=$dbh->exec($squery); - while (my @row=$sth->fetchrow) { - my ($seqno, $minxid, $maxxid, $xip, $actionlist) = @row; - } - my $createsync = qq{ - insert into "_$cluster".sl_setsync - (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list) - values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');}; - print SUBSCRIBE $createsync, "\n"; -} - -sub listen_to_node { - while (1) { - process_event(); - sleep 2; - } -} - -sub process_event { - my $dsn = "dbname=$database host=$host port=$port user=$user"; - if ($password) { - $dsn .= " password=$password"; - } - my $dbh = Pg::connectdb($dsn); - print "Result:", $dbh->status, " OK=", PGRES_CONNECTION_OK, "\n"; - my $sync_event; - my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm - where con_origin = $node order by con_seqno desc limit 1;}; - print $last_seq, "\n"; - my $res = $dbh->exec($last_seq); - while (my @row = $res->fetchrow) { - ($sync_event) = @row; - print "Last sync: $sync_event\n"; - } - $sync_event++; - print "Next sync: $sync_event\n"; - - my $get_tables = qq{ select tab_id from "_$cluster".sl_table where tab_set in ($sets); }; - my $origin_qualification = " (log_origin = $sub_node) "; - my $table_qualification = " ( log_tableid in ("; - my $res = $dbh->exec($get_tables); - my @TABLES; - while (my @row=$sth->fetchrow) { - my ($table_id) = @row; - push @TABLES, $table_id; - } - $table_qualification .= join(',', @TABLES); - $table_qualification .= "))"; - my $qualification .= " $origin_qualification and $table_qualification "; - my $get_event_info = qq{select ev_minxid, ev_maxxid, ev_xip from "_$cluster".sl_event where ev_seqno = $sync_event;}; - my $res = $dbh->exec($get_event_info); - while (my @row=$res->fetchrow) { - my ($minxid, $maxxid, $ev_zip) = @row; - if ($ev_zip) { - $ev_zip = s/'//g; # Strip off unnecessary quotes - $qualification .= "and ($log_xid < '$maxxid' and \"_$cluster\".xxid_lt_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))"; - $qualification .= "and ($log_xid >= '$minxid' and \"_$cluster\".xxid_ge_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))"; - } else { - $qualification .= "and ($log_xid < '$maxxid') "; - $qualification .= "and ($log_xid >= '$minxid') "; - } - } - - my $tables_query = qq{t.tab_id, t.tab_reloid, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace;}; - $res = $dbh->exec($tables_query); - while (my @row = $res->fetchrow) { - my ($id, $oid, $namespace, $tname) = @row; - $TABLENAME[$i] = qq{"$namespace".$tname}; - } - - my $cursor_query = qq{ - declare LOG cursor for - select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata - from "_$cluster".sl_log_1 - where $qualification - order by log_actionseq;}; - - - my $syncname=sprintf("log-%8d", $sync_event); - open(LOGOUTPUT, ">$spoolpath/$syncname"); - print LOGOUTPUT "-- Data for sync $sync_event\n"; - print LOGOUTPUT "-- ", `date`; - print LOGOUTPUT "BEGIN;\n"; - my $begin = $dbh->exec("begin;"); - my $cursorexec = $dbh->exec($cursor_query); - my $foundsome = "YES"; - while ($foundsome eq "YES") { - $foundsome = "NO"; - my $res = $dbh->exec("fetch forward 100 in LOG;"); - while (my @row = $res->fetchrow) { - $foundsome = "YES"; - my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row; - if ($cmddata eq "I") { - printf LOGOUTPUT "insert into %s %s;\n", $TABLENAME[$tableid], $cmddata; - } elsif ($cmddata eq "U") { - printf LOGOUTPUT "update only %s set %s;\n", $TABLENAME[$tableid], $cmddata; - } elsif ($cmddata eq "D") { - printf LOGOUTPUT "delete from only %s where %s;\n", $TABLENAME[$tableid], $cmddata; - } - } - } - close LOGOUTPUT; - my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp) - values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); }; - print "Confirm: $confirmation\n"; - my $cursorexec = $dbh->exec($confirmation); -} - -sub connect_to_node { - my $dsn = "dbname=$database host=$host port=$port user=$user"; - if ($password) { - $dsn .= " password=$password"; - } - $dbh = Pg::connectdb($dsn); -} - -sub process_options { - - $goodopts = GetOptions("help", "database=s", "host=s", "user=s", - "cluster=s", "password=s", "port=s", "sets=s", - "spoolpath=s", "spoolname=s", "pgbins=s", - "maxsize=i", "maxage=i", "node=i", "subnode=i"); - - if (defined ($opt_help)) { - show_usage(); - } - - $cluster=$opt_cluster if (defined($opt_cluster)); - $subnode = $opt_subnode if (defined ($opt_subnode)); - $node = $opt_node if (defined($opt_node)); - $database=$opt_database if (defined ($opt_database)); - $user = $opt_user if (defined ($opt_user)); - $host = $opt_host if (defined($opt_host)); - $password = $opt_password if (defined($opt_password)); - $port = $opt_port if (defined($opt_port)); - $pgbins = $opt_pgbins if (defined($opt_pgbins)); - $spoolpath = $opt_spoolpath if (defined($opt_spoolpath)); - $spoolname = $opt_spoolname if (defined($opt_spoolname)); - if (defined($opt_sets)) { - @SETS=split (/,/, $opt_sets); - } - if (defined($opt_maxsize)){ - $maxsize = $opt_maxsize; - } else { - $maxsize = 10000; - } - if (defined($opt_maxage)){ - $maxsize = $opt_maxage; - } else { - $maxage = 300; - } -} - -sub show_usage { - print qq{slonspool: - --help get help - --cluster=s Slony-I cluster name - --subnode=s Node number subscribed through - --node=i Node number to use to request - --pgbins=s Location of PostgreSQL binaries including slonik and pg_dump - --database=s database to connect to - --host=s host for database - --user=s user for database - --password=s password for database (you should probably use .pgpass instead) - --port=i port number to connect to - --sets=s Sets to replicate (comma-delimited) - e.g --sets=1,2,4 - --spoolpath=s directory in which to spool output - --spoolname=s naming convention for spoolfiles - --maxsize=i maximum size of spool files, in kB - default =10000KB - --maxage=i maximum age of spool files in seconds - default 300 -}; - die -1; -} --- /dev/null +++ src/slonspool/init.pm @@ -0,0 +1,61 @@ +#!/usr/bin/perl +# $Id: init.pm,v 1.1 2004/11/30 23:38:40 cbbrowne Exp $ + +# Data structures... +# %NODES{}{} +# Fields: +# $NODE{$i}{last_event} - Last event processed for node +# +# %SET{}{} +# Fields: +# $SET{$i}{origin} - origin node +# $SET{$i}{comment} - Comment about set +# $SET{$i}{provider} - node that provides data to our favorite node +# +# %TABLES +# $TABLES{$i}{name} +# $TABLES{$i}{namespace} +# $TABLES{$i}{set} + +# Populate latest information about subscription providers and such... +sub load_configuration { + my $dsn = "dbname=$database host=$host port=$port user=$user"; + if ($password) { + $dsn .= " password=$password"; + } + $dbh = Pg::connectdb($dsn); + + # Populate %NODE with confirmation information + my $confirm_query = qq{ select con_origin, con_seqno from "_$cluster".sl_confirm where received = $node; }; + my $res = $dbh->exec($confirm_query); + while (my @row = $res->fetchrow) { + my ($origin, $sync) = @row; + if ($NODE{$origin}{last_event} < $sync) { + $NODE{$origin}{last_event} = $sync; + } + } + + # Populate %SET with set info for the sets being handled + my $sub_set_query = qq{ select set_id, set_origin from "_$cluster".sl_set where set_id in ($opt_sets);}; + my $res = $dbh->exec($confirm_query); + while (my @row = $res->fetchrow) { + my ($set, $origin) = @row; + $SET{$set}{origin} = $origin; + } + + my $tables_query = qq{select t.tab_id, t.tab_set, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace and tab_set in ($opt_sets) ;}; + $res = $dbh->exec($tables_query); + while (my @row = $res->fetchrow) { + my ($id, $set, $namespace, $tname) = @row; + $TABLES{$id}{name} = $tname; + $TABLES{$id}{namespace} = $namespace; + $TABLES{$id}{set} = $set; + } +} + +sub storeNode { + my ($id, $comment) = @_; + $NODES[$id] = $comment; +} + +1;
- Previous message: [Slony1-commit] By cbbrowne: Added in some code to process syncs; still very rudimentary
- Messages sorted by: [ date ] [ thread ] [ subject ] [ author ]
More information about the Slony1-commit mailing list