如何配置非分区双向事务复制(复制 Transact-SQL 编程)

双向事务复制是一种特定的事务复制拓扑,它允许两台服务器相互交换更改:每台服务器均发布数据,然后从另一台服务器订阅包含相同数据的发布。

对等事务复制也支持此拓扑,但双向复制可提高性能。 有关详细信息,请参阅对等事务复制

在两台服务器上的数据库之间配置未分区的双向事务复制拓扑

  1. 使每台服务器能作为发布服务器和分发服务器 有关详细信息,请参阅如何配置发布和分发(复制 Transact-SQL 编程)

  2. 在每台服务器上,对要双向复制的数据库执行 sp_replicationdboption (Transact-SQL)。 为 @dbname 指定数据库名称,为 @optname 指定 publish 值,并将 @value 的值指定为 true。

  3. 在每台服务器上,对要双向复制的数据库执行 sp_addpublication (Transact-SQL)。 为 @publication 指定发布的名称。

  4. 在每台服务器上,对要双向复制的数据库执行 sp_addarticle (Transact-SQL)。 指定下列参数:

    • @article - 项目的名称。

    • @publication - 步骤 3 中的发布名称。

    • @source_object - 已发布的表的名称。

    • @destination_table - 位于其他服务器上的数据库中的等效表的名称。

    • @schema_option - 确保不设置 0x02 值。

      注意注意

      如果不设置 0x02 值,则对 @schema_option&(位与)(Transact-SQL) 运算将返回值 0。若要执行此运算,必须将 binary 值转换为 int。

    • @ins_cmd - CALL sp_ins_destination_article_name 值。

    • @upd_cmd - SCALL sp_upd_destination_article_name 值。

    • @del_cmd - CALL sp_del_destination_article_name 值。

      注意注意

      这些值表示默认的调用格式。 可以使用其他调用格式。

  5. 对双向发布中的每个项目重复步骤 4。

  6. 在每台服务器上,对要双向复制的数据库执行 sp_addsubscription (Transact-SQL)。 指定下列参数:

    • @publication - 步骤 3 中的发布的名称。

    • @subscriber - 其他服务器的名称。

    • @destination_db - 其他服务器上的数据库的名称。

    • @sync_type - none 值。

    • @status - active 值。

    • @loopback_detection - true 值。

  7. 在每台服务器上,对要双向复制的数据库执行 sp_addpushsubscription_agent (Transact-SQL)。 指定下列参数:

    • @publication - 步骤 3 中的发布的名称。

    • @subscriber - 其他服务器的名称。

    • @subscriber_db - 其他服务器上的数据库的名称。

    • @job_login - 在分发服务器上运行分发代理时所用的 Microsoft Windows 凭据。

    • @job_password - 在分发服务器上运行分发代理时所用的 Windows 凭据的密码。

      安全说明安全说明

      如果可能,请在运行时提示用户输入安全凭据。 如果必须在脚本文件中存储凭据,则必须保护文件以防止未经授权的访问。

  8. 使用步骤 4 中指定的名称,创建自定义存储过程以处理冲突并执行插入、更新和删除操作。 有关详细信息,请参阅如何将数据更改的传播方法设置为事务项目(复制 Transact-SQL 编程)

示例

此示例在同一台服务器上创建两个测试数据库,然后在这两个数据库之间配置未分区的双向事务复制。

-- This script uses sqlcmd scripting variables. They are in the form
-- $(MyVariable). For information about how to use scripting variables  
-- on the command line and in SQL Server Management Studio, see the 
-- "Executing Replication Scripts" section in the topic
-- "Programming Replication Using System Stored Procedures".

USE master;
GO

-- Create the test databases if they do not exist.
IF NOT EXISTS(SELECT * FROM sys.databases WHERE name = 'test1')
    CREATE DATABASE test1;

IF NOT EXISTS(SELECT * FROM sys.databases WHERE name = 'test2')
    CREATE DATABASE test2;
GO

-- Enable the server as a Distributor, if not already done.
DECLARE @distributor AS sysname;
EXEC sp_helpdistributor @distributor = @distributor OUTPUT;
IF @distributor <> @@SERVERNAME
BEGIN
    EXEC master..sp_adddistributor @distributor = @@SERVERNAME;
    EXEC master..sp_adddistributiondb @database= 'distribution';
END
GO

