Getting Started
Data Dictionary
dba/ALL/user_evaluation_contexts dba/ALL/user_evaluation_context_tables dba/ALL/user_queues dba/ALL/user_queue_publishers dba/ALL/user_queue_schedules dba/ALL/user_queue_tables dba/ALL/user_rule_sets dba/ALL/user_rulesets dba/ALL/user_source dba/ALL/user_types
Create Users
CONNECT system/manager; DROP USER aqadm CASCADE; CREATE USER aqadm IDENTIFIED BY aqadm; GRANT CONNECT, RESOURCE TO aqadm; GRANT EXECUTE ON DBMS_AQADM TO aqadm; GRANT Aq_administrator_role TO aqadm; DROP USER aq CASCADE; CREATE USER aq IDENTIFIED BY aq; GRANT CONNECT, RESOURCE TO aq; GRANT EXECUTE ON dbms_aq TO aq; GRANT EXECUTE ON dbms_aqadm TO aq;
Create Q Table and Q of Object Type
CONNECT aq/aq; CREATE TYPE aq.Message_typ AS object ( subject VARCHAR2(30), text VARCHAR2(80)); / BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.objmsgs80_qtab', queue_payload_type => 'aq.Message_typ'); DBMS_AQADM.CREATE_QUEUE ( queue_name => 'msg_queue', queue_table => 'aq.objmsgs80_qtab'); DBMS_AQADM.START_QUEUE ( queue_name => 'msg_queue'); END; /
Create a Q Table and Q of Raw Type
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.RawMsgs_qtab', queue_payload_type => 'RAW'); DBMS_AQADM.CREATE_QUEUE ( queue_name => 'raw_msg_queue', queue_table => 'aq.RawMsgs_qtab'); DBMS_AQADM.START_QUEUE ( queue_name => 'raw_msg_queue'); END; /
Create a Prioritized Message Q Table and Q
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'aq.priority_msg', sort_list => 'PRIORITY,ENQ_TIME', queue_payload_type => 'aq.Message_typ'); DBMS_AQADM.CREATE_QUEUE ( queue_name => 'priority_msg_queue', queue_table => 'aq.priority_msg'); DBMS_AQADM.START_QUEUE ( queue_name => 'priority_msg_queue'); END; /
Drop Queue
BEGIN DBMS_AQADM.STOP_QUEUE( queue_name => 'TEST_QUEUE'); DBMS_AQADM.DROP_QUEUE( Queue_name => 'TEST_QUEUE'); DBMS_AQADM.DROP_QUEUE_TABLE( Queue_table => 'TEST_QUEUE_TAB'); END;
List Topic Subscribers
SET serveroutput ON DECLARE subList dbms_aqadm.aq$_subscriber_list_t; BEGIN subList := dbms_aqadm.queue_subscribers('My_Topic'); dbms_output.put_line('Subscriber count:' || subList.count); FOR i IN subList.FIRST .. subList.LAST LOOP dbms_output.put_line('Subscriber name: ' || subList(i).name); END LOOP; END; /
Remove Subscribers
SET serveroutput ON BEGIN DBMS_AQADM.REMOVE_SUBSCRIBER ('My_Topic', SYS.AQ$_AGENT ('Subscriber1', NULL, NULL)); END; /
Sample PL/SQL
CREATE OR REPLACE PROCEDURE Create_AQ_Queue ( queueName VARCHAR2, isTopic BOOLEAN) AS queueTableName VARCHAR2(64); BEGIN queueTableName := queueName || '_Tab'; DBMS_OUTPUT.put_line ('Create queue table: ' || queueTableName); DBMS_AQADM.CREATE_QUEUE_TABLE( Queue_table => queueTableName, Queue_payload_type => 'SYS.AQ$_JMS_MESSAGE', sort_list => 'PRIORITY,ENQ_TIME', multiple_consumers => isTopic, compatible => '8.1.5'); DBMS_OUTPUT.put_line ('Create queue: ' || queueName); DBMS_AQADM.CREATE_QUEUE( Queue_name => queueName, Queue_table => queueTableName); DBMS_OUTPUT.put_line ('Created queue: ' || queueName); END; / CREATE OR REPLACE PROCEDURE Start_AQ_Queue (queueName VARCHAR2) AS BEGIN DBMS_OUTPUT.put_line ('Starting queue: ' || queueName); DBMS_AQADM.START_QUEUE( queue_name => queueName); DBMS_OUTPUT.put_line ('Started queue: ' || queueName); END; / CREATE OR REPLACE PROCEDURE Stop_AQ_Queue (queueName VARCHAR2) AS BEGIN DBMS_OUTPUT.put_line ('Stopping queue: ' || queueName); DBMS_AQADM.STOP_QUEUE( queue_name => queueName); DBMS_OUTPUT.put_line ('Stopped queue: ' || queueName); END; / CREATE OR REPLACE PROCEDURE Remove_AQ_Queue (queueName VARCHAR2) AS BEGIN DBMS_OUTPUT.put_line ('Removing queue: ' || queueName); DBMS_AQADM.STOP_QUEUE( queue_name => queueName); DBMS_AQADM.DROP_QUEUE( Queue_name => queueName); DBMS_AQADM.DROP_QUEUE_TABLE( Queue_table => queueName || '_TAB'); DBMS_OUTPUT.put_line ('Removed: ' || queueName); END; / -- Drop queues BEGIN Remove_AQ_Queue('Test_Queue'); Remove_AQ_Queue('Test_Topic'); END; / -- Create queues BEGIN Create_AQ_Queue('Test_Queue', FALSE); Start_AQ_Queue('Test_Queue'); Create_AQ_Queue('Test_Topic', TRUE); Start_AQ_Queue('Test_Topic'); END; /
* Add/Remove Subscribers
-- Declare a VARRAY type CREATE OR REPLACE TYPE MyTopics AS VARRAY(16) OF VARCHAR2(64); / -- Add subscriber procedure CREATE OR REPLACE PROCEDURE Subscribe_MyTopics (subscriberName VARCHAR2, subTopics MyTopics) AS subscriber sys.aq$_agent; BEGIN subscriber := sys.aq$_agent(subscriberName, NULL, NULL); FOR i IN subTopics.FIRST .. subTopics.LAST LOOP DBMS_OUTPUT.put_line ('Add subscriber: ' || subscriberName || ' to ITV Topic: ' || subTopics(i)); DBMS_AQADM.ADD_SUBSCRIBER (queue_name => subTopics(i), subscriber => subscriber); END LOOP; END; / -- Remove subscriber procedure CREATE OR REPLACE PROCEDURE UnSubscribe_MyTopics (subscriberName VARCHAR2, subTopics MyTopics) AS subscriber sys.aq$_agent; BEGIN subscriber := sys.aq$_agent(subscriberName, NULL, NULL); FOR i IN subTopics.FIRST .. subTopics.LAST LOOP DBMS_OUTPUT.put_line ('Remove subscriber: ' || subscriberName || ' from ITV Topic: ' || subTopics(i)); DBMS_AQADM.REMOVE_SUBSCRIBER (queue_name => subTopics(i), subscriber => subscriber); END LOOP; END; / -- Add subscriber SET SERVEROUTPUT ON; DECLARE subTopics MyTopics; BEGIN subTopics := MyTopics('Topic1', 'Topic2', 'Topic3'); UnSubscribe_MyTopics('testId', subTopics); Subscribe_MyTopics('testId', subTopics); END; /
References
* Oracle® Streams Advanced Queuing User’s Guide and Reference10g Release 2 (10.2)
* Oracle® Streams Advanced Queuing User’s Guide and Reference Release 10.1 Part No. B10785-01