oracle/
aq.rs

1//! Oracle Advanced Queuing (available when `aq_unstable` feature is enabled.)
2//!
3//! **Warning:** Any type in this module is unstable. It may be changed incompatibly by minor version upgrades.
4//!
5//! # Examples
6//!
7//! ## Object type queue
8//!
9//! ```
10//! # use oracle::Error;
11//! # use oracle::test_util;
12//! # use oracle::aq;
13//! # use oracle::sql_type::Object;
14//! # let conn = test_util::connect()?;
15//!
16//! // Create a queue
17//! let objtype = conn.object_type("UDT_BOOK")?;
18//! let mut queue = aq::Queue::<Object>::new(&conn, "BOOK_QUEUE", &objtype)?;
19//!
20//! // Create a message
21//! let mut payload = objtype.new_object()?;
22//! payload.set("TITLE", &"Pebble in the Sky")?;
23//! payload.set("AUTHORS", &"Isaac Asimov")?;
24//! payload.set("PRICE", &17.0)?;
25//! let mut msg = aq::MsgProps::<Object>::new(&conn)?;
//! msg.set_payload(&payload)?;
27//!
28//! // Enqueue the message to the queue
29//! queue.enqueue(&msg)?;
30//!
31//! // Dequeue a message from the queue
32//! let new_msg = queue.dequeue()?;
33//! let new_payload = new_msg.payload()?;
34//!
35//! // Compare message payloads.
36//! assert_eq!(payload.get::<String>("TITLE")?, new_payload.get::<String>("TITLE")?);
37//! assert_eq!(payload.get::<String>("AUTHORS")?, new_payload.get::<String>("AUTHORS")?);
38//! assert_eq!(payload.get::<f32>("PRICE")?, new_payload.get::<f32>("PRICE")?);
39//! # Ok::<(), Error>(())
40//! ```
41//!
42//! ## RAW data queue
43//!
44//! ```
45//! # use oracle::Error;
46//! # use oracle::test_util;
47//! # use oracle::aq;
48//! # let conn = test_util::connect()?;
49//!
50//! // Create a queue
51//! let mut queue = aq::Queue::<[u8]>::new(&conn, "RAW_QUEUE", &())?;
52//!
53//! // Create a message
54//! let payload = b"\xde\xad\xbe\xef";
55//! let mut msg = aq::MsgProps::<[u8]>::new(&conn)?;
//! msg.set_payload(payload.as_ref())?;
57//!
58//! // Enqueue the message to the queue
59//! queue.enqueue(&msg)?;
60//!
61//! // Dequeue a message from the queue
62//! let new_msg = queue.dequeue()?;
63//! let new_payload = new_msg.payload()?; // returns Vec<u8>
64//!
65//! // Compare message payloads.
66//! assert_eq!(payload, new_payload.as_slice());
67//! # Ok::<(), Error>(())
68//! ```
69//!
//! ## Enqueue and dequeue more than one message in one call
71//!
72//! ```
73//! # use oracle::Error;
74//! # use oracle::test_util;
75//! # use oracle::aq;
76//! # let conn = test_util::connect()?;
77//!
78//! // Create a queue
79//! let mut queue = aq::Queue::<[u8]>::new(&conn, "RAW_QUEUE", &())?;
80//!
81//! // Create messages
82//! let payloads = [b"\xde\xad\xbe\xef", b"\xba\xce\xba\x11"];
83//! let mut messages = vec![];
84//! for payload in &payloads {
85//!     let mut msg = aq::MsgProps::<[u8]>::new(&conn)?;
86//!     msg.set_payload(payload.as_ref())?;
87//!     messages.push(msg);
88//! }
89//!
90//! // Enqueue the messages
91//! queue.enqueue_many(&messages)?;
92//!
93//! // Dequeue messages from the queue
94//! let new_messages = queue.dequeue_many(10)?;
95//!
96//! // Compare message payloads.
97//! assert_eq!(new_messages.len(), 2);
98//! assert_eq!(new_messages[0].payload()?, payloads[0]);
99//! assert_eq!(new_messages[1].payload()?, payloads[1]);
100//! # Ok::<(), Error>(())
101//! ```
102
103use crate::chkerr;
104use crate::connection::Conn;
105use crate::sql_type::Object;
106use crate::sql_type::ObjectType;
107use crate::sql_type::OracleType;
108use crate::sql_type::Timestamp;
109use crate::to_rust_slice;
110use crate::Connection;
111use crate::Context;
112use crate::DpiMsgProps;
113use crate::DpiObject;
114use crate::DpiQueue;
115use crate::Error;
116use crate::OdpiStr;
117use crate::Result;
118use odpic_sys::*;
119use std::borrow::ToOwned;
120use std::fmt;
121use std::marker::PhantomData;
122use std::os::raw::c_char;
123use std::ptr;
124use std::time::Duration;
125
/// A trait for payload type
///
/// This module implements it for `[u8]` (RAW payloads) and for [`Object`]
/// (object type payloads).
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub trait Payload: ToOwned {
    /// Payload-specific type information passed to [`Queue::new`].
    type TypeInfo;
    /// Converts the type information into the optional object type that
    /// [`Queue::new`] hands to ODPI-C when creating the queue.
    fn payload_type(payload_type: &Self::TypeInfo) -> Result<Option<ObjectType>>;
    /// Reads the payload stored in `props` and returns it as an owned value.
    fn get(props: &MsgProps<Self>) -> Result<Self::Owned>;
    /// Stores `self` as the payload of `props`.
    fn set(&self, props: &mut MsgProps<Self>) -> Result<()>;
}
135
136impl Payload for [u8] {
137    type TypeInfo = ();
138
139    fn payload_type(_payload_type: &Self::TypeInfo) -> Result<Option<ObjectType>> {
140        Ok(None)
141    }
142
143    fn get(props: &MsgProps<Self>) -> Result<Vec<u8>> {
144        let mut ptr = ptr::null();
145        let mut len = 0;
146        chkerr!(
147            props.ctxt(),
148            dpiMsgProps_getPayload(props.handle.raw, ptr::null_mut(), &mut ptr, &mut len)
149        );
150        Ok(to_rust_slice(ptr, len).to_vec())
151    }
152
153    fn set(&self, props: &mut MsgProps<Self>) -> Result<()> {
154        chkerr!(
155            props.ctxt(),
156            dpiMsgProps_setPayloadBytes(
157                props.handle.raw,
158                self.as_ptr() as *const c_char,
159                self.len() as u32
160            )
161        );
162        props.payload_type = None;
163        Ok(())
164    }
165}
166
167impl Payload for Object {
168    type TypeInfo = ObjectType;
169
170    fn payload_type(payload_type: &Self::TypeInfo) -> Result<Option<ObjectType>> {
171        Ok(Some(payload_type.clone()))
172    }
173
174    fn get(props: &MsgProps<Self>) -> Result<Object> {
175        let objtype = props
176            .payload_type
177            .as_ref()
178            .ok_or_else(Error::no_data_found)?;
179        let mut obj_handle = DpiObject::null();
180        chkerr!(
181            props.ctxt(),
182            dpiMsgProps_getPayload(
183                props.handle.raw,
184                &mut obj_handle.raw,
185                ptr::null_mut(),
186                ptr::null_mut()
187            )
188        );
189        Ok(Object::new(props.conn.clone(), obj_handle, objtype.clone()))
190    }
191
192    fn set(&self, props: &mut MsgProps<Self>) -> Result<()> {
193        chkerr!(
194            props.ctxt(),
195            dpiMsgProps_setPayloadObject(props.handle.raw, self.handle())
196        );
197        props.payload_type = Some(self.object_type().clone());
198        Ok(())
199    }
200}
201
/// Advanced Queueing (AQ) queue which may be used to enqueue and dequeue messages
///
/// The type parameter `T` selects the payload kind handled by the queue; it
/// must implement [`Payload`].
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub struct Queue<T>
where
    T: Payload + ?Sized,
{
    /// Connection the queue was created on.
    conn: Conn,
    /// ODPI-C queue handle.
    handle: DpiQueue,
    /// Payload object type as returned by `T::payload_type`; attached to every
    /// message dequeued from this queue.
    payload_type: Option<ObjectType>,
    /// Enqueue options, fetched on the first call to `enq_options`.
    enq_options: Option<EnqOptions>,
    /// Dequeue options, fetched on the first call to `deq_options`.
    deq_options: Option<DeqOptions>,
    /// Ties the queue to its payload type without storing a value of it.
    phantom: PhantomData<T>,
}
216
217impl<T> Queue<T>
218where
219    T: Payload + ?Sized,
220{
221    fn handle(&self) -> *mut dpiQueue {
222        self.handle.raw
223    }
224
225    fn ctxt(&self) -> &Context {
226        self.conn.ctxt()
227    }
228
229    /// Creates a new queue which may be used to enqueue and dequeue messages
230    /// from Advanced Queuing (AQ) queues.
231    pub fn new(
232        conn: &Connection,
233        queue_name: &str,
234        payload_type: &T::TypeInfo,
235    ) -> Result<Queue<T>> {
236        let mut handle = ptr::null_mut();
237        let name = OdpiStr::new(queue_name);
238        let payload_type = T::payload_type(payload_type)?;
239        let objtype = payload_type
240            .as_ref()
241            .map(|t| t.handle().raw)
242            .unwrap_or(ptr::null_mut());
243        chkerr!(
244            conn.ctxt(),
245            dpiConn_newQueue(conn.handle(), name.ptr, name.len, objtype, &mut handle)
246        );
247        Ok(Queue {
248            conn: conn.conn.clone(),
249            handle: DpiQueue::new(handle),
250            payload_type,
251            enq_options: None,
252            deq_options: None,
253            phantom: PhantomData,
254        })
255    }
256
257    /// Dequeues a single message from the queue.
258    pub fn dequeue(&self) -> Result<MsgProps<T>> {
259        let mut props = ptr::null_mut();
260        chkerr!(self.ctxt(), dpiQueue_deqOne(self.handle(), &mut props));
261        Ok(MsgProps::from_dpi_msg_props(
262            self.conn.clone(),
263            DpiMsgProps::new(props),
264            self.payload_type.clone(),
265        ))
266    }
267
268    /// Dequeues multiple messages from the queue.
269    pub fn dequeue_many(&self, max_size: u32) -> Result<Vec<MsgProps<T>>> {
270        let mut num_props = max_size;
271        let mut handles = Vec::<DpiMsgProps>::with_capacity(max_size as usize);
272        chkerr!(
273            self.ctxt(),
274            dpiQueue_deqMany(
275                self.handle(),
276                &mut num_props,
277                // The following code works only when
278                // the size of `*mut dpiMsgProps` equals to that of `DpiMsgProps`.
279                handles.as_mut_ptr() as *mut *mut dpiMsgProps
280            )
281        );
282        let num_props = num_props as usize;
283        unsafe {
284            handles.set_len(num_props);
285        }
286        let props: Vec<_> = handles
287            .into_iter()
288            .map(|handle| {
289                MsgProps::from_dpi_msg_props(self.conn.clone(), handle, self.payload_type.clone())
290            })
291            .collect();
292        Ok(props)
293    }
294
295    /// Enqueues a single mesasge into the queue.
296    pub fn enqueue(&self, props: &MsgProps<T>) -> Result<()> {
297        chkerr!(self.ctxt(), dpiQueue_enqOne(self.handle(), props.handle()));
298        Ok(())
299    }
300
301    /// Enqueues multiple messages into the queue.
302    ///
303    /// **Warning:** calling this function in parallel on different connections
304    /// acquired from the same pool may fail due to Oracle bug 29928074. Ensure
305    /// that this function is not run in parallel, use standalone connections or
306    /// connections from different pools, or make multiple calls to
307    /// [`Queue.enqueue`] instead. The function [`Queue.dequeue_many`]
308    /// call is not affected.
309    ///
310    /// [`Queue.enqueue`]: #method.enqueue
311    /// [`Queue.dequeue_many`]: #method.dequeue_many
312    pub fn enqueue_many<'a, I>(&'a self, props: I) -> Result<()>
313    where
314        I: IntoIterator<Item = &'a MsgProps<T>>,
315    {
316        let iter = props.into_iter();
317        let (lower, _) = iter.size_hint();
318        let mut raw_props = Vec::with_capacity(lower);
319        for msg in iter {
320            let handle = msg.handle();
321            raw_props.push(handle);
322            unsafe {
323                dpiMsgProps_addRef(handle);
324            }
325        }
326        chkerr!(
327            self.ctxt(),
328            dpiQueue_enqMany(
329                self.handle(),
330                raw_props.len() as u32,
331                raw_props.as_mut_ptr()
332            ),
333            for handle in raw_props {
334                unsafe {
335                    dpiMsgProps_release(handle);
336                }
337            }
338        );
339        for handle in raw_props {
340            unsafe {
341                dpiMsgProps_release(handle);
342            }
343        }
344        Ok(())
345    }
346
347    /// Returns a reference to the dequeue options associated with the queue. These
348    /// options affect how messages are dequeued.
349    pub fn deq_options(&mut self) -> Result<&mut DeqOptions> {
350        if self.deq_options.is_none() {
351            let mut handle = ptr::null_mut();
352            chkerr!(
353                self.ctxt(),
354                dpiQueue_getDeqOptions(self.handle(), &mut handle)
355            );
356            self.deq_options = Some(DeqOptions::new(self.ctxt().clone(), handle));
357        }
358        Ok(self.deq_options.as_mut().unwrap())
359    }
360
361    /// Returns a reference to the enqueue options associated with the queue. These
362    /// options affect how messages are enqueued.
363    pub fn enq_options(&mut self) -> Result<&mut EnqOptions> {
364        if self.enq_options.is_none() {
365            let mut handle = ptr::null_mut();
366            chkerr!(
367                self.ctxt(),
368                dpiQueue_getEnqOptions(self.handle(), &mut handle)
369            );
370            self.enq_options = Some(EnqOptions::new(self.ctxt().clone(), handle));
371        }
372        Ok(self.enq_options.as_mut().unwrap())
373    }
374}
375
#[derive(Clone, Debug, PartialEq, Eq)]
/// Delivery mode used for filtering messages when dequeuing messages from a queue
///
/// The same type is accepted by `EnqOptions::set_delivery_mode` and returned
/// by `MsgProps::delivery_mode`.
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub enum MessageDeliveryMode {
    /// Dequeue only persistent messages from the queue. This is the default mode.
    Persistent,
    /// Dequeue only buffered messages from the queue.
    Buffered,
    /// Dequeue both persistent and buffered messages from the queue.
    PersistentOrBuffered,
}
388
389impl MessageDeliveryMode {
390    fn from_dpi_value(val: dpiMessageDeliveryMode) -> Result<MessageDeliveryMode> {
391        match val {
392            DPI_MODE_MSG_PERSISTENT => Ok(MessageDeliveryMode::Persistent),
393            DPI_MODE_MSG_BUFFERED => Ok(MessageDeliveryMode::Buffered),
394            DPI_MODE_MSG_PERSISTENT_OR_BUFFERED => Ok(MessageDeliveryMode::PersistentOrBuffered),
395            _ => Err(Error::internal_error(format!(
396                "unknown dpiMessageDeliveryMode {}",
397                val
398            ))),
399        }
400    }
401
402    fn to_dpi_value(&self) -> dpiMessageDeliveryMode {
403        match self {
404            MessageDeliveryMode::Persistent => DPI_MODE_MSG_PERSISTENT as dpiMessageDeliveryMode,
405            MessageDeliveryMode::Buffered => DPI_MODE_MSG_PERSISTENT as dpiMessageDeliveryMode,
406            MessageDeliveryMode::PersistentOrBuffered => {
407                DPI_MODE_MSG_PERSISTENT as dpiMessageDeliveryMode
408            }
409        }
410    }
411}
412
#[derive(Clone, Debug, PartialEq, Eq)]
/// Possible states for messages in a queue
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub enum MessageState {
    /// The message is ready to be processed.
    Ready,
    /// The message is waiting for the delay time to expire.
    Waiting,
    /// The message has already been processed and is retained.
    Processed,
    /// The message has been moved to the exception queue.
    Expired,
}
427
428impl MessageState {
429    fn from_dpi_value(val: dpiMessageState) -> Result<MessageState> {
430        match val {
431            DPI_MSG_STATE_READY => Ok(MessageState::Ready),
432            DPI_MSG_STATE_WAITING => Ok(MessageState::Waiting),
433            DPI_MSG_STATE_PROCESSED => Ok(MessageState::Processed),
434            DPI_MSG_STATE_EXPIRED => Ok(MessageState::Expired),
435            _ => Err(Error::internal_error(format!(
436                "unknown dpiMessageState {}",
437                val
438            ))),
439        }
440    }
441}
442
#[derive(Clone, Debug, PartialEq, Eq)]
/// Modes that are possible when dequeuing messages from a queue
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub enum DeqMode {
    /// Read the message without acquiring a lock on the
    /// message (equivalent to a SELECT statement).
    Browse,
    /// Read the message and obtain a write lock on the
    /// message (equivalent to a SELECT FOR UPDATE
    /// statement).
    Locked,
    /// Read the message and update or delete it. This is
    /// the default mode. Note that the message may be
    /// retained in the queue table based on retention
    /// properties.
    Remove,
    /// Confirms receipt of the message but does not
    /// deliver the actual message content.
    RemoveNoData,
}
464
465impl DeqMode {
466    fn from_dpi_value(val: dpiDeqMode) -> Result<DeqMode> {
467        match val {
468            DPI_MODE_DEQ_BROWSE => Ok(DeqMode::Browse),
469            DPI_MODE_DEQ_LOCKED => Ok(DeqMode::Locked),
470            DPI_MODE_DEQ_REMOVE => Ok(DeqMode::Remove),
471            DPI_MODE_DEQ_REMOVE_NO_DATA => Ok(DeqMode::RemoveNoData),
472            _ => Err(Error::internal_error(format!("unknown dpiDeqMode {}", val))),
473        }
474    }
475
476    fn to_dpi_value(&self) -> dpiDeqMode {
477        match self {
478            DeqMode::Browse => DPI_MODE_DEQ_BROWSE,
479            DeqMode::Locked => DPI_MODE_DEQ_LOCKED,
480            DeqMode::Remove => DPI_MODE_DEQ_REMOVE,
481            DeqMode::RemoveNoData => DPI_MODE_DEQ_REMOVE_NO_DATA,
482        }
483    }
484}
485
#[derive(Clone, Debug, PartialEq, Eq)]
/// Method used for determining which message is to be dequeued from a queue
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub enum DeqNavigation {
    /// Retrieves the first available message that
    /// matches the search criteria. This resets the
    /// position to the beginning of the queue.
    FirstMessage,
    /// Skips the remainder of the current transaction
    /// group (if any) and retrieves the first message of
    /// the next transaction group. This option can only
    /// be used if message grouping is enabled for the
    /// queue.
    NextTransaction,
    /// Retrieves the next available message that matches
    /// the search criteria. This is the default method.
    NextMessage,
}
505
506impl DeqNavigation {
507    fn from_dpi_value(val: dpiDeqNavigation) -> Result<DeqNavigation> {
508        match val {
509            DPI_DEQ_NAV_FIRST_MSG => Ok(DeqNavigation::FirstMessage),
510            DPI_DEQ_NAV_NEXT_TRANSACTION => Ok(DeqNavigation::NextTransaction),
511            DPI_DEQ_NAV_NEXT_MSG => Ok(DeqNavigation::NextMessage),
512            _ => Err(Error::internal_error(format!(
513                "unknown dpiDeqNavigation {}",
514                val
515            ))),
516        }
517    }
518
519    fn to_dpi_value(&self) -> dpiDeqNavigation {
520        match self {
521            DeqNavigation::FirstMessage => DPI_DEQ_NAV_FIRST_MSG,
522            DeqNavigation::NextTransaction => DPI_DEQ_NAV_NEXT_TRANSACTION,
523            DeqNavigation::NextMessage => DPI_DEQ_NAV_NEXT_MSG,
524        }
525    }
526}
527
#[derive(Clone, Debug, PartialEq, Eq)]
/// Visibility of messages in advanced queuing
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub enum Visibility {
    /// The message is not part of the current transaction
    /// but constitutes a transaction of its own.
    Immediate,
    /// The message is part of the current transaction.
    /// This is the default value.
    OnCommit,
}
540
541impl Visibility {
542    fn from_dpi_value(val: dpiVisibility) -> Result<Visibility> {
543        match val {
544            DPI_VISIBILITY_IMMEDIATE => Ok(Visibility::Immediate),
545            DPI_VISIBILITY_ON_COMMIT => Ok(Visibility::OnCommit),
546            _ => Err(Error::internal_error(format!(
547                "unknown dpiVisibility {}",
548                val
549            ))),
550        }
551    }
552
553    fn to_dpi_value(&self) -> dpiVisibility {
554        match self {
555            Visibility::Immediate => DPI_VISIBILITY_IMMEDIATE,
556            Visibility::OnCommit => DPI_VISIBILITY_ON_COMMIT,
557        }
558    }
559}
560
/// Options when dequeuing messages using advanced queueing
///
/// Obtained through `Queue::deq_options`.
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub struct DeqOptions {
    /// Context passed to every ODPI-C call made through these options.
    ctxt: Context,
    /// Raw ODPI-C dequeue options handle as returned by `dpiQueue_getDeqOptions`.
    handle: *mut dpiDeqOptions,
}
568
569impl DeqOptions {
570    fn new(ctxt: Context, handle: *mut dpiDeqOptions) -> DeqOptions {
571        DeqOptions { ctxt, handle }
572    }
573
574    fn ctxt(&self) -> &Context {
575        &self.ctxt
576    }
577
578    /// Returns the condition that must be satisfied in order for a message to be
579    /// dequeued.
580    ///
581    /// See [`set_condition`](#method.set_condition) method for more information
582    pub fn condition(&self) -> Result<String> {
583        let mut s = OdpiStr::new("");
584        chkerr!(
585            self.ctxt(),
586            dpiDeqOptions_getCondition(self.handle, &mut s.ptr, &mut s.len)
587        );
588        Ok(s.to_string())
589    }
590
591    /// Returns the name of the consumer that is dequeuing messages.
592    ///
593    /// see [`set_consumer_name`](#method.set_consumer_name) method for more information.
594    pub fn consumer_name(&self) -> Result<String> {
595        let mut s = OdpiStr::new("");
596        chkerr!(
597            self.ctxt(),
598            dpiDeqOptions_getConsumerName(self.handle, &mut s.ptr, &mut s.len)
599        );
600        Ok(s.to_string())
601    }
602
603    ///  Returns the correlation of the message to be dequeued.
604    ///
605    ///  See [`set_correlation`](#method.set_correlation) method for more information.
606    pub fn correlation(&self) -> Result<String> {
607        let mut s = OdpiStr::new("");
608        chkerr!(
609            self.ctxt(),
610            dpiDeqOptions_getCorrelation(self.handle, &mut s.ptr, &mut s.len)
611        );
612        Ok(s.to_string())
613    }
614
615    /// Returns the mode that is to be used when dequeuing messages.
616    pub fn mode(&self) -> Result<DeqMode> {
617        let mut val = 0;
618        chkerr!(self.ctxt(), dpiDeqOptions_getMode(self.handle, &mut val));
619        DeqMode::from_dpi_value(val)
620    }
621
622    /// Returns the identifier of the specific message that is to be dequeued.
623    pub fn message_id(&self) -> Result<Vec<u8>> {
624        let mut msg = OdpiStr::new("");
625        chkerr!(
626            self.ctxt(),
627            dpiDeqOptions_getMsgId(self.handle, &mut msg.ptr, &mut msg.len)
628        );
629        Ok(msg.to_vec())
630    }
631
632    /// Returns the position of the message that is to be dequeued.
633    pub fn navigation(&self) -> Result<DeqNavigation> {
634        let mut val = 0;
635        chkerr!(
636            self.ctxt(),
637            dpiDeqOptions_getNavigation(self.handle, &mut val)
638        );
639        DeqNavigation::from_dpi_value(val)
640    }
641
642    /// Returns the transformation of the message to be dequeued.
643    ///
644    /// See [`set_transformation`](#method.set_transformation) method for more information.
645    pub fn transformation(&self) -> Result<String> {
646        let mut s = OdpiStr::new("");
647        chkerr!(
648            self.ctxt(),
649            dpiDeqOptions_getTransformation(self.handle, &mut s.ptr, &mut s.len)
650        );
651        Ok(s.to_string())
652    }
653
654    /// Returns whether the message being dequeued is part of the current
655    /// transaction or constitutes a transaction on its own.
656    pub fn visibility(&self) -> Result<Visibility> {
657        let mut val = 0;
658        chkerr!(
659            self.ctxt(),
660            dpiDeqOptions_getVisibility(self.handle, &mut val)
661        );
662        Visibility::from_dpi_value(val)
663    }
664
665    /// Returns the time to wait for a message matching the search
666    /// criteria.
667    pub fn wait(&self) -> Result<Duration> {
668        let mut val = 0;
669        chkerr!(self.ctxt(), dpiDeqOptions_getWait(self.handle, &mut val));
670        Ok(Duration::from_secs(val as u64))
671    }
672
673    /// Sets the condition which must be true for messages to be dequeued.
674    ///
675    /// The condition must be a valid boolean expression similar to the where clause
676    /// of a SQL query. The expression can include conditions on message
677    /// properties, user data properties and PL/SQL or SQL functions. User data
678    /// properties must be prefixed with tab.user_data as a qualifier to indicate
679    /// the specific column of the queue table that stores the message payload.
680    pub fn set_condition(&mut self, val: &str) -> Result<()> {
681        let val = OdpiStr::new(val);
682        chkerr!(
683            self.ctxt(),
684            dpiDeqOptions_setCondition(self.handle, val.ptr, val.len)
685        );
686        Ok(())
687    }
688
689    /// Sets the name of the consumer which will be dequeuing messages. This value
690    /// should only be set if the queue is set up for multiple consumers.
691    pub fn set_consumer_name(&mut self, val: &str) -> Result<()> {
692        let val = OdpiStr::new(val);
693        chkerr!(
694            self.ctxt(),
695            dpiDeqOptions_setConsumerName(self.handle, val.ptr, val.len)
696        );
697        Ok(())
698    }
699
700    /// Sets the correlation of the message to be dequeued.
701    ///
702    /// Special pattern  matching characters such as the percent
703    /// sign (`%`) and the underscore (`_`)
704    /// can be used. If multiple messages satisfy the pattern, the order of
705    /// dequeuing is undetermined.
706    pub fn set_correlation(&mut self, val: &str) -> Result<()> {
707        let val = OdpiStr::new(val);
708        chkerr!(
709            self.ctxt(),
710            dpiDeqOptions_setCorrelation(self.handle, val.ptr, val.len)
711        );
712        Ok(())
713    }
714
715    /// Sets the message delivery mode that is to be used when dequeuing messages.
716    pub fn set_delivery_mode(&mut self, val: &MessageDeliveryMode) -> Result<()> {
717        chkerr!(
718            self.ctxt(),
719            dpiDeqOptions_setDeliveryMode(self.handle, val.to_dpi_value())
720        );
721        Ok(())
722    }
723
724    /// Sets the mode that is to be used when dequeuing messages.
725    pub fn set_mode(&mut self, val: &DeqMode) -> Result<()> {
726        chkerr!(
727            self.ctxt(),
728            dpiDeqOptions_setMode(self.handle, val.to_dpi_value())
729        );
730        Ok(())
731    }
732
733    /// Sets the identifier of the specific message to be dequeued.
734    pub fn set_message_id(&mut self, val: &[u8]) -> Result<()> {
735        let ptr = if val.is_empty() {
736            ptr::null()
737        } else {
738            val.as_ptr() as *const c_char
739        };
740        let len = val.len() as u32;
741        chkerr!(self.ctxt(), dpiDeqOptions_setMsgId(self.handle, ptr, len));
742        Ok(())
743    }
744
745    /// Sets the position in the queue of the message that is to be dequeued.
746    pub fn set_navigation(&mut self, val: &DeqNavigation) -> Result<()> {
747        chkerr!(
748            self.ctxt(),
749            dpiDeqOptions_setNavigation(self.handle, val.to_dpi_value())
750        );
751        Ok(())
752    }
753
754    /// Sets the transformation of the message to be dequeued.
755    ///
756    /// The transformation
757    /// is applied after the message is dequeued but before it is returned to the
758    /// application. It must be created using DBMS_TRANSFORM.
759    pub fn set_transformation(&mut self, val: &str) -> Result<()> {
760        let val = OdpiStr::new(val);
761        chkerr!(
762            self.ctxt(),
763            dpiDeqOptions_setTransformation(self.handle, val.ptr, val.len)
764        );
765        Ok(())
766    }
767
768    /// Sets whether the message being dequeued is part of the current transaction
769    /// or constitutes a transaction on its own.
770    pub fn set_visibility(&mut self, val: &Visibility) -> Result<()> {
771        chkerr!(
772            self.ctxt(),
773            dpiDeqOptions_setVisibility(self.handle, val.to_dpi_value())
774        );
775        Ok(())
776    }
777
778    /// Set the time to wait for a message matching the search
779    /// criteria.
780    pub fn set_wait(&mut self, val: &Duration) -> Result<()> {
781        let secs = val.as_secs().try_into().unwrap_or(u32::MAX);
782        chkerr!(self.ctxt(), dpiDeqOptions_setWait(self.handle, secs));
783        Ok(())
784    }
785}
786
787impl fmt::Debug for DeqOptions {
788    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
789        write!(f, "DeqOptions {{ handle: {:?} }}", self.handle)
790    }
791}
792
/// Options when enqueuing messages using advanced queueing
///
/// Obtained through `Queue::enq_options`.
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
pub struct EnqOptions {
    /// Context passed to every ODPI-C call made through these options.
    ctxt: Context,
    /// Raw ODPI-C enqueue options handle as returned by `dpiQueue_getEnqOptions`.
    handle: *mut dpiEnqOptions,
}
800
801impl EnqOptions {
802    fn new(ctxt: Context, handle: *mut dpiEnqOptions) -> EnqOptions {
803        EnqOptions { ctxt, handle }
804    }
805
806    fn ctxt(&self) -> &Context {
807        &self.ctxt
808    }
809
810    /// Returns the transformation of the message to be enqueued.
811    ///
812    /// See [`set_transformation`](#method.set_transformation) method for more information.
813    pub fn transformation(&self) -> Result<String> {
814        let mut s = OdpiStr::new("");
815        chkerr!(
816            self.ctxt(),
817            dpiEnqOptions_getTransformation(self.handle, &mut s.ptr, &mut s.len)
818        );
819        Ok(s.to_string())
820    }
821
822    /// Returns whether the message being enqueued is part of the current
823    /// transaction or constitutes a transaction on its own.
824    pub fn visibility(&self) -> Result<Visibility> {
825        let mut val = 0;
826        chkerr!(
827            self.ctxt(),
828            dpiEnqOptions_getVisibility(self.handle, &mut val)
829        );
830        Visibility::from_dpi_value(val)
831    }
832
833    /// Sets the message delivery mode that is to be used when enqueuing messages.
834    pub fn set_delivery_mode(&mut self, val: &MessageDeliveryMode) -> Result<()> {
835        chkerr!(
836            self.ctxt(),
837            dpiEnqOptions_setDeliveryMode(self.handle, val.to_dpi_value())
838        );
839        Ok(())
840    }
841
842    /// Sets the transformation of the message to be enqueued.
843    ///
844    /// The transformation
845    /// is applied after the message is enqueued but before it is returned to the
846    /// application. It must be created using DBMS_TRANSFORM.
847    pub fn set_transformation(&mut self, val: &str) -> Result<()> {
848        let val = OdpiStr::new(val);
849        chkerr!(
850            self.ctxt(),
851            dpiEnqOptions_setTransformation(self.handle, val.ptr, val.len)
852        );
853        Ok(())
854    }
855
856    /// Sets whether the message being enqueued is part of the current transaction
857    /// or constitutes a transaction on its own.
858    pub fn set_visibility(&mut self, val: &Visibility) -> Result<()> {
859        chkerr!(
860            self.ctxt(),
861            dpiEnqOptions_setVisibility(self.handle, val.to_dpi_value())
862        );
863        Ok(())
864    }
865}
866
867impl fmt::Debug for EnqOptions {
868    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
869        write!(f, "EnqOptions {{ handle: {:?} }}", self.handle)
870    }
871}
872
/// Properties of messages that are enqueued and dequeued using advanced queuing
///
/// The type parameter `T` is the payload kind carried by the message; it must
/// implement [`Payload`].
///
/// **Warning:** The type is unstable. It may be changed incompatibly by minor version upgrades.
#[derive(Clone)]
pub struct MsgProps<T>
where
    T: Payload + ?Sized,
{
    /// Connection the message properties belong to.
    conn: Conn,
    /// ODPI-C message properties handle.
    handle: DpiMsgProps,
    /// Object type of the payload, when one is known; `None` for newly created
    /// properties and after a byte payload has been set.
    payload_type: Option<ObjectType>,
    /// Ties the properties to their payload type without storing a value of it.
    phantom: PhantomData<T>,
}
886
887impl<T> MsgProps<T>
888where
889    T: Payload + ?Sized,
890{
    /// Returns the raw ODPI-C message properties handle.
    fn handle(&self) -> *mut dpiMsgProps {
        self.handle.raw()
    }
894
    /// Returns the context of the connection these properties belong to.
    fn ctxt(&self) -> &Context {
        self.conn.ctxt()
    }
898
899    /// Creates a new message properties
900    pub fn new(conn: &Connection) -> Result<MsgProps<T>> {
901        let mut handle = ptr::null_mut();
902        chkerr!(conn.ctxt(), dpiConn_newMsgProps(conn.handle(), &mut handle));
903        Ok(MsgProps {
904            conn: conn.conn.clone(),
905            handle: DpiMsgProps::new(handle),
906            payload_type: None,
907            phantom: PhantomData,
908        })
909    }
910
911    fn from_dpi_msg_props(
912        conn: Conn,
913        handle: DpiMsgProps,
914        payload_type: Option<ObjectType>,
915    ) -> MsgProps<T> {
916        MsgProps {
917            conn,
918            handle,
919            payload_type,
920            phantom: PhantomData,
921        }
922    }
923
924    /// Returns the number of attempts that have been made to dequeue a message.
925    pub fn num_attempts(&self) -> Result<i32> {
926        let mut val = 0;
927        chkerr!(
928            self.ctxt(),
929            dpiMsgProps_getNumAttempts(self.handle(), &mut val)
930        );
931        Ok(val)
932    }
933
934    /// Returns the correlation supplied by the producer when the message was
935    /// enqueued.
936    pub fn correlation(&self) -> Result<String> {
937        let mut s = OdpiStr::new("");
938        chkerr!(
939            self.ctxt(),
940            dpiMsgProps_getCorrelation(self.handle(), &mut s.ptr, &mut s.len)
941        );
942        Ok(s.to_string())
943    }
944
945    /// Returns the duration the enqueued message will be delayed.
946    pub fn delay(&self) -> Result<Duration> {
947        let mut secs = 0;
948        chkerr!(self.ctxt(), dpiMsgProps_getDelay(self.handle(), &mut secs));
949        Ok(Duration::from_secs(secs as u64))
950    }
951
952    /// Returns the mode that was used to deliver the message.
953    pub fn delivery_mode(&self) -> Result<MessageDeliveryMode> {
954        let mut val = 0;
955        chkerr!(
956            self.ctxt(),
957            dpiMsgProps_getDeliveryMode(self.handle(), &mut val)
958        );
959        MessageDeliveryMode::from_dpi_value(val)
960    }
961
962    /// Returns the time that the message was enqueued.
963    pub fn enq_time(&self) -> Result<Timestamp> {
964        let mut val = Default::default();
965        chkerr!(self.ctxt(), dpiMsgProps_getEnqTime(self.handle(), &mut val));
966        Ok(Timestamp::from_dpi_timestamp(&val, &OracleType::Date))
967    }
968
969    /// Returns the name of the queue to which the message is moved if it cannot be
970    /// processed successfully.
971    ///
972    /// See [`set_exception_queue`](#method.set_exception_queue) method for more information.
973    pub fn exception_queue(&self) -> Result<String> {
974        let mut s = OdpiStr::new("");
975        chkerr!(
976            self.ctxt(),
977            dpiMsgProps_getExceptionQ(self.handle(), &mut s.ptr, &mut s.len)
978        );
979        Ok(s.to_string())
980    }
981
982    /// Returns the duration the message is available to be dequeued.
983    ///
984    /// See [`set_expiration`](#method.set_expiration) method for more information.
985    pub fn expiration(&self) -> Result<Duration> {
986        let mut val = 0;
987        chkerr!(
988            self.ctxt(),
989            dpiMsgProps_getExpiration(self.handle(), &mut val)
990        );
991        Ok(Duration::from_secs(val as u64))
992    }
993
994    /// Returns the id of the message in the queue that generated this message. No
995    /// value is available until the message has been enqueued or dequeued.
996    pub fn message_id(&self) -> Result<Vec<u8>> {
997        let mut msg = OdpiStr::new("");
998        chkerr!(
999            self.ctxt(),
1000            dpiMsgProps_getMsgId(self.handle(), &mut msg.ptr, &mut msg.len)
1001        );
1002        Ok(msg.to_vec())
1003    }
1004
1005    /// Returns the id of the message in the last queue that generated this
1006    /// message.
1007    ///
1008    /// See [`set_original_message_id`](#method.set_original_message_id) for more information.
1009    pub fn original_message_id(&self) -> Result<Vec<u8>> {
1010        let mut msg = OdpiStr::new("");
1011        chkerr!(
1012            self.ctxt(),
1013            dpiMsgProps_getOriginalMsgId(self.handle(), &mut msg.ptr, &mut msg.len)
1014        );
1015        Ok(msg.to_vec())
1016    }
1017
1018    /// Returns the payload associated with the message properties.
1019    ///
1020    /// The payload is available after the a call to [`Queue.dequeue`] or
1021    /// [`Queue.dequeue_many`]
1022    ///
1023    /// [`Queue.dequeue`]: Queue#method.dequeue
1024    /// [`Queue.dequeue_many`]: Queue#method.dequeue_many
1025    pub fn payload(&self) -> Result<T::Owned> {
1026        T::get(self)
1027    }
1028
1029    /// Returns the priority assigned to the message.
1030    ///
1031    /// See [`set_priority`](#method.set_priority) method for more information.
1032    pub fn priority(&self) -> Result<i32> {
1033        let mut val = 0;
1034        chkerr!(
1035            self.ctxt(),
1036            dpiMsgProps_getPriority(self.handle(), &mut val)
1037        );
1038        Ok(val)
1039    }
1040
1041    /// Returns the state of the message at the time of dequeue.
1042    pub fn state(&self) -> Result<MessageState> {
1043        let mut val = 0;
1044        chkerr!(self.ctxt(), dpiMsgProps_getState(self.handle(), &mut val));
1045        MessageState::from_dpi_value(val)
1046    }
1047
1048    /// Sets the correlation of the message to be dequeued.
1049    ///
1050    /// Special pattern matching characters such as the percent
1051    /// sign (`%`) and the underscore (`_`) can be used. If multiple
1052    /// messages satisfy the pattern, the order of dequeuing is
1053    /// undetermined.
1054    pub fn set_correlation(&mut self, val: &str) -> Result<()> {
1055        let val = OdpiStr::new(val);
1056        chkerr!(
1057            self.ctxt(),
1058            dpiMsgProps_setCorrelation(self.handle(), val.ptr, val.len)
1059        );
1060        Ok(())
1061    }
1062
1063    /// Sets the number of seconds to delay the message before it can be dequeued.
1064    ///
1065    /// Messages enqueued with a delay are put into the [`MessageState::Waiting`]
1066    /// state. When the delay expires the message is put into the
1067    /// [`MessageState::Ready`] state. Dequeuing directly by message id overrides this
1068    /// delay specification. Note that delay processing requires the queue monitor
1069    /// to be started.
1070    ///
1071    /// [`MessageState::Waiting`]: MessageState#variant.Waiting
1072    /// [`MessageState::Ready`]: MessageState#variant.Ready
1073    pub fn set_delay(&mut self, val: &Duration) -> Result<()> {
1074        let secs = val
1075            .as_secs()
1076            .try_into()
1077            .map_err(|_| Error::out_of_range(format!("too long duration {:?}", val)))?;
1078        chkerr!(self.ctxt(), dpiMsgProps_setDelay(self.handle(), secs));
1079        Ok(())
1080    }
1081
1082    /// Sets the name of the queue to which the message is moved if it cannot be
1083    /// processed successfully.
1084    ///
1085    /// Messages are moved if the number of unsuccessful
1086    /// dequeue attempts has reached the maximum allowed number or if the message
1087    /// has expired. All messages in the exception queue are in the
1088    /// [`MessageState::Expired`] state.
1089    ///
1090    /// [`MessageState::Expired`]: MessageState#variant.Expired
1091    pub fn set_exception_queue(&mut self, val: &str) -> Result<()> {
1092        let val = OdpiStr::new(val);
1093        chkerr!(
1094            self.ctxt(),
1095            dpiMsgProps_setExceptionQ(self.handle(), val.ptr, val.len)
1096        );
1097        Ok(())
1098    }
1099
1100    /// Sets the number of seconds the message is available to be dequeued.
1101    ///
1102    /// This value is an offset from the delay. Expiration processing requires the queue
1103    /// monitor to be running. Until this time elapses, the messages are in the
1104    /// queue in the state [`MessageState::Ready`]. After this time elapses messages
1105    /// are moved to the exception queue in the [`MessageState::Expired`] state.
1106    ///
1107    /// [`MessageState::Ready`]: MessageState#variant.Ready
1108    /// [`MessageState::Expired`]: MessageState#variant.Expired
1109    pub fn set_expiration(&mut self, val: &Duration) -> Result<()> {
1110        let secs = val
1111            .as_secs()
1112            .try_into()
1113            .map_err(|_| Error::out_of_range(format!("too long duration {:?}", val)))?;
1114        chkerr!(self.ctxt(), dpiMsgProps_setExpiration(self.handle(), secs));
1115        Ok(())
1116    }
1117
1118    /// Sets the id of the message in the last queue that generated this
1119    /// message.
1120    pub fn set_original_message_id(&mut self, val: &[u8]) -> Result<()> {
1121        let ptr = if val.is_empty() {
1122            ptr::null()
1123        } else {
1124            val.as_ptr() as *const c_char
1125        };
1126        let len = val.len() as u32;
1127        chkerr!(
1128            self.ctxt(),
1129            dpiMsgProps_setOriginalMsgId(self.handle(), ptr, len)
1130        );
1131        Ok(())
1132    }
1133
1134    /// Sets the payload for the message.
1135    ///
1136    /// This value will be used when the message is enqueued using
1137    /// [`Queue.enqueue`] or [`Queue.enqueue_many`].
1138    ///
1139    /// [`Queue.enqueue`]: Queue#method.enqueue
1140    /// [`Queue.enqueue_many`]: Queue#method.enqueue_many
1141    pub fn set_payload(&mut self, val: &T) -> Result<()> {
1142        val.set(self)
1143    }
1144
1145    /// Sets the priority assigned to the message.
1146    ///
1147    /// A smaller number indicates a higher priority. The priority can
1148    /// be any number, including negative numbers.
1149    pub fn set_priority(&mut self, val: i32) -> Result<()> {
1150        chkerr!(self.ctxt(), dpiMsgProps_setPriority(self.handle(), val));
1151        Ok(())
1152    }
1153}
1154
1155impl<T> fmt::Debug for MsgProps<T>
1156where
1157    T: Payload,
1158{
1159    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1160        write!(f, "MsgProps {{ handle: {:?} }}", self.handle())
1161    }
1162}