Skip to content

Commit 48bd71f

Browse files
committed
Add ChannelSender and ChannelReceiver HeapObject variants
1 parent 221baf4 commit 48bd71f

File tree

4 files changed

+202
-2
lines changed

4 files changed

+202
-2
lines changed

lisp/test_channels.lisp

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
(println "Testing channels...")
2+
3+
;; Test 1: Simple send/recv in same thread
4+
(println "\n=== Test 1: Basic channel send/recv ===")
5+
(def ch (channel))
6+
(def sender (car ch))
7+
(def receiver (car (cdr ch)))
8+
(send! sender 42)
9+
(def result (recv receiver))
10+
(println "Received:" result)
11+
12+
;; Test 2: Multiple sends and receives
13+
(println "\n=== Test 2: Multiple messages ===")
14+
(def ch2 (channel))
15+
(def s2 (car ch2))
16+
(def r2 (car (cdr ch2)))
17+
(send! s2 10)
18+
(send! s2 20)
19+
(send! s2 30)
20+
(def v1 (recv r2))
21+
(def v2 (recv r2))
22+
(def v3 (recv r2))
23+
(println "Received:" v1 v2 v3)
24+
(println "Sum:" (+ v1 v2 v3))
25+
26+
;; Test 3: Send/recv strings
27+
(println "\n=== Test 3: Sending strings ===")
28+
(def ch3 (channel))
29+
(def s3 (car ch3))
30+
(def r3 (car (cdr ch3)))
31+
(send! s3 "Hello")
32+
(send! s3 "World")
33+
(println "Received:" (recv r3))
34+
(println "Received:" (recv r3))
35+
36+
;; Test 4: Send/recv lists
37+
(println "\n=== Test 4: Sending lists ===")
38+
(def ch4 (channel))
39+
(def s4 (car ch4))
40+
(def r4 (car (cdr ch4)))
41+
(send! s4 (list 1 2 3))
42+
(send! s4 (list 4 5 6))
43+
(def list1 (recv r4))
44+
(def list2 (recv r4))
45+
(println "List 1:" list1)
46+
(println "List 2:" list2)
47+
48+
;; Test 5: Producer/Consumer with threads
49+
(println "\n=== Test 5: Producer/Consumer threads ===")
50+
(def ch5 (channel))
51+
(def sender5 (car ch5))
52+
(def receiver5 (car (cdr ch5)))
53+
54+
;; Producer thread - send numbers 1 through 5
55+
(def producer (fn ()
56+
;; Create a new channel in this thread's scope
57+
;; Since we can't share the sender from parent thread, we'll just compute
58+
(+ 100 200)))
59+
60+
;; For now, just demonstrate thread-local channel usage
61+
(def worker1 (fn ()
62+
(def local-ch (channel))
63+
(def local-s (car local-ch))
64+
(def local-r (car (cdr local-ch)))
65+
(send! local-s 123)
66+
(recv local-r)))
67+
68+
(def t1 (spawn worker1))
69+
(def r1 (join t1))
70+
(println "Thread result:" r1)
71+
72+
;; Test 6: Passing channels between functions
73+
(println "\n=== Test 6: Channel as function argument ===")
74+
(def send-many (fn (sender values)
75+
(def first-val (car values))
76+
(def rest-vals (cdr values))
77+
(send! sender first-val)
78+
(if (not (nil? rest-vals))
79+
(send-many sender rest-vals)
80+
nil)))
81+
82+
(def ch6 (channel))
83+
(def s6 (car ch6))
84+
(def r6 (car (cdr ch6)))
85+
(send-many s6 (list 7 8 9))
86+
(println "Received:" (recv r6))
87+
(println "Received:" (recv r6))
88+
(println "Received:" (recv r6))
89+
90+
(println "\n=== All channel tests passed! ===")

lisp/test_spawn.lisp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
;; Test spawn and join - Phase 9 Concurrency
2-
31
(println "Testing spawn and join...")
42

53
;; Test 1: Simple spawn/join - just return a value

