Simple demo of message propagation

Posted by

If you’re using AQ, then it’s simple to setup simple enqueue and dequeue facilities on your local database to provide all sorts of asynchronous style processing in your applications.  As long as you’re applications are designed and built to handle it, the “fire and forget” model to keep user applications responsive, and all of the “heavy lifting” done in the background is a very attractive one.

You can also use AQ to achieve the same concept across multiple databases, and the database will take care of propagating the messages from one database to the other.  Here’s a simple demo of that.

Database 1




SQL> connect / as sysdba
Connected.

--
-- A user to hold all of our AQ stuff
--
SQL> create user aqtest identified by aqtest;

User created.

SQL> grant connect, resource, aq_administrator_role to aqtest;

Grant succeeded.

SQL> grant execute on dbms_aq to aqtest;

Grant succeeded.

SQL> grant execute on dbms_aqadm to aqtest;

Grant succeeded.

SQL> alter user aqtest quota unlimited on users;

User altered.

SQL> grant create database link to aqtest;

Grant succeeded.

SQL> begin
  2    dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','AQTEST',FALSE);
  3    dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','AQTEST',FALSE);
  4  end;
  5  /

PL/SQL procedure successfully completed.

--
-- Now we connect as AQTEST and construct our message payload types and our local queues
--

SQL> connect aqtest/aqtest
Connected.

SQL> create type aqtest.message_typ as object(subject varchar2(30), text varchar2(80));
  2  /

Type created.

SQL> begin
  2    dbms_aqadm.create_queue_table(queue_table => 'aqtest.messages_qtab',
  3                                  queue_payload_type =>  'aqtest.Message_typ',
  4                                  multiple_consumers => TRUE);
  5    dbms_aqadm.create_queue(queue_name => 'MSG_QUEUE',
  6                            queue_table => 'aqtest.messages_qtab');
  7    dbms_aqadm.start_queue(queue_name => 'MSG_QUEUE');
  8  end;
  9  /

PL/SQL procedure successfully completed.

--
-- And here is a basic enqueue routine.  If a remote address is specify, then we will propagate
-- the message to that address.  Otherwise the message will stay in the local queue.
--
SQL> create or replace procedure enqueue_msg(p_msg in varchar2,
  2                                          p_remote_address in varchar2 default null)
  3  as
  4    l_enqueue_options    dbms_aq.enqueue_options_t;
  5    l_message_properties dbms_aq.message_properties_t;
  6    l_message_handle     raw(16);
  7    l_message            aqtest.message_typ;
  8    l_recipients         dbms_aq.aq$_recipient_list_t;
  9  BEGIN
 10    l_recipients(1) := SYS.aq$_agent('RECIPIENT', p_remote_address, null);
 11    l_message_properties.recipient_list := l_recipients;
 12
 13    l_message := message_typ('NORMAL MESSAGE',  p_msg );
 14    dbms_aq.enqueue(queue_name => 'msg_queue',
 15                    enqueue_options => l_enqueue_options,
 16                    message_properties => l_message_properties,
 17                    payload => l_message,
 18                    msgid => l_message_handle);
 19  end;
 20  /

Procedure created.

Database 2, we do the exact same setup



SQL> connect / as sysdba
Connected.

--
-- A user to hold all of our AQ stuff
--
SQL> create user aqtest identified by aqtest;

User created.

SQL> grant connect, resource, aq_administrator_role to aqtest;

Grant succeeded.

SQL> grant execute on dbms_aq to aqtest;

Grant succeeded.

SQL> grant execute on dbms_aqadm to aqtest;

Grant succeeded.

SQL> alter user aqtest quota unlimited on users;

User altered.

SQL> grant create database link to aqtest;

Grant succeeded.

SQL> begin
  2    dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','AQTEST',FALSE);
  3    dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','AQTEST',FALSE);
  4  end;
  5  /

PL/SQL procedure successfully completed.

--
-- Now we connect as AQTEST and construct our message payload types and our local queues
--

SQL> connect aqtest/aqtest
Connected.

SQL> create type aqtest.message_typ as object(subject varchar2(30), text varchar2(80));
  2  /

Type created.

