1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
use std::prelude::v1::*;
use std::fmt;
use std::mem;
use std::sync::Arc;

use task::{self, UnparkEvent};

use {Async, IntoFuture, Poll, Future};
use stream::{Stream, Fuse};
use stack::{Stack, Drain};

/// An adaptor for a stream of futures to execute the futures concurrently, if
/// possible, delivering results as they become available.
///
/// This adaptor will buffer up a list of pending futures, and then return their
/// results in the order that they complete. This is created by the
/// `Stream::buffer_unordered` method.
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnordered<S>
    where S: Stream,
          S::Item: IntoFuture,
{
    stream: Fuse<S>,

    // A slab of futures that are being executed. Each slot in this vector is
    // either an active future or a pointer to the next empty slot. This is used
    // to get O(1) deallocation in the slab and O(1) allocation.
    //
    // The `next_future` field is the next slot in the `futures` array that's a
    // `Slot::Next` variant. If it points to the end of the array then the array
    // is full.
    futures: Vec<Slot<<S::Item as IntoFuture>::Future>>,
    next_future: usize,

    // A list of events that will get pushed onto concurrently by our many
    // futures. This is filled in and used with the `with_unpark_event`
    // function. The `pending` list here is the last time we drained events from
    // our stack.
    stack: Arc<Stack<usize>>,
    pending: Drain<usize>,

    // Number of active futures running in the `futures` slab
    active: usize,
}

impl<S> fmt::Debug for BufferUnordered<S>
    where S: Stream + fmt::Debug,
          S::Item: IntoFuture,
          <<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
{
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("BufferUnordered")
            .field("stream", &self.stream)
            .field("futures", &self.futures)
            .field("next_future", &self.next_future)
            .field("stack", &self.stack)
            .field("pending", &self.pending)
            .field("active", &self.active)
            .finish()
    }
}

#[derive(Debug)]
enum Slot<T> {
    Next(usize),
    Data(T),
}

pub fn new<S>(s: S, amt: usize) -> BufferUnordered<S>
    where S: Stream,
          S::Item: IntoFuture<Error=<S as Stream>::Error>,
{
    BufferUnordered {
        stream: super::fuse::new(s),
        futures: (0..amt).map(|i| Slot::Next(i + 1)).collect(),
        next_future: 0,
        pending: Stack::new().drain(),
        stack: Arc::new(Stack::new()),
        active: 0,
    }
}

impl<S> BufferUnordered<S>
    where S: Stream,
          S::Item: IntoFuture<Error=<S as Stream>::Error>,
{
    fn poll_pending(&mut self)
                    -> Option<Poll<Option<<S::Item as IntoFuture>::Item>,
                                   S::Error>> {
        while let Some(idx) = self.pending.next() {
            let result = match self.futures[idx] {
                Slot::Data(ref mut f) => {
                    let event = UnparkEvent::new(self.stack.clone(), idx);
                    match task::with_unpark_event(event, || f.poll()) {
                        Ok(Async::NotReady) => continue,
                        Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
                        Err(e) => Err(e),
                    }
                },
                Slot::Next(_) => continue,
            };
            self.active -= 1;
            self.futures[idx] = Slot::Next(self.next_future);
            self.next_future = idx;
            return Some(result)
        }
        None
    }
}

impl<S> Stream for BufferUnordered<S>
    where S: Stream,
          S::Item: IntoFuture<Error=<S as Stream>::Error>,
{
    type Item = <S::Item as IntoFuture>::Item;
    type Error = <S as Stream>::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // First up, try to spawn off as many futures as possible by filling up
        // our slab of futures.
        while self.next_future < self.futures.len() {
            let future = match try!(self.stream.poll()) {
                Async::Ready(Some(s)) => s.into_future(),
                Async::Ready(None) |
                Async::NotReady => break,
            };
            self.active += 1;
            self.stack.push(self.next_future);
            match mem::replace(&mut self.futures[self.next_future],
                               Slot::Data(future)) {
                Slot::Next(next) => self.next_future = next,
                Slot::Data(_) => panic!(),
            }
        }

        // Next, see if our list of `pending` events from last time has any
        // items, and if so process them here.
        if let Some(ret) = self.poll_pending() {
            return ret
        }

        // And finally, take a look at our stack of events, attempting to
        // process all of those.
        assert!(self.pending.next().is_none());
        self.pending = self.stack.drain();
        if let Some(ret) = self.poll_pending() {
            return ret
        }

        // If we've gotten this far then there's no events for us to process and
        // nothing was ready, so figure out if we're not done yet or if we've
        // reached the end.
        Ok(if self.active > 0 || !self.stream.is_done() {
            Async::NotReady
        } else {
            Async::Ready(None)
        })
    }
}

// Forwarding impl of Sink from the underlying stream
impl<S> ::sink::Sink for BufferUnordered<S>
    where S: ::sink::Sink + Stream,
          S::Item: IntoFuture,
{
    type SinkItem = S::SinkItem;
    type SinkError = S::SinkError;

    fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
        self.stream.start_send(item)
    }

    fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
        self.stream.poll_complete()
    }

    fn close(&mut self) -> Poll<(), S::SinkError> {
        self.stream.close()
    }
}