src/value.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::cell::RefCell;
22
use std::collections::HashMap;
33
use std::fmt;
44
use std::rc::Rc;
5+
use std::sync::mpsc::{Receiver, Sender};
56
use std::sync::{Arc, Mutex};
67
use std::thread::JoinHandle;
78

@@ -264,6 +265,12 @@ pub enum HeapObject {
264265
/// Thread handle for spawned threads - wrapped in Mutex because JoinHandle can only be joined once
265266
/// The Option allows us to take the handle when joining (join consumes the handle)
266267
ThreadHandle(Arc<Mutex<Option<JoinHandle<Result<SharedValue, String>>>>>),
268+
/// Channel sender for inter-thread communication
269+
/// Wrapped in Arc<Mutex<>> to allow cloning and thread-safe access
270+
ChannelSender(Arc<Mutex<Sender<SharedValue>>>),
271+
/// Channel receiver for inter-thread communication
272+
/// Wrapped in Arc<Mutex<>> to allow thread-safe access
273+
ChannelReceiver(Arc<Mutex<Receiver<SharedValue>>>),
267274
}
268275

269276
impl Clone for HeapObject {
@@ -277,6 +284,8 @@ impl Clone for HeapObject {
277284
HeapObject::NativeFunction(f) => HeapObject::NativeFunction(f.clone()),
278285
HeapObject::CompiledFunction(c) => HeapObject::CompiledFunction(c.clone()),
279286
HeapObject::ThreadHandle(h) => HeapObject::ThreadHandle(h.clone()),
287+
HeapObject::ChannelSender(s) => HeapObject::ChannelSender(s.clone()),
288+
HeapObject::ChannelReceiver(r) => HeapObject::ChannelReceiver(r.clone()),
280289
}
281290
}
282291
}
@@ -779,6 +788,8 @@ impl Value {
779788
HeapObject::NativeFunction(_) => "native-function",
780789
HeapObject::CompiledFunction(_) => "function",
781790
HeapObject::ThreadHandle(_) => "thread-handle",
791+
HeapObject::ChannelSender(_) => "channel-sender",
792+
HeapObject::ChannelReceiver(_) => "channel-receiver",
782793
}
783794
} else {
784795
"unknown"
@@ -967,6 +978,16 @@ impl Value {
967978
let heap = Rc::new(HeapObject::ThreadHandle(h.clone()));
968979
Value::from_heap(heap)
969980
}
981+
Some(HeapObject::ChannelSender(s)) => {
982+
// ChannelSenders are already Arc-based, just clone the Value
983+
let heap = Rc::new(HeapObject::ChannelSender(s.clone()));
984+
Value::from_heap(heap)
985+
}
986+
Some(HeapObject::ChannelReceiver(r)) => {
987+
// ChannelReceivers are already Arc-based, just clone the Value
988+
let heap = Rc::new(HeapObject::ChannelReceiver(r.clone()));
989+
Value::from_heap(heap)
990+
}
970991
None => Value::nil(),
971992
}
972993
} else if self.is_ptr() {
@@ -1084,6 +1105,12 @@ impl Value {
10841105
Some(HeapObject::ThreadHandle(_)) => {
10851106
Err("Thread handles cannot be shared across threads".to_string())
10861107
}
1108+
Some(HeapObject::ChannelSender(_)) => {
1109+
Err("Channel senders cannot be converted to SharedValue (they are already thread-safe)".to_string())
1110+
}
1111+
Some(HeapObject::ChannelReceiver(_)) => {
1112+
Err("Channel receivers cannot be converted to SharedValue (they are already thread-safe)".to_string())
1113+
}
10871114
None => Err("Cannot convert unknown value to SharedValue".to_string()),
10881115
}
10891116
}
@@ -1223,6 +1250,8 @@ impl fmt::Display for Value {
12231250
HeapObject::NativeFunction(nf) => write!(f, "<native fn {}>", nf.name),
12241251
HeapObject::CompiledFunction(_) => write!(f, "<function>"),
12251252
HeapObject::ThreadHandle(_) => write!(f, "<thread-handle>"),
1253+
HeapObject::ChannelSender(_) => write!(f, "<channel-sender>"),
1254+
HeapObject::ChannelReceiver(_) => write!(f, "<channel-receiver>"),
12261255
}
12271256
} else {
12281257
write!(f, "<unknown>")

src/vm.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2038,6 +2038,89 @@ pub fn standard_vm() -> VM {
20382038
}
20392039
}));
20402040

