wasm_bindgen_futures/
stream.rs1use crate::JsFuture;
9use core::future::Future;
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use futures_core::stream::Stream;
13use js_sys::{AsyncIterator, IteratorNext};
14use wasm_bindgen::convert::FromWasmAbi;
15use wasm_bindgen::{prelude::*, JsGeneric};
16
17pub struct JsStream<T = JsValue> {
19 iter: AsyncIterator<T>,
20 next: Option<JsFuture<IteratorNext<T>>>,
21 done: bool,
22}
23
24impl<T: 'static + FromWasmAbi> JsStream<T> {
25 fn next_future(&self) -> Result<JsFuture<IteratorNext<T>>, JsValue> {
26 self.iter.next_iterator().map(JsFuture::from)
27 }
28}
29
30impl<T> From<AsyncIterator<T>> for JsStream<T> {
31 fn from(iter: AsyncIterator<T>) -> Self {
32 JsStream {
33 iter,
34 next: None,
35 done: false,
36 }
37 }
38}
39
40impl<T: 'static + JsGeneric + FromWasmAbi + Unpin> Stream for JsStream<T> {
41 type Item = Result<T, JsValue>;
42
43 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
44 if self.done {
45 return Poll::Ready(None);
46 }
47
48 let future = match self.next.as_mut() {
49 Some(val) => val,
50 None => match self.next_future() {
51 Ok(val) => {
52 self.next = Some(val);
53 self.next.as_mut().unwrap()
54 }
55 Err(e) => {
56 self.done = true;
57 return Poll::Ready(Some(Err(e)));
58 }
59 },
60 };
61
62 match Pin::new(future).poll(cx) {
63 Poll::Ready(res) => match res {
64 Ok(next) => {
65 if next.done() {
66 self.done = true;
67 Poll::Ready(None)
68 } else {
69 self.next.take();
70 Poll::Ready(Some(Ok(next.value())))
71 }
72 }
73 Err(e) => {
74 self.done = true;
75 Poll::Ready(Some(Err(e)))
76 }
77 },
78 Poll::Pending => Poll::Pending,
79 }
80 }
81}