微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing



Write messaging applications with ODP.NET and Oracle Streams Advanced Queuing

-- Part I: Database setup required for this demo

------------------------------------------------------------------
-- sql to grant appropriate privilege to database user,SCott
------------------------------------------------------------------
sql> ALTER USER SCott ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
GRANT ALL ON DBMS_AQADM TO scott;

------------------------------------------------------------------
-- PLsql to create queue-table and queue and start queue for SCott
------------------------------------------------------------------
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table=>'scott.test_q_tab',
queue_payload_type=>'RAW',
multiple_consumers=>FALSE);

DBMS_AQADM.CREATE_QUEUE(
queue_name=>'scott.test_q',
queue_table=>'scott.test_q_tab');

DBMS_AQADM.START_QUEUE(queue_name=>'scott.test_q');
END;
/

------------------------------------------------------------------
-- PLsql to stop queue and drop queue & queue-table from SCott
------------------------------------------------------------------
BEGIN
DBMS_AQADM.STOP_QUEUE('scott.test_q');

DBMS_AQADM.DROP_QUEUE(
queue_name => 'scott.test_q',
auto_commit => TRUE);

DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'scott.test_q_tab',
force => FALSE,
auto_commit => TRUE);
END;
/
-- End of Part I,database setup.

//Part II: Demonstrates using the Listen method
//C#
using System;
using System.Text;
using Oracle.DataAccess.Client;
using Oracle.DataAccess.Types;
using System.Threading;

namespace ODPSample
{
/// <summary>
/// Demonstrates how a thread can listen and wait until a message is enqueued.
/// Once a message is enqueued,the listening thread returns from the
/// blocked Listen() method invocation and dequeues the message.
/// </summary>
class EnqueueDequeue
{
static bool s_bListenReturned = false;

static void Main(string[] args)
{
// Create connection
string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
OracleConnection con = new OracleConnection(constr);

// Create queue
OracleAQQueue queue = new OracleAQQueue("scott.test_q",con);

try
{
// Open connection
con.open();

// Set message type for the queue
queue.MessageType = OracleAQMessageType.Raw;

// Spawning a thread which will listen for a message
ThreadStart ts = new ThreadStart(TestListen);
Thread t = new Thread(ts);
t.Start();

System.Threading.Thread.Sleep(2000);

// Begin transaction for enqueue
OracleTransaction txn = con.BeginTransaction();

// Prepare message and RAW payload
OracleAQMessage enqMsg = new OracleAQMessage();
byte[] bytePayload = { 0,1,2,3,4,5,6,7,8,9 };
enqMsg.Payload = bytePayload;

// Prepare to Enqueue
queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit;

Console.WriteLine("[Main Thread] Enqueuing a message...");
Console.WriteLine("[Main Thread] Enqueued Message Payload : "
+ ByteArrayToString(enqMsg.Payload as byte[]));
Console.WriteLine();

// Enqueue message
queue.Enqueue(enqMsg);

// Enqueue transaction commit
txn.Commit();

// Loop till Listen returns
while (!s_bListenReturned)
System.Threading.Thread.Sleep(1000);
}
catch (Exception e)
{
Console.WriteLine("Error: {0}",e.Message);
}
finally
{
// Close/dispose objects
queue.dispose();
con.Close();
con.dispose();
}
}

static void TestListen()
{
// Create connection
string constr = "user id=scott;password=Pwd4Sct;data source=oracle";
OracleConnection conListen = new OracleConnection(constr);

// Create queue
OracleAQQueue queueListen = new OracleAQQueue("scott.test_q",conListen);

try
{
// Open the connection for Listen thread.
// Connection blocked on Listen thread can not be used for other DB
// operations
conListen.open();

// Set message type for the queue
queueListen.MessageType = OracleAQMessageType.Raw;

// Listen
queueListen.Listen(null);

Console.WriteLine("[Listen Thread] Listen returned... Dequeuing...");

// Begin txn for Dequeue
OracleTransaction txn = conListen.BeginTransaction();

// Prepare to Dequeue
queueListen.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit;
queueListen.DequeueOptions.Wait = 10;

// Dequeue message
OracleAQMessage deqMsg = queueListen.Dequeue();
Console.WriteLine("[Listen Thread] Dequeued Message Payload : "
+ ByteArrayToString(deqMsg.Payload as byte[]));

// Dequeue txn commit
txn.Commit();

// Allow the main thread to exit
s_bListenReturned = true;
}
catch (Exception e)
{
Console.WriteLine("Error: {0}",e.Message);
}
finally
{
// Close/dispose objects
queueListen.dispose();
conListen.Close();
conListen.dispose();
}
}

// Function to convert byte[] to string
static private string ByteArrayToString(byte[] byteArray)
{
StringBuilder sb = new StringBuilder();
for (int n = 0; n < byteArray.Length; n++)
{
sb.Append((int.Parse(byteArray[n].ToString())).ToString("X"));
}
return sb.ToString();
}
}
}