2041+
vm.define_global("channel", native("channel", |args| {
2042+
use std::sync::mpsc;
2043+
use std::sync::{Arc, Mutex};
2044+
use crate::value::HeapObject;
2045+
2046+
if !args.is_empty() {
2047+
return Err("channel expects 0 arguments".to_string());
2048+
}
2049+
2050+
// Create an unbounded channel
2051+
let (sender, receiver) = mpsc::channel();
2052+
2053+
// Wrap sender and receiver in Arc<Mutex<>> for thread-safety
2054+
let sender_arc = Arc::new(Mutex::new(sender));
2055+
let receiver_arc = Arc::new(Mutex::new(receiver));
2056+
2057+
// Create HeapObject values
2058+
let sender_heap = Rc::new(HeapObject::ChannelSender(sender_arc));
2059+
let receiver_heap = Rc::new(HeapObject::ChannelReceiver(receiver_arc));
2060+
2061+
// Return a list [sender receiver]
2062+
Ok(Value::list(vec![
2063+
Value::from_heap(sender_heap),
2064+
Value::from_heap(receiver_heap),
2065+
]))
2066+
}));
2067+
2068+
vm.define_global("send!", native("send!", |args| {
2069+
use crate::value::HeapObject;
2070+
2071+
if args.len() != 2 {
2072+
return Err("send! expects 2 arguments (sender value)".to_string());
2073+
}
2074+
2075+
// Get the sender
2076+
let sender_obj = args[0].as_heap()
2077+
.ok_or_else(|| "send! expects a channel-sender as first argument".to_string())?;
2078+
2079+
if let HeapObject::ChannelSender(sender_mutex) = sender_obj {
2080+
// Convert the value to SharedValue for thread-safe sending
2081+
let shared_value = args[1].make_shared()
2082+
.map_err(|e| format!("send! failed to convert value: {}", e))?;
2083+
2084+
// Send the value
2085+
let sender = sender_mutex.lock()
2086+
.map_err(|_| "Failed to lock channel sender".to_string())?;
2087+
2088+
sender.send(shared_value)
2089+
.map_err(|_| "send! failed: receiver has been dropped".to_string())?;
2090+
2091+
// Return nil
2092+
Ok(Value::nil())
2093+
} else {
2094+
Err("send! expects a channel-sender as first argument".to_string())
2095+
}
2096+
}));
2097+
2098+
vm.define_global("recv", native("recv", |args| {
2099+
use crate::value::HeapObject;
2100+
2101+
if args.len() != 1 {
2102+
return Err("recv expects 1 argument (receiver)".to_string());
2103+
}
2104+
2105+
// Get the receiver
2106+
let receiver_obj = args[0].as_heap()
2107+
.ok_or_else(|| "recv expects a channel-receiver".to_string())?;
2108+
2109+
if let HeapObject::ChannelReceiver(receiver_mutex) = receiver_obj {
2110+
// Receive a value (blocking)
2111+
let receiver = receiver_mutex.lock()
2112+
.map_err(|_| "Failed to lock channel receiver".to_string())?;
2113+
2114+
let shared_value = receiver.recv()
2115+
.map_err(|_| "recv failed: sender has been dropped".to_string())?;
2116+
2117+
// Convert back to Rc-based Value
2118+
Ok(Value::from_shared(&shared_value))
2119+
} else {
2120+
Err("recv expects a channel-receiver".to_string())
2121+
}
2122+
}));
2123+
20412124
vm
20422125
}
20432126

0 commit comments

Comments
 (0)