Skip to content

Commit 9503e91

Browse files
committed
Initial commit
0 parents  commit 9503e91

File tree

6 files changed

+398
-0
lines changed

6 files changed

+398
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target
2+
**/*.rs.bk

Cargo.lock

+146
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
cargo-features = [ "edition" ]
2+
3+
[package]
4+
name = "tbd-again"
5+
version = "0.1.0"
6+
authors = ["Florian Gilcher <[email protected]>"]
7+
edition = "2018"
8+
9+
[dependencies]
10+
futures-preview = "0.3.0-alpha.2"

src/main.rs

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
#![feature(rust_2018_preview, async_await, await_macro, futures_api, pin, arbitrary_self_types)]
2+
3+
extern crate futures;
4+
5+
mod mini_exec;
6+
mod types;
7+
8+
use std::future::FutureObj;
9+
use std::task::Executor;
10+
use futures::StreamExt;
11+
use futures::stream;
12+
use futures::future;
13+
14+
use crate::types::*;
15+
16+
#[derive(Debug, Clone)]
17+
struct Post {
18+
id: u64,
19+
content: String
20+
}
21+
22+
#[derive(Debug, Clone)]
23+
struct Comment {
24+
id: u64,
25+
content: String,
26+
post_id: u64
27+
}
28+
29+
struct MemoryGateway {
30+
posts: Vec<Post>,
31+
comments: Vec<Comment>
32+
}
33+
34+
impl Gateway for MemoryGateway {}
35+
36+
struct BlogRepository {
37+
gateway: MemoryGateway
38+
}
39+
40+
impl Repository for BlogRepository {}
41+
42+
struct Posts;
43+
44+
impl Relation<BlogRepository> for Posts {
45+
type PrimaryKey = u64;
46+
type Model = Post;
47+
type Error = ();
48+
49+
type Stream = stream::Iter<std::vec::IntoIter<Post>>;
50+
type Future = futures::future::Ready<Option<Post>>;
51+
52+
fn all(&self, repo: &BlogRepository) -> Self::Stream {
53+
stream::iter(repo.gateway.posts.clone().into_iter())
54+
}
55+
56+
fn one(&self, id: u64, repo: &BlogRepository) -> Self::Future {
57+
future::ready(repo.gateway.posts.iter().find(|p| p.id == id).cloned())
58+
}
59+
}
60+
61+
62+
struct Comments;
63+
64+
impl Relation<BlogRepository> for Comments {
65+
type PrimaryKey = u64;
66+
type Model = Comment;
67+
type Error = ();
68+
69+
type Stream = stream::Iter<std::vec::IntoIter<Comment>>;
70+
type Future = futures::future::Ready<Option<Comment>>;
71+
72+
fn all(&self, repo: &BlogRepository) -> Self::Stream {
73+
stream::iter(repo.gateway.comments.clone().into_iter())
74+
}
75+
76+
fn one(&self, id: u64, repo: &BlogRepository) -> Self::Future {
77+
future::ready(repo.gateway.comments.iter().find(|c| c.id == id).cloned())
78+
}
79+
}
80+
81+
async fn read_from_repos() {
82+
let mut posts = vec![];
83+
84+
for id in 1..=3 {
85+
posts.push(
86+
Post {
87+
id: id,
88+
content: format!("Post number {}", id)
89+
}
90+
)
91+
}
92+
93+
let mut comments = vec![];
94+
95+
for id in 1..=9 {
96+
let post_id = id % 3;
97+
comments.push(
98+
Comment {
99+
id: id,
100+
content: format!("Comment number {} on post {}", id, post_id + 1),
101+
post_id: post_id + 1
102+
}
103+
)
104+
}
105+
106+
let gateway = MemoryGateway { posts, comments };
107+
let repos = BlogRepository { gateway };
108+
109+
let f1 = Posts.all(&repos).for_each(|item|{
110+
println!("{:?}", item);
111+
future::ready(())
112+
});
113+
114+
await!(f1);
115+
116+
let f2 = Comments.all(&repos).for_each(|item|{
117+
println!("{:?}", item);
118+
future::ready(())
119+
});
120+
121+
await!(f2);
122+
123+
let model = await!(Comments.one(2, &repos));
124+
125+
println!("{:?}", model);
126+
}
127+
128+
fn main() {
129+
let executor = mini_exec::Executor::new();
130+
131+
(&executor).spawn_obj(FutureObj::new(Box::new(
132+
read_from_repos()
133+
))).unwrap();
134+
135+
executor.run();
136+
}

src/mini_exec.rs

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
#![feature(rust_2018_preview, async_await, await_macro, futures_api, pin, arbitrary_self_types)]
2+
3+
use std::future::{Future, FutureObj};
4+
use std::mem::PinMut;
5+
6+
use std::sync::{Arc, Mutex};
7+
use std::sync::mpsc::{sync_channel, SyncSender, SendError, Receiver};
8+
use std::task::{
9+
self,
10+
Executor as ExecutorTrait,
11+
local_waker_from_nonlocal,
12+
Poll,
13+
SpawnErrorKind,
14+
SpawnObjError,
15+
Wake,
16+
};
17+
use std::time::Duration;
18+
19+
static TIMEOUT: Duration = Duration::from_millis(100);
20+
21+
pub struct Executor {
22+
task_sender: SyncSender<Arc<Task>>,
23+
task_receiver: Receiver<Arc<Task>>,
24+
}
25+
26+
impl<'a> ExecutorTrait for &'a Executor {
27+
fn spawn_obj(&mut self, future: FutureObj<'static, ()>)
28+
-> Result<(), SpawnObjError>
29+
{
30+
let task = Arc::new(Task {
31+
future: Mutex::new(Some(future)),
32+
task_sender: self.task_sender.clone(),
33+
});
34+
35+
self.task_sender.send(task).map_err(|SendError(task)| {
36+
SpawnObjError {
37+
kind: SpawnErrorKind::shutdown(),
38+
future: task.future.lock().unwrap().take().unwrap(),
39+
}
40+
})
41+
}
42+
}
43+
44+
struct Task {
45+
future: Mutex<Option<FutureObj<'static, ()>>>,
46+
task_sender: SyncSender<Arc<Task>>,
47+
}
48+
49+
impl Wake for Task {
50+
fn wake(arc_self: &Arc<Self>) {
51+
let cloned = arc_self.clone();
52+
let _ = arc_self.task_sender.send(cloned);
53+
}
54+
}
55+
56+
impl Executor {
57+
pub fn new() -> Self {
58+
let (task_sender, task_receiver) = sync_channel(1000);
59+
Executor { task_sender, task_receiver }
60+
}
61+
62+
pub fn run(&self) {
63+
let mut executor = &*self;
64+
while let Ok(task) = self.task_receiver.recv_timeout(TIMEOUT) {
65+
let mut future_slot = task.future.lock().unwrap();
66+
if let Some(mut future) = future_slot.take() {
67+
// Should we use the ref version here? might be nice to start
68+
// w/o futures crate at first to show that it can be done,
69+
// and just mention that there's a simple function to avoid
70+
// the clone if anyone asks?
71+
let waker = local_waker_from_nonlocal(task.clone());
72+
let cx = &mut task::Context::new(&waker, &mut executor);
73+
if let Poll::Pending = PinMut::new(&mut future).poll(cx) {
74+
*future_slot = Some(future);
75+
}
76+
}
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)