1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use super::{Pop, Synced};

use crate::loom::sync::atomic::AtomicUsize;
use crate::runtime::task;

use std::marker::PhantomData;
use std::sync::atomic::Ordering::{Acquire, Release};

/// The part of the injection queue's state that is kept outside of `Synced`.
///
/// It carries only an atomic task count, readable through `&self`. The
/// closed flag and the head/tail pointers of the queue are stored in the
/// `Synced` value created together with this struct by `Shared::new`.
pub(crate) struct Shared<T: 'static> {
    /// Number of pending tasks in the queue. This helps prevent unnecessary
    /// locking in the hot path.
    pub(super) len: AtomicUsize,

    /// Marker for the task type parameter `T`; no `T` value is stored here.
    _p: PhantomData<T>,
}

// SAFETY (as far as this file shows): `Shared<T>` stores no `T` value, only an
// `AtomicUsize` and a `PhantomData<T>` marker. These impls lift the
// `T: Send` / `T: Sync` requirement that the marker would otherwise impose on
// the auto traits.
// NOTE(review): overall soundness also depends on how the task pointers held
// in `Synced` are shared between threads — confirm against the callers.
unsafe impl<T> Send for Shared<T> {}
unsafe impl<T> Sync for Shared<T> {}

impl<T: 'static> Shared<T> {
    /// Creates an empty, open queue.
    ///
    /// Returns the `Shared` half (task counter) together with the matching
    /// `Synced` half (closed flag plus head/tail pointers). The two values
    /// belong together: the unsafe methods below must always be given the
    /// `Synced` produced here.
    pub(crate) fn new() -> (Shared<T>, Synced) {
        let synced = Synced {
            is_closed: false,
            head: None,
            tail: None,
        };

        let shared = Shared {
            len: AtomicUsize::new(0),
            _p: PhantomData,
        };

        (shared, synced)
    }

    /// Returns `true` when the task counter currently reads zero.
    pub(crate) fn is_empty(&self) -> bool {
        self.len.load(Acquire) == 0
    }

    /// Reports whether the queue has been closed.
    // The cfg gate has to be repeated here, which is a bit unfortunate.
    #[cfg(any(tokio_taskdump, all(feature = "rt-multi-thread", not(tokio_wasi))))]
    pub(crate) fn is_closed(&self, synced: &Synced) -> bool {
        synced.is_closed
    }

    /// Marks the injection queue as closed.
    ///
    /// Returns `true` if this call performed the open -> closed transition,
    /// and `false` if the queue had already been closed.
    pub(crate) fn close(&self, synced: &mut Synced) -> bool {
        let was_open = !synced.is_closed;
        synced.is_closed = true;
        was_open
    }

    /// Returns the number of tasks recorded in the counter.
    pub(crate) fn len(&self) -> usize {
        self.len.load(Acquire)
    }

    /// Appends a task to the back of the queue.
    ///
    /// If the queue is closed the task is not enqueued and the call returns
    /// immediately.
    ///
    /// # Safety
    ///
    /// `synced` must be the `Synced` instance that was created together with
    /// this `Shared` by `Shared::new`.
    pub(crate) unsafe fn push(&self, synced: &mut Synced, task: task::Notified<T>) {
        if synced.is_closed {
            return;
        }

        // safety: the counter is only ever mutated with the lock held
        let prev_len = self.len.unsync_load();
        let raw = task.into_raw();

        // A task being enqueued must not already be linked to a successor.
        debug_assert!(unsafe { raw.get_queue_next().is_none() });

        match synced.tail {
            Some(prev_tail) => {
                // safety: Holding the Notified for a task guarantees exclusive
                // access to the `queue_next` field.
                unsafe { prev_tail.set_queue_next(Some(raw)) };
            }
            None => {
                // Empty list: the new task is also the head.
                synced.head = Some(raw);
            }
        }

        synced.tail = Some(raw);
        self.len.store(prev_len + 1, Release);
    }

    /// Removes and returns the task at the front of the queue, if any.
    ///
    /// # Safety
    ///
    /// `synced` must be the `Synced` instance that was created together with
    /// this `Shared` by `Shared::new`.
    pub(crate) unsafe fn pop(&self, synced: &mut Synced) -> Option<task::Notified<T>> {
        let mut popped = self.pop_n(synced, 1);
        popped.next()
    }

    /// Removes up to `n` tasks from the front of the queue.
    ///
    /// The request is capped at the current counter value, the counter is
    /// lowered by that amount up front, and a `Pop` covering that many tasks
    /// is returned.
    ///
    /// # Safety
    ///
    /// `synced` must be the `Synced` instance that was created together with
    /// this `Shared` by `Shared::new`.
    pub(crate) unsafe fn pop_n<'a>(&'a self, synced: &'a mut Synced, n: usize) -> Pop<'a, T> {
        // safety: All updates to the len atomic are guarded by the mutex. As
        // such, a non-atomic load followed by a store is safe.
        let available = self.len.unsync_load();
        let take = n.min(available);

        // Account for the tasks that are about to be handed out.
        self.len.store(available - take, Release);

        Pop::new(take, synced)
    }
}