-- Enable the server as a Publisher, if not already done.
IF NOT EXISTS (SELECT * FROM msdb..MSdistpublishers WHERE name = @@SERVERNAME)
    EXEC master..sp_adddistpublisher 
        @publisher = @@SERVERNAME, 
        @distribution_db = 'distribution', 
        @working_directory = 'C:\Program Files\Microsoft SQL Server\MSSQL.1\MSSQL\repldata'; 
GO

-- Enable the databases for transactional publishing.
EXEC sp_replicationdboption N'test1', N'publish', true;
EXEC sp_replicationdboption N'test2', N'publish', true;
GO

-- Create a table in test1 and populate with 10 rows.
USE test1;
GO

IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'two_way_test1')
    DROP TABLE two_way_test1;
GO

CREATE TABLE two_way_test1
(pkcol int primary key not null,
 intcol int,
 charcol char(100),
 datecol datetime
);
GO

INSERT INTO two_way_test1 VALUES (1, 10, 'row1', GETDATE());
INSERT INTO two_way_test1 VALUES (2, 20, 'row2', GETDATE());
INSERT INTO two_way_test1 VALUES (3, 30, 'row3', GETDATE());
INSERT INTO two_way_test1 VALUES (4, 40, 'row4', GETDATE());
INSERT INTO two_way_test1 VALUES (5, 50, 'row5', GETDATE());
INSERT INTO two_way_test1 VALUES (6, 60, 'row6', GETDATE());
INSERT INTO two_way_test1 VALUES (7, 70, 'row7', GETDATE());
INSERT INTO two_way_test1 VALUES (8, 80, 'row8', GETDATE());
INSERT INTO two_way_test1 VALUES (9, 90, 'row9', GETDATE());
INSERT INTO two_way_test1 VALUES (10, 100, 'row10', GETDATE());
GO

-- Create a table in test2 and populate with 10 rows
USE test2;
GO

IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'two_way_test2')
    DROP TABLE two_way_test2;
GO

CREATE TABLE two_way_test2
(pkcol int primary key not null,
 intcol int,
 charcol char(100),
 datecol datetime
);
GO

INSERT INTO two_way_test2 VALUES (1, 10, 'row1', GETDATE());
INSERT INTO two_way_test2 VALUES (2, 20, 'row2', GETDATE());
INSERT INTO two_way_test2 VALUES (3, 30, 'row3', GETDATE());
INSERT INTO two_way_test2 VALUES (4, 40, 'row4', GETDATE());
INSERT INTO two_way_test2 VALUES (5, 50, 'row5', GETDATE());
INSERT INTO two_way_test2 VALUES (6, 60, 'row6', GETDATE());
INSERT INTO two_way_test2 VALUES (7, 70, 'row7', GETDATE());
INSERT INTO two_way_test2 VALUES (8, 80, 'row8', GETDATE());
INSERT INTO two_way_test2 VALUES (9, 90, 'row9', GETDATE());
INSERT INTO two_way_test2 VALUES (10, 100, 'row10', GETDATE());
GO

-- Add the transactional publication and article in test1
USE test1;
GO

DECLARE @publication AS sysname; 
DECLARE @article1 AS sysname; 
DECLARE @article2 AS sysname; 
DECLARE @login AS sysname;
DECLARE @password AS nvarchar(512);
SET @publication = N'two_way_pub_test1';
SET @article1 = N'two_way_test1';
SET @article2 = N'two_way_test2';
SET @login = $(Login);
SET @password = $(Password);

EXEC sp_addlogreader_agent 
    @job_login = @login, 
    @job_password = @password,
    @publisher_security_mode = 1;

EXEC sp_addpublication @publication = @publication, 
    @restricted = N'false', 
    @sync_method = N'native', 
    @repl_freq = N'continuous', 
    @description = N'publ1', 
    @status = N'active', 
    @allow_push = N'true', 
    @allow_pull = N'true', 
    @allow_anonymous = N'false', 
    @enabled_for_internet = N'false', 
    @independent_agent = N'true', 
    @immediate_sync = N'false', 
    @allow_sync_tran = N'false', 
    @autogen_sync_procs = N'false', 
    @retention = 60;

