CVS User Account cvsuser
Tue Nov 30 23:38:52 PST 2004
Log Message:
-----------
The ongoing effort to understand log spooling...

Added Files:
-----------
    slony1-engine/src/slonspool:
        gen_output.pm (r1.1)
        init.pm (r1.1)
        slonspool.pl (r1.1)
        subscribe.pm (r1.1)

Removed Files:
-------------
    slony1-engine/src/slonspool:
        slonspool

-------------- next part --------------
--- /dev/null
+++ src/slonspool/subscribe.pm
@@ -0,0 +1,92 @@
+#!/usr/bin/perl
+# $Id: subscribe.pm,v 1.1 2004/11/30 23:38:41 cbbrowne Exp $
+# Author: Christopher Browne
+# Copyright 2004 Afilias Canada
+
+sub subscribe_to_node {
+  open(SUBSCRIBE, ">$spoolpath/subscription.log");
+  foreach $set (@SETS) {
+
+    # Create namespaces
+    print SUBSCRIBE "-- Subscribing node $node to set $set on $host - $port/$database/$user\n";
+    my $sth = $dbh->exec("select distinct nspname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
+    while ( @row = $sth->fetchrow ) {
+      my ($namespace) = @row;
+      print SUBSCRIBE "create schema $namespace;\n";
+    }
+    close(SUBSCRIBE);
+
+    # Create tables
+    $sth = $dbh->exec("select nspname || '.' || relname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
+    while ( @row = $sth->fetchrow ) {
+      my ($table) = @row;
+      `$pgbins/pg_dump -p $port -h $host -U $user -t $table -s $database >> $spoolpath/subscription.log`;
+    }
+    open(SUBSCRIBE, ">>$spoolpath/subscription.log");
+
+    # Pull data, as in copy_set (remote_worker.c)
+    my $query = "begin transaction; set transaction isolation level serializable;";
+    $sth = $dbh->exec($query);
+    my $tquery = qq{
+		    select T.tab_id,
+		    "pg_catalog".quote_ident(PGN.nspname) || '.' ||
+		    "pg_catalog".quote_ident(PGC.relname) as tab_fqname,
+		    T.tab_idxname, T.tab_comment
+		    from "$cluster".sl_table T,
+		    "pg_catalog".pg_class PGC,
+		    "pg_catalog".pg_namespace PGN
+		    where T.tab_set = $set
+		    and T.tab_reloid = PGC.oid
+		    and PGC.relnamespace = PGN.oid
+		    order by tab_id;
+		   };
+  }
+  $sth=$dbh->exec($tquery);
+  while (@row=$sth->fetchrow) {
+    my ($table) = @row;
+    print SUBSCRIBE qq{copy "$table" from stdin;\n};
+    my $query = qq{copy "$table" to stdout;};
+    $res = $dbh->exec($query);
+    my $line = "*" x 16384;
+    $ret = $dbh->getline($line, 16384);
+    while ($line ne "\\.") {
+      print SUBSCRIBE line, "\n";
+      $ret = $dbh->getline($line, 16384);
+    }
+    print SUBSCRIBE "\.\n";
+  }
+  close SUBSCRIBE;
+  my $seqquery = qq{
+    select n.nspname, c.relname
+    from "pg_catalog".pg_class c, "pg_catalog".pg_namespace, "$cluster".sl_sequence s
+    where
+      n.oid = c.relnamespace and
+      c.oid = s.seq_reloid and
+      s.seq_set = $set;};
+  $sth=$dbh->exec($seqquery);
+  while (my @row=$sth->fetchrow) {
+    my ($nsp, $seqname) = @row;
+    `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`;
+  }
+  # Next, populate Sync information
+  # Use the last SYNC's snapshot information and set
+  # the action sequence list to all actions after
+  # that.
+
+  my $squery = qq{
+    select ssy_seqno, ssy_minxid, ssy_maxxid,
+           ssy_xip, ssy_action_list
+    from "$cluster".sl_setsync
+    where ssy_setid = $set; };
+  $sth=$dbh->exec($squery);
+  while (my @row=$sth->fetchrow) {
+    my ($seqno, $minxid, $maxxid, $xip, $actionlist) = @row;
+  }
+  my $createsync = qq{
+    insert into "_$cluster".sl_setsync
+	   (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list)
+    values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');};
+  print SUBSCRIBE $createsync, "\n";
+}
+
+1;
--- /dev/null
+++ src/slonspool/slonspool.pl
@@ -0,0 +1,185 @@
+#!/usr/bin/perl
+# $Id: slonspool.pl,v 1.1 2004/11/30 23:38:41 cbbrowne Exp $
+# Author: Christopher Browne
+# Copyright 2004 Afilias Canada
+
+use Pg;
+use Getopt::Long;
+require "subscribe.pm";
+require "gen_output.pm";
+require "init.pm";
+
+my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname,
+   $maxsize, $maxage, $subnode, $node);
+
+my @SETS;
+my $dbh;
+process_options();
+initialize_configuration();
+#subscribe_to_node();
+
+while (1) {
+  listen_to_node();
+}
+
+sub listen_to_node {
+  while (1) {
+    process_event();
+    die -1;
+  }
+}
+
+sub process_event {
+  my $dsn = "dbname=$database host=$host port=$port user=$user";
+  if ($password) {
+    $dsn .= " password=$password";
+  }
+  print "DSN: $dsn\n";
+  my $dbh = Pg::connectdb($dsn);
+  print "Last err:", $dbh->errorMessage, "\n";
+  my $sync_event;
+  my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm
+                    where con_origin = $node order by con_seqno desc limit 1;};
+  print $last_seq, "\n";
+  my $res = $dbh->exec($last_seq);
+  while (my @row = $res->fetchrow) {
+    ($sync_event) = @row;
+    print "Last sync: $sync_event\n";
+  }
+  print "Last err:", $dbh->errorMessage, "\n";
+  $sync_event++;
+  print "Next sync: $sync_event\n";
+
+  my @ORIGINS;
+  my $origin_query = qq{ select set_origin from "_$cluster".sl_set where set_id in ($opt_sets); };
+  $res = $dbh->exec($origin_query);
+  while (my @row = $res->fetchrow) {
+    my ($origin) = @row;
+    push @ORIGINS, $origin;
+  }
+  $origin_qualification = " (log_origin in (" . join(',', @ORIGINS) . ")) ";
+
+
+  my $table_qualification = " (log_tableid in (" . join(',', @TABLES) . ")) ";
+
+  print "Table qualification: $table_qualification\n";
+  my $qualification .= " $origin_qualification and $table_qualification ";
+
+  my $cursor_query = qq{
+  declare LOG cursor for
+    select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata
+    from "_$cluster".sl_log_1
+    where $origin_qualification and $table_qualification
+    order by log_xid, log_actionseq;};
+
+  print "Cursor query: $cursor_query\n";
+
+  my $lastxid = "";
+  my $syncname=sprintf("log-%08d", $sync);
+  open(LOGOUTPUT, ">$spoolpath/$syncname");
+  print LOGOUTPUT "-- Data for sync $sync_event\n";
+  print LOGOUTPUT "-- ", `date`;
+  my $begin = $dbh->exec("begin;");
+  my $cursorexec = $dbh->exec($cursor_query);
+  print "Last err:", $dbh->errorMessage, "\n";
+  my $foundsome = "YES";
+  while ($foundsome eq "YES") {
+    $foundsome = "NO";
+    my $res = $dbh->exec("fetch forward 100 in LOG;");
+    while (my @row = $res->fetchrow) {
+      $foundsome = "YES";
+      my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row;
+      if ($xid ne $lastxid) { # changed xid - report that...
+	if ($lastxid ne "") {  # Do nothing first time around...
+	  printf LOGOUTPUT "COMMIT; -- Done xid $lastxid\n";
+	}
+	print LOGOUTPUT "BEGIN;\nselect fail_if_xid_applied($xid);\n";
+	$lastxid = $xid;
+      }
+      if ($cmdtype eq "I") {
+	printf LOGOUTPUT "insert into %s %s;\n", $TABLENAME[$tableid], $cmddata;
+      } elsif ($cmdtype eq "U") {
+	printf LOGOUTPUT "update only %s set %s;\n", $TABLENAME[$tableid], $cmddata;
+      } elsif ($cmdtype eq "D") {
+	printf LOGOUTPUT "delete from only %s where %s;\n", $TABLENAME[$tableid], $cmddata;
+      } else {
+	print LOGOUTPUT "problem: cmddata not in (I,U,D) = [$cmdtype]\n";
+      }
+    }
+  }
+  if ($lastxid ne "") {
+    print LOGOUTPUT "COMMIT; -- Done xid $lastxid\n";
+  }
+  close LOGOUTPUT;
+  $dbh->exec("rollback;");
+  my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp)
+                         values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); };
+  print "Confirm: $confirmation\n";
+  my $cursorexec = $dbh->exec($confirmation);
+}
+
+sub connect_to_node {
+  my $dsn = "dbname=$database host=$host port=$port user=$user";
+  if ($password) {
+    $dsn .= " password=$password";
+  }
+  $dbh = Pg::connectdb($dsn);
+}
+
+sub process_options {
+
+  $goodopts = GetOptions("help", "database=s", "host=s", "user=s",
+			 "cluster=s", "password=s", "port=s", "sets=s",
+			 "spoolpath=s", "spoolname=s", "pgbins=s",
+			 "maxsize=i", "maxage=i", "node=i", "subnode=i");
+
+  if (defined ($opt_help)) {
+    show_usage();
+  }
+
+  $cluster=$opt_cluster if (defined($opt_cluster));
+  $subnode = $opt_subnode if (defined ($opt_subnode));
+  $node = $opt_node if (defined($opt_node));
+  $database=$opt_database if (defined ($opt_database));
+  $user = $opt_user if (defined ($opt_user));
+  $host = $opt_host if (defined($opt_host));
+  $password = $opt_password if (defined($opt_password));
+  $port = $opt_port if (defined($opt_port));
+  $pgbins = $opt_pgbins if (defined($opt_pgbins));
+  $spoolpath = $opt_spoolpath if (defined($opt_spoolpath));
+  $spoolname   = $opt_spoolname   if (defined($opt_spoolname));
+  if (defined($opt_sets)) {
+    @SETS=split (/,/, $opt_sets);
+  }
+  if (defined($opt_maxsize)){
+    $maxsize = $opt_maxsize;
+  } else {
+    $maxsize = 10000;
+  }
+  if (defined($opt_maxage)){
+    $maxsize = $opt_maxage;
+  } else {
+    $maxage = 300;
+  }
+}
+
+sub show_usage {
+  print qq{slonspool:
+     --help                get help
+     --cluster=s           Slony-I cluster name
+     --subnode=s   Node number subscribed through
+     --node=i              Node number to use to request
+     --pgbins=s            Location of PostgreSQL binaries including slonik and pg_dump
+     --database=s          database to connect to
+     --host=s              host for database
+     --user=s              user for database
+     --password=s          password for database (you should probably use .pgpass instead)
+     --port=i              port number to connect to
+     --sets=s              Sets to replicate (comma-delimited) - e.g --sets=1,2,4 
+     --spoolpath=s         directory in which to spool output
+     --spoolname=s         naming convention for spoolfiles
+     --maxsize=i           maximum size of spool files, in kB - default =10000KB
+     --maxage=i            maximum age of spool files in seconds - default 300
+};
+  die -1;
+}
--- /dev/null
+++ src/slonspool/gen_output.pm
@@ -0,0 +1,3 @@
+#!/usr/bin/perl
+
+1;
--- src/slonspool/slonspool
+++ /dev/null
@@ -1,263 +0,0 @@
-#!perl
-
-use Pg;
-use Getopt::Long;
-my($database,$user, $host, $cluster, $password, $port, $spoolpath, $spoolname, $maxsize, $maxage, $subnode, $node);
-my @SETS;
-my $dbh;
-process_options();
-connect_to_node();
-subscribe_to_node();
-
-while (1) {
-  listen_to_node();
-}
-
-sub subscribe_to_node {
-  open(SUBSCRIBE, ">$spoolpath/subscription.log");
-  foreach $set (@SETS) {
-
-    # Create namespaces
-    print SUBSCRIBE "-- Subscribing node $node to set $set on $host - $port/$database/$user\n";
-    my $sth = $dbh->exec("select distinct nspname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
-    while ( @row = $sth->fetchrow ) {
-      my ($namespace) = @row;
-      print SUBSCRIBE "create schema $namespace;\n";
-    }
-    close(SUBSCRIBE);
-
-    # Create tables
-    $sth = $dbh->exec("select nspname || '.' || relname from pg_class p, pg_namespace n, _oxrsorg.sl_table t where t.tab_reloid = p.oid and p.relnamespace = n.oid and tab_set = $set;");
-    while ( @row = $sth->fetchrow ) {
-      my ($table) = @row;
-      `$pgbins/pg_dump -p $port -h $host -U $user -t $table -s $database >> $spoolpath/subscription.log`;
-    }
-    open(SUBSCRIBE, ">>$spoolpath/subscription.log");
-
-    # Pull data, as in copy_set (remote_worker.c)
-    my $query = "start transaction; set transaction isolation level serializable; ";
-    $sth = $dbh->exec($query);
-    my $tquery = qq{
-		    select T.tab_id,
-		    "pg_catalog".quote_ident(PGN.nspname) || '.' ||
-		    "pg_catalog".quote_ident(PGC.relname) as tab_fqname,
-		    T.tab_idxname, T.tab_comment
-		    from "$cluster".sl_table T,
-		    "pg_catalog".pg_class PGC,
-		    "pg_catalog".pg_namespace PGN
-		    where T.tab_set = $set
-		    and T.tab_reloid = PGC.oid
-		    and PGC.relnamespace = PGN.oid
-		    order by tab_id;
-		   };
-  }
-  $sth=$dbh->exec($tquery);
-  while (@row=$sth->fetchrow) {
-    my ($table) = @row;
-    print SUBSCRIBE qq{copy "$table" from stdin;\n};
-    my $query = qq{copy "$table" to stdout;};
-    $res = $dbh->exec($query);
-    my $line = "*" x 16384;
-    $ret = $dbh->getline($line, 16384);
-    while ($line ne "\\.") {
-      print SUBSCRIBE line, "\n";
-      $ret = $dbh->getline($line, 16384);
-    }
-    print SUBSCRIBE "\.\n";
-  }
-  close SUBSCRIBE;
-  my $seqquery = qq{
-    select n.nspname, c.relname
-    from "pg_catalog".pg_class c, "pg_catalog".pg_namespace, "$cluster".sl_sequence s
-    where
-      n.oid = c.relnamespace and
-      c.oid = s.seq_reloid and
-      s.seq_set = $set;};
-  $sth=$dbh->exec($seqquery);
-  while (my @row=$sth->fetchrow) {
-    my ($nsp, $seqname) = @row;
-    `$pgbins/pg_dump -p $port -h $host -U $user -n $nsp -t $seqname $database >> $spoolpath/subscription.log`;
-  }
-  # Next, populate Sync information
-  # Use the last SYNC's snapshot information and set
-  # the action sequence list to all actions after
-  # that.
-
-  my $squery = qq{
-    select ssy_seqno, ssy_minxid, ssy_maxxid,
-           ssy_xip, ssy_action_list
-    from "$cluster".sl_setsync
-    where ssy_setid = $set; };
-  $sth=$dbh->exec($squery);
-  while (my @row=$sth->fetchrow) {
-    my ($seqno, $minxid, $maxxid, $xip, $actionlist) = @row;
-  }
-  my $createsync = qq{
-    insert into "_$cluster".sl_setsync
-	   (ssy_setid, ssy_origin, ssy_seqno, ssy_minxid, ssy_maxxid, ssy_xip, ssy_action_list)
-    values ($set, $node, $seqno, $minxid, $maxxid, '$xip', '$actionlist');};
-  print SUBSCRIBE $createsync, "\n";
-}
-
-sub listen_to_node {
-  while (1) {
-    process_event();
-    sleep 2;
-  }
-}
-
-sub process_event {
-  my $dsn = "dbname=$database host=$host port=$port user=$user";
-  if ($password) {
-    $dsn .= " password=$password";
-  }
-  my $dbh = Pg::connectdb($dsn);
-  print "Result:",  $dbh->status, " OK=", PGRES_CONNECTION_OK, "\n";
-  my $sync_event;
-  my $last_seq = qq{select con_seqno from "_$cluster".sl_confirm
-                    where con_origin = $node order by con_seqno desc limit 1;};
-  print $last_seq, "\n";
-  my $res = $dbh->exec($last_seq);
-  while (my @row = $res->fetchrow) {
-    ($sync_event) = @row;
-    print "Last sync: $sync_event\n";
-  }
-  $sync_event++;
-  print "Next sync: $sync_event\n";
-
-  my $get_tables = qq{ select tab_id from "_$cluster".sl_table where tab_set in ($sets); };
-  my $origin_qualification = " (log_origin = $sub_node) ";
-  my $table_qualification = " ( log_tableid in (";
-  my $res = $dbh->exec($get_tables);
-  my @TABLES;
-  while (my @row=$sth->fetchrow) {
-    my ($table_id) = @row;
-    push @TABLES, $table_id;
-  }
-  $table_qualification .= join(',', @TABLES);
-  $table_qualification .= "))";
-  my $qualification .= " $origin_qualification and $table_qualification ";
-  my $get_event_info = qq{select ev_minxid, ev_maxxid, ev_xip from "_$cluster".sl_event where ev_seqno = $sync_event;};
-  my $res = $dbh->exec($get_event_info);
-  while (my @row=$res->fetchrow) {
-    my ($minxid, $maxxid, $ev_zip) = @row;
-    if ($ev_zip) {
-      $ev_zip = s/'//g;  # Strip off unnecessary quotes
-      $qualification .= "and ($log_xid < '$maxxid' and \"_$cluster\".xxid_lt_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
-      $qualification .= "and ($log_xid >= '$minxid' and \"_$cluster\".xxid_ge_snapshot(log_xid, '$minxid:$maxxid:$ev_zip'))";
-    } else {
-      $qualification .= "and ($log_xid < '$maxxid') ";
-      $qualification .= "and ($log_xid >= '$minxid') ";
-    }
-  }
-
-  my $tables_query = qq{t.tab_id, t.tab_reloid, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace;};
-  $res = $dbh->exec($tables_query);
-  while (my @row = $res->fetchrow) {
-    my ($id, $oid, $namespace, $tname) = @row;
-    $TABLENAME[$i] = qq{"$namespace".$tname};
-  }
-
-  my $cursor_query = qq{
-  declare LOG cursor for
-    select log_origin, log_xid, log_tableid, log_actionseq, log_cmdtype, log_cmddata
-    from "_$cluster".sl_log_1
-    where $qualification
-    order by log_actionseq;};
-
-
-  my $syncname=sprintf("log-%8d", $sync_event);
-  open(LOGOUTPUT, ">$spoolpath/$syncname");
-  print LOGOUTPUT "-- Data for sync $sync_event\n";
-  print LOGOUTPUT "-- ", `date`;
-  print LOGOUTPUT "BEGIN;\n";
-  my $begin = $dbh->exec("begin;");
-  my $cursorexec = $dbh->exec($cursor_query);
-  my $foundsome = "YES";
-  while ($foundsome eq "YES") {
-    $foundsome = "NO";
-    my $res = $dbh->exec("fetch forward 100 in LOG;");
-    while (my @row = $res->fetchrow) {
-      $foundsome = "YES";
-      my ($origin, $xid, $tableid, $actionseq, $cmdtype, $cmddata) = @row;
-      if ($cmddata eq "I") {
-	printf LOGOUTPUT "insert into %s %s;\n", $TABLENAME[$tableid], $cmddata;
-      } elsif ($cmddata eq "U") {
-	printf LOGOUTPUT "update only %s set %s;\n", $TABLENAME[$tableid], $cmddata;
-      } elsif ($cmddata eq "D") {
-	printf LOGOUTPUT "delete from only %s where %s;\n", $TABLENAME[$tableid], $cmddata;
-      }
-    }
-  }
-  close LOGOUTPUT;
-  my $confirmation = qq{ insert into "_$cluster".sl_confirm (con_origin,con_received,con_seqno,con_timestamp)
-                         values ($node, $subnode, $sync_event, CURRENT_TIMESTAMP); };
-  print "Confirm: $confirmation\n";
-  my $cursorexec = $dbh->exec($confirmation);
-}
-
-sub connect_to_node {
-  my $dsn = "dbname=$database host=$host port=$port user=$user";
-  if ($password) {
-    $dsn .= " password=$password";
-  }
-  $dbh = Pg::connectdb($dsn);
-}
-
-sub process_options {
-
-  $goodopts = GetOptions("help", "database=s", "host=s", "user=s",
-			 "cluster=s", "password=s", "port=s", "sets=s",
-			 "spoolpath=s", "spoolname=s", "pgbins=s",
-			 "maxsize=i", "maxage=i", "node=i", "subnode=i");
-
-  if (defined ($opt_help)) {
-    show_usage();
-  }
-
-  $cluster=$opt_cluster if (defined($opt_cluster));
-  $subnode = $opt_subnode if (defined ($opt_subnode));
-  $node = $opt_node if (defined($opt_node));
-  $database=$opt_database if (defined ($opt_database));
-  $user = $opt_user if (defined ($opt_user));
-  $host = $opt_host if (defined($opt_host));
-  $password = $opt_password if (defined($opt_password));
-  $port = $opt_port if (defined($opt_port));
-  $pgbins = $opt_pgbins if (defined($opt_pgbins));
-  $spoolpath = $opt_spoolpath if (defined($opt_spoolpath));
-  $spoolname   = $opt_spoolname   if (defined($opt_spoolname));
-  if (defined($opt_sets)) {
-    @SETS=split (/,/, $opt_sets);
-  }
-  if (defined($opt_maxsize)){
-    $maxsize = $opt_maxsize;
-  } else {
-    $maxsize = 10000;
-  } 
-  if (defined($opt_maxage)){
-    $maxsize = $opt_maxage;
-  } else {
-    $maxage = 300;
-  }
-}
-
-sub show_usage {
-  print qq{slonspool:
-     --help                get help
-     --cluster=s           Slony-I cluster name
-     --subnode=s   Node number subscribed through
-     --node=i              Node number to use to request
-     --pgbins=s            Location of PostgreSQL binaries including slonik and pg_dump
-     --database=s          database to connect to
-     --host=s              host for database
-     --user=s              user for database
-     --password=s          password for database (you should probably use .pgpass instead)
-     --port=i              port number to connect to
-     --sets=s              Sets to replicate (comma-delimited) - e.g --sets=1,2,4 
-     --spoolpath=s         directory in which to spool output
-     --spoolname=s         naming convention for spoolfiles
-     --maxsize=i           maximum size of spool files, in kB - default =10000KB
-     --maxage=i            maximum age of spool files in seconds - default 300
-};
-  die -1;
-}
--- /dev/null
+++ src/slonspool/init.pm
@@ -0,0 +1,61 @@
+#!/usr/bin/perl
+# $Id: init.pm,v 1.1 2004/11/30 23:38:40 cbbrowne Exp $
+
+# Data structures...
+# %NODES{}{}
+# Fields:
+#  $NODE{$i}{last_event} - Last event processed for node
+#
+# %SET{}{}
+# Fields:
+#  $SET{$i}{origin}  - origin node
+#  $SET{$i}{comment} - Comment about set
+#  $SET{$i}{provider} - node that provides data to our favorite node
+#
+# %TABLES
+#  $TABLES{$i}{name}
+#  $TABLES{$i}{namespace}
+#  $TABLES{$i}{set}
+
+# Populate latest information about subscription providers and such...
+sub load_configuration {
+  my $dsn = "dbname=$database host=$host port=$port user=$user";
+  if ($password) {
+    $dsn .= " password=$password";
+  }
+  $dbh = Pg::connectdb($dsn);
+
+  # Populate %NODE with confirmation information
+  my $confirm_query = qq{ select con_origin, con_seqno from "_$cluster".sl_confirm where received = $node; };
+  my $res = $dbh->exec($confirm_query);
+  while (my @row = $res->fetchrow) {
+    my ($origin, $sync) = @row;
+    if ($NODE{$origin}{last_event} < $sync) {
+      $NODE{$origin}{last_event} = $sync;
+    }
+  }
+
+  # Populate %SET with set info for the sets being handled
+  my $sub_set_query = qq{ select set_id, set_origin from "_$cluster".sl_set where set_id in ($opt_sets);};
+  my $res = $dbh->exec($confirm_query);
+  while (my @row = $res->fetchrow) {
+    my ($set, $origin) = @row;
+    $SET{$set}{origin} = $origin;
+  }
+
+  my $tables_query = qq{select t.tab_id, t.tab_set, n.nspname, r.relname from "_$cluster".sl_table t, pg_catalog.pg_namespace n, pg_catalog.pg_class r where r.oid = t.tab_reloid and n.oid = r.relnamespace and tab_set in ($opt_sets) ;};
+  $res = $dbh->exec($tables_query);
+  while (my @row = $res->fetchrow) {
+    my ($id, $set, $namespace, $tname) = @row;
+    $TABLES{$id}{name} = $tname;
+    $TABLES{$id}{namespace} = $namespace;
+    $TABLES{$id}{set} = $set;
+  }
+}
+
+sub storeNode {
+  my ($id, $comment) = @_;
+  $NODES[$id] = $comment;
+}
+
+1;


More information about the Slony1-commit mailing list