upgradeschema(p_old text)

8.138. upgradeschema(p_old text)

Function Properties

Language: PLPGSQL

Return Type: text

Called during "update functions" by slonik to perform schema changes

-- upgradeschema(p_old text) returns text
--
-- Called during "update functions" by slonik to perform schema changes.
-- Each step below is guarded by an existence check, so a step whose target
-- object or column is already present is skipped.
--
-- Parameters:
--   p_old  version string of the installation being upgraded; only a
--          leading '1.' is inspected here.
-- Returns:
--   p_old, unchanged.
-- Raises:
--   an exception when p_old matches '1.%', and when the sl_log_N tables have
--   to be rebuilt while check_unconfirmed_log() returns true.
declare
	v_tab_row	record;		-- one (schema, table, column) row to convert
	v_query text;			-- dynamic SQL assembled for EXECUTE
	v_keepstatus text;		-- saved definition of view sl_status
begin
	-- If old version is pre-2.0, then we require a special upgrade process
	if p_old like '1.%' then
		raise exception 'Upgrading to Slony-I 2.x requires running slony_upgrade_20';
	end if;

	perform upgradeSchemaAddTruncateTriggers();

	-- Change all Slony-I-defined columns that are "timestamp without time zone" to "timestamp *WITH* time zone"
	-- The same filter is evaluated twice: once here to decide whether any
	-- work is needed at all, and once more to drive the loop below.
	if exists (select 1 from information_schema.columns c
            where table_schema = '_schemadoc' and data_type = 'timestamp without time zone'
	    and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE')
		and (c.table_name, c.column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp')))
	then

	  -- Preserve sl_status: save its definition, drop it while the column
	  -- types change, and recreate it afterwards
	  -- (presumably the view depends on one of the altered columns).
	  select pg_get_viewdef('sl_status') into v_keepstatus;
	  execute 'drop view sl_status';
	  for v_tab_row in select table_schema, table_name, column_name from information_schema.columns c
            where table_schema = '_schemadoc' and data_type = 'timestamp without time zone'
	    and exists (select 1 from information_schema.tables t where t.table_schema = c.table_schema and t.table_name = c.table_name and t.table_type = 'BASE TABLE')
		and (table_name, column_name) in (('sl_confirm', 'con_timestamp'), ('sl_event', 'ev_timestamp'), ('sl_registry', 'reg_timestamp'),('sl_archive_counter', 'ac_timestamp'))
	  loop
		raise notice 'Changing Slony-I column [%.%] to timestamp WITH time zone', v_tab_row.table_name, v_tab_row.column_name;
		-- table_name and column_name are concatenated unquoted; they can
		-- only be one of the four fixed pairs listed in the filter above.
		v_query := 'alter table ' || slon_quote_brute(v_tab_row.table_schema) ||
                   '.' || v_tab_row.table_name || ' alter column ' || v_tab_row.column_name ||
                   ' type timestamp with time zone;';
		execute v_query;
	  end loop;
	  -- restore sl_status
	  execute 'create view sl_status as ' || v_keepstatus;
	end if;

	-- Create sl_components if it does not exist yet
	if not exists (select 1 from information_schema.tables where table_schema = '_schemadoc' and table_name = 'sl_components') then
	   v_query := '
create table sl_components (
	co_actor	 text not null primary key,
	co_pid		 integer not null,
	co_node		 integer not null,
	co_connection_pid integer not null,
	co_activity	  text,
	co_starttime	  timestamptz not null,
	co_event	  bigint,
	co_eventtype 	  text
) without oids;
';
  	   execute v_query;
	end if;

	-- Create sl_event_lock if it does not exist yet
	if not exists (select 1 from information_schema.tables t where table_schema = '_schemadoc' and table_name = 'sl_event_lock') then
	   v_query := 'create table sl_event_lock (dummy integer);';
	   execute v_query;
	end if;

	-- Create sl_apply_stats if it does not exist yet
	if not exists (select 1 from information_schema.tables t 
			where table_schema = '_schemadoc' 
			and table_name = 'sl_apply_stats') then
		v_query := '
			create table sl_apply_stats (
				as_origin			int4,
				as_num_insert		int8,
				as_num_update		int8,
				as_num_delete		int8,
				as_num_truncate		int8,
				as_num_script		int8,
				as_num_total		int8,
				as_duration			interval,
				as_apply_first		timestamptz,
				as_apply_last		timestamptz,
				as_cache_prepare	int8,
				as_cache_hit		int8,
				as_cache_evict		int8,
				as_cache_prepare_max int8
			) WITHOUT OIDS;';
		execute v_query;
	end if;
	
	--
	-- On the upgrade to 2.2, we change the layout of sl_log_N by
	-- adding columns log_tablenspname, log_tablerelname, and
	-- log_cmdupdncols as well as changing log_cmddata into
	-- log_cmdargs, which is a text array.
	--
	-- The missing log_cmdargs column on sl_log_1 is what marks the old layout.
	--
	if not check_table_field_exists('_schemadoc', 'sl_log_1', 'log_cmdargs') then
		--
		-- Check that the cluster is completely caught up
		-- (the old tables are dropped below, not converted)
		--
		if check_unconfirmed_log() then
			raise EXCEPTION 'cannot upgrade to new sl_log_N format due to existing unreplicated data';
		end if;

		--
		-- Drop tables sl_log_1 and sl_log_2
		--
		drop table sl_log_1;
		drop table sl_log_2;

		--
		-- Create the new sl_log_1
		--
		create table sl_log_1 (
			log_origin          int4,
			log_txid            bigint,
			log_tableid         int4,
			log_actionseq       int8,
			log_tablenspname    text,
			log_tablerelname    text,
			log_cmdtype         "char",
			log_cmdupdncols     int4,
			log_cmdargs         text[]
		) without oids;
		create index sl_log_1_idx1 on sl_log_1
			(log_origin, log_txid, log_actionseq);

		comment on table sl_log_1 is 'Stores each change to be propagated to subscriber nodes';
		comment on column sl_log_1.log_origin is 'Origin node from which the change came';
		comment on column sl_log_1.log_txid is 'Transaction ID on the origin node';
		comment on column sl_log_1.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect';
		comment on column sl_log_1.log_actionseq is 'The sequence number in which actions will be applied on replicas';
		comment on column sl_log_1.log_tablenspname is 'The schema name of the table affected';
		comment on column sl_log_1.log_tablerelname is 'The table name of the table affected';
		comment on column sl_log_1.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE';
		comment on column sl_log_1.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs';
		comment on column sl_log_1.log_cmdargs is 'The data needed to perform the log action on the replica';

		--
		-- Create the new sl_log_2 (same layout as sl_log_1)
		--
		create table sl_log_2 (
			log_origin          int4,
			log_txid            bigint,
			log_tableid         int4,
			log_actionseq       int8,
			log_tablenspname    text,
			log_tablerelname    text,
			log_cmdtype         "char",
			log_cmdupdncols     int4,
			log_cmdargs         text[]
		) without oids;
		create index sl_log_2_idx1 on sl_log_2
			(log_origin, log_txid, log_actionseq);

		comment on table sl_log_2 is 'Stores each change to be propagated to subscriber nodes';
		comment on column sl_log_2.log_origin is 'Origin node from which the change came';
		comment on column sl_log_2.log_txid is 'Transaction ID on the origin node';
		comment on column sl_log_2.log_tableid is 'The table ID (from sl_table.tab_id) that this log entry is to affect';
		comment on column sl_log_2.log_actionseq is 'The sequence number in which actions will be applied on replicas';
		comment on column sl_log_2.log_tablenspname is 'The schema name of the table affected';
		comment on column sl_log_2.log_tablerelname is 'The table name of the table affected';
		comment on column sl_log_2.log_cmdtype is 'Replication action to take. U = Update, I = Insert, D = DELETE, T = TRUNCATE';
		comment on column sl_log_2.log_cmdupdncols is 'For cmdtype=U the number of updated columns in cmdargs';
		comment on column sl_log_2.log_cmdargs is 'The data needed to perform the log action on the replica';

		--
		-- Create sl_log_script
		--
		create table sl_log_script (
			log_origin			int4,
			log_txid			bigint,
			log_actionseq		int8,
			log_cmdtype			"char",
			log_cmdargs			text[]
			) WITHOUT OIDS;
		create index sl_log_script_idx1 on sl_log_script
		(log_origin, log_txid, log_actionseq);

		comment on table sl_log_script is 'Captures SQL script queries to be propagated to subscriber nodes';
		comment on column sl_log_script.log_origin is 'Origin name from which the change came';
		comment on column sl_log_script.log_txid is 'Transaction ID on the origin node';
		comment on column sl_log_script.log_actionseq is 'The sequence number in which actions will be applied on replicas';
		-- This comment belongs to sl_log_script.log_cmdtype. Attaching it to
		-- sl_log_2.log_cmdtype would replace the U/I/D/T description set
		-- above and leave this column undocumented.
		comment on column sl_log_script.log_cmdtype is 'Replication action to take. S = Script statement, s = Script complete';
		comment on column sl_log_script.log_cmdargs is 'The DDL statement, optionally followed by selected nodes to execute it on.';

		--
		-- Put the log apply triggers back onto sl_log_1/2
		-- (dropping the tables above removed them)
		--
		create trigger apply_trigger
			before INSERT on sl_log_1
			for each row execute procedure logApply('_schemadoc');
		alter table sl_log_1
			enable replica trigger apply_trigger;
		create trigger apply_trigger
			before INSERT on sl_log_2
			for each row execute procedure logApply('_schemadoc');
		alter table sl_log_2
			enable replica trigger apply_trigger;
	end if;

	-- Create the string_agg(text) aggregate unless a routine of that name
	-- is already listed for this schema
	if not exists (select 1 from information_schema.routines where routine_schema = '_schemadoc' and routine_name = 'string_agg') then
	       CREATE AGGREGATE string_agg(text) (
	   	      SFUNC=agg_text_sum,
		      STYPE=text,
		      INITCOND=''
		      );
	end if;

	-- Create view sl_failover_targets if it does not exist yet.
	-- It lists (set_id, set_origin, backup_id) for forwarding subscribers
	-- fed directly by the set origin, minus the two exclusions noted inline.
	if not exists (select 1 from information_schema.views where table_schema='_schemadoc' and table_name='sl_failover_targets') then
	   create view sl_failover_targets as
	   	  select  set_id,
		  set_origin as set_origin,
		  sub1.sub_receiver as backup_id

		  FROM
		  sl_subscribe sub1
		  ,sl_set set1
		  where
 		  sub1.sub_set=set_id
		  and sub1.sub_forward=true
		  --exclude candidates where the set_origin
		  --has a path to a node but the failover
		  --candidate has no path to that node
		  and sub1.sub_receiver not in
	    	  (select p1.pa_client from
	    	  sl_path p1 
	    	  left outer join sl_path p2 on
	    	  (p2.pa_client=p1.pa_client 
	    	  and p2.pa_server=sub1.sub_receiver)
	    	  where p2.pa_client is null
	    	  and p1.pa_server=set_origin
	    	  and p1.pa_client<>sub1.sub_receiver
	    	  )
		  and sub1.sub_provider=set_origin
		  --exclude any subscribers that are not
		  --direct subscribers of all sets on the
		  --origin
		  and sub1.sub_receiver not in
		  (select direct_recv.sub_receiver
		  from
			
			(--all direct receivers of the first set
			select subs2.sub_receiver
			from sl_subscribe subs2
			where subs2.sub_provider=set1.set_origin
		      	and subs2.sub_set=set1.set_id) as
		      	direct_recv
			inner join
			(--all other sets from the origin
			select set_id from sl_set set2
			where set2.set_origin=set1.set_origin
			and set2.set_id<>sub1.sub_set)
			as othersets on(true)
			left outer join sl_subscribe subs3
			on(subs3.sub_set=othersets.set_id
		   	and subs3.sub_forward=true
		   	and subs3.sub_provider=set1.set_origin
		   	and direct_recv.sub_receiver=subs3.sub_receiver)
	    		where subs3.sub_receiver is null
	    	);
	end if;

	-- Add sl_node.no_failed if it is missing
	if not check_table_field_exists('_schemadoc', 'sl_node', 'no_failed') then
	   alter table sl_node add column no_failed bool;
	   -- Deliberately no WHERE clause: the column was just added, so every
	   -- existing row is initialised to false.
	   update sl_node set no_failed=false;
	end if;
	return p_old;
end;