Oracle AQ

Getting Started

Data Dictionary

dba/ALL/user_evaluation_contexts
dba/ALL/user_evaluation_context_tables
dba/ALL/user_queues
dba/ALL/user_queue_publishers
dba/ALL/user_queue_schedules
dba/ALL/user_queue_tables
dba/ALL/user_rule_sets
dba/ALL/user_rulesets
dba/ALL/user_source
dba/ALL/user_types

Create Users

CONNECT system/manager;
DROP USER aqadm CASCADE;
CREATE USER aqadm IDENTIFIED BY aqadm;
GRANT CONNECT, RESOURCE TO aqadm;
GRANT EXECUTE ON DBMS_AQADM TO aqadm;
GRANT Aq_administrator_role TO aqadm;
 
DROP USER aq CASCADE;
CREATE USER aq IDENTIFIED BY aq;
GRANT CONNECT, RESOURCE TO aq;
GRANT EXECUTE ON dbms_aq TO aq;
GRANT EXECUTE ON dbms_aqadm TO aq;

Create Q Table and Q of Object Type

CONNECT aq/aq;
 
CREATE TYPE aq.Message_typ AS object (
subject VARCHAR2(30),
text VARCHAR2(80));
/
 
BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table => 'aq.objmsgs80_qtab',
  queue_payload_type => 'aq.Message_typ');
 
   DBMS_AQADM.CREATE_QUEUE (
  queue_name => 'msg_queue',
  queue_table => 'aq.objmsgs80_qtab');
 
   DBMS_AQADM.START_QUEUE (
  queue_name => 'msg_queue');
END;
/

Create a Q Table and Q of Raw Type

BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table => 'aq.RawMsgs_qtab',
  queue_payload_type => 'RAW');
 
  DBMS_AQADM.CREATE_QUEUE (
  queue_name => 'raw_msg_queue',
  queue_table => 'aq.RawMsgs_qtab');
 
  DBMS_AQADM.START_QUEUE (
  queue_name => 'raw_msg_queue');
END;
/

Create a Prioritized Message Q Table and Q

BEGIN
  DBMS_AQADM.CREATE_QUEUE_TABLE (
  queue_table => 'aq.priority_msg',
  sort_list => 'PRIORITY,ENQ_TIME',
  queue_payload_type => 'aq.Message_typ');
 
  DBMS_AQADM.CREATE_QUEUE (
  queue_name => 'priority_msg_queue',
  queue_table => 'aq.priority_msg');
 
  DBMS_AQADM.START_QUEUE (
  queue_name => 'priority_msg_queue');
END;
/

Drop Queue

BEGIN
   DBMS_AQADM.STOP_QUEUE(
      queue_name => 'TEST_QUEUE');
 
   DBMS_AQADM.DROP_QUEUE(
      Queue_name => 'TEST_QUEUE');
 
   DBMS_AQADM.DROP_QUEUE_TABLE(
        Queue_table => 'TEST_QUEUE_TAB');
END;

List Topic Subscribers

SET serveroutput ON
DECLARE
  subList dbms_aqadm.aq$_subscriber_list_t;
BEGIN
  subList := dbms_aqadm.queue_subscribers('My_Topic');
  dbms_output.put_line('Subscriber count:' || subList.count);
  FOR i IN subList.FIRST .. subList.LAST LOOP
    dbms_output.put_line('Subscriber name: ' || subList(i).name);
  END LOOP;
END;
/

Remove Subscribers

SET serveroutput ON
BEGIN
  DBMS_AQADM.REMOVE_SUBSCRIBER
      ('My_Topic', SYS.AQ$_AGENT ('Subscriber1', NULL, NULL));
END;
/

Sample PL/SQL

CREATE OR REPLACE PROCEDURE Create_AQ_Queue (
    queueName VARCHAR2, isTopic BOOLEAN)
AS
   queueTableName VARCHAR2(64);
BEGIN
   queueTableName := queueName || '_Tab';
 
   DBMS_OUTPUT.put_line ('Create queue table: ' || queueTableName);
   DBMS_AQADM.CREATE_QUEUE_TABLE(
        Queue_table => queueTableName,
        Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE',
        sort_list => 'PRIORITY,ENQ_TIME',
        multiple_consumers => isTopic,
        compatible => '8.1.5');
 
   DBMS_OUTPUT.put_line ('Create queue: ' || queueName);
   DBMS_AQADM.CREATE_QUEUE(
      Queue_name => queueName,
      Queue_table => queueTableName);
 
   DBMS_OUTPUT.put_line ('Created queue: ' || queueName);