EXEC sp_addarticle @publication = @publication,
    @article = @article1, 
    @source_owner = N'dbo', 
    @source_object = @article1, 
    @destination_table = @article2, 
    @type = N'logbased', 
    @creation_script = null, 
    @description = null, 
    @pre_creation_cmd = N'drop', 
    @schema_option = 0x00000000000000F1, 
    @status = 16, 
    @vertical_partition = N'false', 
    @ins_cmd = N'CALL sp_ins_two_way_test2', 
    @del_cmd = N'XCALL sp_del_two_way_test2', 
    @upd_cmd = N'XCALL sp_upd_two_way_test2', 
    @filter = null, 
    @sync_object = null;
GO

-- Add the transactional publication and article in test2
USE test2
GO

DECLARE @publication AS sysname; 
DECLARE @article1 AS sysname; 
DECLARE @article2 AS sysname; 
DECLARE @login AS sysname;
DECLARE @password AS nvarchar(512);
SET @publication = N'two_way_pub_test2';
SET @article1 = N'two_way_test1';
SET @article2 = N'two_way_test2';
SET @login = $(Login);
SET @password = $(Password);

EXEC sp_addlogreader_agent 
    @job_login = @login, 
    @job_password = @password,
    @publisher_security_mode = 1;

EXEC sp_addpublication @publication = @publication, 
    @restricted = N'false', 
    @sync_method = N'native', 
    @repl_freq = N'continuous', 
    @description = N'Pub2',
    @status = N'active', 
    @allow_push = N'true', 
    @allow_pull = N'true', 
    @allow_anonymous = N'false', 
    @enabled_for_internet = N'false', 
    @independent_agent = N'true', 
    @immediate_sync = N'false', 
    @allow_sync_tran = N'false', 
    @autogen_sync_procs = N'false', 
    @retention = 60;

EXEC sp_addarticle @publication = @publication,
    @article = @article2, 
    @source_owner = N'dbo', 
    @source_object = @article2, 
    @destination_table = @article1, 
    @type = N'logbased', 
    @creation_script = null, 
    @description = null, 
    @pre_creation_cmd = N'drop', 
    @schema_option = 0x00000000000000F1, 
    @status = 16, 
    @vertical_partition = N'false', 
    @ins_cmd = N'CALL sp_ins_two_way_test1', 
    @del_cmd = N'XCALL sp_del_two_way_test1', 
    @upd_cmd = N'XCALL sp_upd_two_way_test1', 
    @filter = null, 
    @sync_object = null;
GO

-- Add the transactional subscription in test1
USE test1
GO

DECLARE @publication AS sysname;
DECLARE @subscriber AS sysname;
DECLARE @subscription_db AS sysname;
SET @publication = N'two_way_pub_test1';
SET @subscriber = $(SubServer2);
SET @subscription_db = N'test2';

EXEC sp_addsubscription @publication = @publication, 
    @article = N'all', 
    @subscriber = @subscriber, 
    @destination_db = @subscription_db, 
    @sync_type = N'none', 
    @status = N'active', 
    @update_mode = N'read only', 
    @loopback_detection = 'true';

EXEC sp_addpushsubscription_agent 
    @publication = @publication, 
    @subscriber = @subscriber, 
    @subscriber_db = @subscription_db, 
    @job_login = $(Login), 
    @job_password = $(Password);
GO

-- Add the transactional subscription in test2
USE test2
GO

DECLARE @publication AS sysname;
DECLARE @subscriber AS sysname;
DECLARE @subscription_db AS sysname;
SET @publication = N'two_way_pub_test2';
SET @subscriber = $(SubServer1);
SET @subscription_db = N'test1';

EXEC sp_addsubscription @publication = @publication, 
    @article = N'all', 
    @subscriber = @subscriber, 
    @destination_db = @subscription_db, 
    @sync_type = N'none', 
    @status = N'active', 
    @update_mode = N'read only', 
    @loopback_detection = 'true';

EXEC sp_addpushsubscription_agent 
    @publication = @publication, 
    @subscriber = @subscriber, 
    @subscriber_db = @subscription_db, 
    @job_login = $(Login), 
    @job_password = $(Password);
GO

-- Create custom stored procedures in test1 
USE test1
GO

IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'sp_ins_two_way_test1' and type = 'P')
    DROP proc sp_ins_two_way_test1;
IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'sp_upd_two_way_test1' and type = 'P')
    DROP proc sp_upd_two_way_test1;
IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'sp_del_two_way_test1' and type = 'P')
    DROP proc sp_del_two_way_test1;
GO