-- Part I: Database setup required for this demo

------------------------------------------------------------------
-- sql to grant appropriate privilege to database user,SCott
------------------------------------------------------------------
sql> ALTER USER SCott ACCOUNT UNLOCK IDENTIFIED BY Pwd4Sct;
User altered.
sql> GRANT ALL ON DBMS_AQADM TO scott;

------------------------------------------------------------------
-- PLsql to create queue-table and queue and start queue for SCott
------------------------------------------------------------------
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE(
queue_table=>'scott.test_q_tab',database setup.

//Part II: Demonstrates application notification//C#using System;using System.Text;using Oracle.DataAccess.Client;using Oracle.DataAccess.Types;namespace ODPSample{ /// <summary> /// Demonstrates how the application can be notified when a message is /// available in a queue. /// </summary> class Notification { static bool isNotified = false; static void Main(string[] args) { // Create connection string constr = "user id=scott;password=Pwd4Sct;data source=oracle"; OracleConnection con = new OracleConnection(constr); // Create queue OracleAQQueue queue = new OracleAQQueue("scott.test_q",con); try { // Open connection con.open(); // Set message type for the queue queue.MessageType = OracleAQMessageType.Raw; // Add the event handler to handle the notification. The // MsgReceived method will be invoked when a message is enqueued queue.MessageAvailable += new OracleAQMessageAvailableEventHandler(Notification.MsgReceived); Console.WriteLine("Notification registered..."); // Begin txn for enqueue OracleTransaction txn = con.BeginTransaction(); Console.WriteLine("Now enqueuing message..."); // Prepare message and RAW payload OracleAQMessage enqMsg = new OracleAQMessage(); byte[] bytePayload = { 0,9 }; enqMsg.Payload = bytePayload; // Prepare to Enqueue queue.EnqueueOptions.Visibility = OracleAQVisibilityMode.OnCommit; // Enqueue message queue.Enqueue(enqMsg); Console.WriteLine("Enqueued Message Payload : " + ByteArrayToString(enqMsg.Payload as byte[])); Console.WriteLine("MessageId of Enqueued Message : " + ByteArrayToString(enqMsg.MessageId)); Console.WriteLine(); // Enqueue txn commit txn.Commit(); // Loop while waiting for notification while (isNotified == false) { System.Threading.Thread.Sleep(2000); } } catch (Exception e) { Console.WriteLine("Error: {0}",e.Message); } finally { // Close/dispose objects queue.dispose(); con.Close(); con.dispose(); } } static void MsgReceived(object src,OracleAQMessageAvailableEventArgs arg) { try { Console.WriteLine("Notification Received..."); Console.WriteLine("QueueName : {0}",arg.QueueName); Console.WriteLine("Notification Type : {0}",arg.NotificationType); //following type-cast to "byte[]" is required only for .NET 1.x byte[] notifiedMsgid = (byte[]) arg.MessageId[0]; Console.WriteLine("MessageId of Notified Message : " + ByteArrayToString(notifiedMsgid)); isNotified = true; } catch (Exception e) { Console.WriteLine("Error: {0}",e.Message); } } // Function to convert byte[] to string static private string ByteArrayToString(byte[] byteArray) { StringBuilder sb = new StringBuilder(); for (int n = 0; n < byteArray.Length; n++) { sb.Append((int.Parse(byteArray[n].ToString())).ToString("X")); } return sb.ToString(); } }}

原文地址:https://www.jb51.cc/oracle/205999.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