END;
/
 
CREATE OR REPLACE PROCEDURE Start_AQ_Queue (queueName VARCHAR2)
AS
BEGIN
   DBMS_OUTPUT.put_line ('Starting queue: ' || queueName);
   DBMS_AQADM.START_QUEUE(
      queue_name => queueName);
   DBMS_OUTPUT.put_line ('Started queue: ' || queueName);
END;
/
 
CREATE OR REPLACE PROCEDURE Stop_AQ_Queue (queueName VARCHAR2)
AS
BEGIN
   DBMS_OUTPUT.put_line ('Stopping queue: ' || queueName);
   DBMS_AQADM.STOP_QUEUE(
      queue_name => queueName);
   DBMS_OUTPUT.put_line ('Stopped queue: ' || queueName);
END;
/
 
CREATE OR REPLACE PROCEDURE Remove_AQ_Queue (queueName VARCHAR2)
AS
BEGIN
   DBMS_OUTPUT.put_line ('Removing queue: ' || queueName);
   DBMS_AQADM.STOP_QUEUE(
      queue_name => queueName);
 
   DBMS_AQADM.DROP_QUEUE(
      Queue_name => queueName);
 
   DBMS_AQADM.DROP_QUEUE_TABLE(
        Queue_table => queueName || '_TAB');
   DBMS_OUTPUT.put_line ('Removed: ' || queueName);
END;
/
 
-- Drop queues
BEGIN
   Remove_AQ_Queue('Test_Queue');
 
   Remove_AQ_Queue('Test_Topic');
END;
/
 
-- Create queues
BEGIN
   Create_AQ_Queue('Test_Queue', FALSE);
   Start_AQ_Queue('Test_Queue');
 
   Create_AQ_Queue('Test_Topic', TRUE);
   Start_AQ_Queue('Test_Topic');
END;
/

* Add/Remove Subscribers

-- Declare a VARRAY type
CREATE OR REPLACE TYPE MyTopics AS VARRAY(16) OF VARCHAR2(64);
/
 
-- Add subscriber procedure
CREATE OR REPLACE PROCEDURE Subscribe_MyTopics 
(subscriberName VARCHAR2, subTopics MyTopics)
AS
  subscriber sys.aq$_agent;
BEGIN
  subscriber := sys.aq$_agent(subscriberName, NULL, NULL);
 
  FOR i IN subTopics.FIRST .. subTopics.LAST LOOP
    DBMS_OUTPUT.put_line ('Add subscriber: ' || subscriberName || ' to ITV Topic: ' || subTopics(i));
    DBMS_AQADM.ADD_SUBSCRIBER
    (queue_name => subTopics(i),
     subscriber => subscriber);
  END LOOP;
END;
/
 
-- Remove subscriber procedure
CREATE OR REPLACE PROCEDURE UnSubscribe_MyTopics 
(subscriberName VARCHAR2, subTopics MyTopics)
AS
  subscriber sys.aq$_agent;
BEGIN
  subscriber := sys.aq$_agent(subscriberName, NULL, NULL);
 
  FOR i IN subTopics.FIRST .. subTopics.LAST LOOP
    DBMS_OUTPUT.put_line ('Remove subscriber: ' || subscriberName || ' from ITV Topic: ' || subTopics(i));
    DBMS_AQADM.REMOVE_SUBSCRIBER
    (queue_name => subTopics(i),
     subscriber => subscriber);
  END LOOP;
END;
/
 
-- Add subscriber
SET SERVEROUTPUT ON;
DECLARE
  subTopics MyTopics;
BEGIN
  subTopics := MyTopics('Topic1', 'Topic2', 'Topic3');
 
  UnSubscribe_MyTopics('testId', subTopics);
  Subscribe_MyTopics('testId', subTopics);
END;
/

References

* Oracle® Streams Advanced Queuing User’s Guide and Reference10g Release 2 (10.2)

* Oracle® Streams Advanced Queuing User’s Guide and Reference Release 10.1 Part No. B10785-01

* http://www.akadia.com/services/ora_advanced_queueing.html

* AQ vs JMS
* Toad World SQL Reference
* Oracle DBMS_AQADM

This entry was posted in aq. Bookmark the permalink.