-- Insert procedure
CREATE proc sp_ins_two_way_test1 @pkcol int, 
    @intcol int, 
    @charcol char(100), 
    @datecol datetime
AS
    INSERT INTO two_way_test1 (pkcol, intcol, charcol, 
        datecol) 
    VALUES (@pkcol, @intcol, @charcol, GETDATE());
GO

-- Update procedure
CREATE proc sp_upd_two_way_test1 @old_pkcol int, 
    @old_intcol int, 
    @old_charcol char(100), 
    @old_datecol datetime,
    @pkcol int, @intcol int, 
    @charcol char(100), 
    @datecol datetime
AS
    -- IF intcol conflict is detected, add values
    -- IF charcol conflict detected, concatenate values
    DECLARE  @curr_intcol int, @curr_charcol char(100);

    SELECT @curr_intcol = intcol, @curr_charcol = charcol 
    FROM two_way_test1 WHERE pkcol = @pkcol;

    IF @curr_intcol != @old_intcol
        SELECT @intcol = @curr_intcol + 
            (@intcol - @old_intcol);

    IF @curr_charcol != @old_charcol
        SELECT @charcol = rtrim(@curr_charcol) + 
            '_' + rtrim(@charcol);

    UPDATE two_way_test1 SET intcol = @intcol, 
        charcol = @charcol, datecol = GETDATE()
    WHERE pkcol = @old_pkcol;
GO

-- Delete procedure
CREATE proc sp_del_two_way_test1 @old_pkcol int, 
    @old_intcol int, 
    @old_charcol char(100), 
    @old_datecol datetime
AS
    DELETE two_way_test1 WHERE pkcol = @old_pkcol;
GO

-- Create custom stored procedures in test2
USE test2
GO

IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'sp_ins_two_way_test2' and type = 'P')
    DROP proc sp_ins_two_way_test2;
IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'sp_upd_two_way_test2' and type = 'P')
    DROP proc sp_upd_two_way_test2;
IF EXISTS (SELECT * FROM sysobjects WHERE name LIKE 'sp_del_two_way_test2' and type = 'P')
    DROP proc sp_del_two_way_test2;
GO

-- Insert procedure
CREATE proc sp_ins_two_way_test2 @pkcol int, 
    @intcol int, 
    @charcol char(100), 
    @datecol datetime
AS
    INSERT INTO two_way_test2 (pkcol, intcol, charcol,datecol) 
        VALUES (@pkcol, @intcol, @charcol, GETDATE());
GO

-- Update procedure
CREATE proc sp_upd_two_way_test2 @old_pkcol int, 
    @old_intcol int, 
    @old_charcol char(100), 
    @old_datecol datetime,
    @pkcol int, 
    @intcol int, 
    @charcol char(100), 
    @datecol datetime
AS
    -- IF intcol conflict is detected, add values
    -- IF charcol conflict detected, concatenate values
    DECLARE  @curr_intcol int, @curr_charcol char(100);

    SELECT @curr_intcol = intcol, @curr_charcol = charcol 
    FROM two_way_test2 WHERE pkcol = @pkcol;

    IF @curr_intcol != @old_intcol
        SELECT @intcol = @curr_intcol + 
            (@intcol - @old_intcol);

    IF @curr_charcol != @old_charcol
        SELECT @charcol = rtrim(@curr_charcol) + 
        '_' + rtrim(@charcol);

    UPDATE two_way_test2 SET intcol = @intcol, 
        charcol = @charcol, datecol = GETDATE() 
    WHERE pkcol = @old_pkcol;
GO

-- Delete procedure
CREATE proc sp_del_two_way_test2 @old_pkcol int, 
    @old_intcol int, 
    @old_charcol char(100), 
    @old_datecol datetime
AS
    DELETE two_way_test2 WHERE pkcol = @old_pkcol;

GO

-- Execute updates to the first row in test1 and test2
USE test1
GO
UPDATE two_way_test1 SET intcol = 20 , charcol = 'updated at test1' WHERE pkcol = 1;

USE test2
GO
UPDATE two_way_test2 SET intcol = 60 , charcol = 'updated at test2' WHERE pkcol = 1;

-- Select data from both tables to verify that the changes were propagated
SELECT * FROM test1..two_way_test1 WHERE pkcol = 1;
SELECT * FROM test2..two_way_test2 WHERE pkcol = 1;
GO