SQL> begin
  2    dbms_aqadm.create_queue_table(queue_table => 'aqtest.messages_qtab',
  3                                  queue_payload_type =>  'aqtest.Message_typ',
  4                                  multiple_consumers => TRUE);
  5    dbms_aqadm.create_queue(queue_name => 'MSG_QUEUE',
  6                            queue_table => 'aqtest.messages_qtab');
  7    dbms_aqadm.start_queue(queue_name => 'MSG_QUEUE');
  8  end;
  9  /

PL/SQL procedure successfully completed.

Back to Database 1


--
-- We need a database link to the remote database, plus a quick query to test that its working
--

SQL> create database link remote_db connect to aqtest identified by aqtest using 'db11';

Database link created.

SQL> select * from tab@remote_db;

no rows selected


--
-- We get our propagation schedule running, and we're ready to go.   
--

SQL> begin
  2    dbms_aqadm.schedule_propagation(queue_name  => 'MSG_QUEUE',
  3                                        destination => 'remote_db',
  4                                        start_time  => sysdate,
  5                                        latency     => 0);
  6  end;
  7  /

PL/SQL procedure successfully completed.

SQL> select * from user_queue_schedules
  2  @pr
==============================
QNAME                         : MSG_QUEUE
DESTINATION                   : REMOTE_DB
START_DATE                    :
START_TIME                    : 14:16:01
PROPAGATION_WINDOW            :
NEXT_TIME                     :
LATENCY                       : 0
SCHEDULE_DISABLED             : N
PROCESS_NAME                  : J000
SESSION_ID                    : 400, 38936
INSTANCE                      : 1
LAST_RUN_DATE                 : 28-NOV-16 02.16.01.283000 PM +08:00
LAST_RUN_TIME                 : 14:16:01
CURRENT_START_DATE            : 28-NOV-16 02.16.01.283000 PM +08:00
CURRENT_START_TIME            : 14:16:01
NEXT_RUN_DATE                 : 28-NOV-16 02.16.01.280000 PM +08:00
NEXT_RUN_TIME                 : 14:16:01
TOTAL_TIME                    : 0
TOTAL_NUMBER                  : 0
TOTAL_BYTES                   : 0
MAX_NUMBER                    : 0
MAX_BYTES                     : 0
AVG_NUMBER                    : 0
AVG_SIZE                      : 0
AVG_TIME                      : 0
FAILURES                      : 0
LAST_ERROR_DATE               :
LAST_ERROR_TIME               :
LAST_ERROR_MSG                :
MESSAGE_DELIVERY_MODE         : PERSISTENT
ELAPSED_DEQUEUE_TIME          :
ELAPSED_PICKLE_TIME           :
JOB_NAME                      : AQ_JOB$_6438

PL/SQL procedure successfully completed.

--
-- a message that will not be propagaged, because remote recipient is not specified
--
SQL> begin
  2    enqueue_msg('This message will stay local');
  3    commit;
  4  end;
  5  /

PL/SQL procedure successfully completed.

--
-- a message that WILL be propagated, because remote recipient is specified
--
SQL> begin
  2    enqueue_msg('This message will be propagated.',
  3                 'aqtest.msg_queue_other@remote_db');
  4    commit;
  5  end;
  6  /

PL/SQL procedure successfully completed.

--
-- So if everything is working correctly, we have 2 messages on the local queue, and 1 message on the remote queue
--

SQL>
SQL> select t1.cnt,
  2         t2.cnt
  3  from (select count(*) cnt from messages_qtab) t1,
  4       (select count(*) cnt from messages_qtab_other@remote_db) t2
  5  /

       CNT        CNT
---------- ----------
         2          1

1 row selected.

SQL>
SQL>

And there you go.  Messages between databases using the in-built propagation mechanisms.

3 comments

  1. Excellent post even by your lofty standards! Yet another woefully underused feature.

    Lots of enterprise applications use messaging for workflow or “business processes”, but developers forget that the work done in one step and the message to the next step are one logical unit of work, so they have to be enclosed in one transaction.

    With AQ, the transaction management comes naturally without having to use some outside transaction manager – though with two DBs it’s a distributed transaction.

    Distributed transactions can be problematic (timeouts, “in doubt” transactions…) but if we limit their use to AQ then I imagine the problems are greatly minimized…

  2. Two problems:

    1. Where is the parameter “p_remote_address” used in enqueue_msg ?

    2. What are messages_qtab_other and msg_queue_other ?
    Weren’t the queue table and queue on the second database named the same as on the first one ?

    Best Regards,
    Iudith

Got some thoughts? Leave a comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.