Skip to main content

wasm_bindgen_futures/
stream.rs

1//! Converting JavaScript `AsyncIterator`s to Rust `Stream`s.
2//!
3//! Analogous to the promise to future conversion, this module allows
4//! turning objects implementing the async iterator protocol into `Stream`s
5//! that produce values that can be awaited from.
6//!
7
8use crate::JsFuture;
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use futures_core::stream::Stream;
13use js_sys::{AsyncIterator, IteratorNext};
14use wasm_bindgen::convert::FromWasmAbi;
15use wasm_bindgen::{prelude::*, JsGeneric};
16
17/// A `Stream` that yields values from an underlying `AsyncIterator`.
18pub struct JsStream<T = JsValue> {
19    iter: AsyncIterator<T>,
20    next: Option<JsFuture<IteratorNext<T>>>,
21    done: bool,
22}
23
24impl<T: 'static + FromWasmAbi> JsStream<T> {
25    fn next_future(&self) -> Result<JsFuture<IteratorNext<T>>, JsValue> {
26        self.iter.next_iterator().map(JsFuture::from)
27    }
28}
29
30impl<T> From<AsyncIterator<T>> for JsStream<T> {
31    fn from(iter: AsyncIterator<T>) -> Self {
32        JsStream {
33            iter,
34            next: None,
35            done: false,
36        }
37    }
38}
39
40impl<T: 'static + JsGeneric + FromWasmAbi + Unpin> Stream for JsStream<T> {
41    type Item = Result<T, JsValue>;
42
43    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
44        if self.done {
45            return Poll::Ready(None);
46        }
47
48        let future = match self.next.as_mut() {
49            Some(val) => val,
50            None => match self.next_future() {
51                Ok(val) => {
52                    self.next = Some(val);
53                    self.next.as_mut().unwrap()
54                }
55                Err(e) => {
56                    self.done = true;
57                    return Poll::Ready(Some(Err(e)));
58                }
59            },
60        };
61
62        match Pin::new(future).poll(cx) {
63            Poll::Ready(res) => match res {
64                Ok(next) => {
65                    if next.done() {
66                        self.done = true;
67                        Poll::Ready(None)
68                    } else {
69                        self.next.take();
70                        Poll::Ready(Some(Ok(next.value())))
71                    }
72                }
73                Err(e) => {
74                    self.done = true;
75                    Poll::Ready(Some(Err(e)))
76                }
77            },
78            Poll::Pending => Poll::Pending,
79        }
80    }
81}