Commit 1a0f48c

feat: support Queue-based streaming inputs for optimize via new recipe (#606)
1 parent 9836419 commit 1a0f48c

4 files changed, +283 -64 lines changed

README.md

Lines changed: 57 additions & 0 deletions
@@ -554,6 +554,63 @@ if __name__ == "__main__":
 </details>
 
 
+<details>
+<summary> ✅ Use a <code>Queue</code> as input for optimizing data</summary>
+&nbsp;
+
+Sometimes you don’t have a static list of inputs to optimize — instead, you have a stream of data coming in over time. In such cases, you can use a multiprocessing.Queue to feed data into the optimize() function.
+
+- This is especially useful when you're collecting data from a remote source like a web scraper, socket, or API.
+
+- You can also use this setup to store `replay buffer` data during reinforcement learning and later stream it back for training.
+
+```python
+from multiprocessing import Process, Queue
+from litdata.processing.data_processor import ALL_DONE
+import litdata as ld
+import time
+
+def yield_numbers():
+    for i in range(1000):
+        time.sleep(0.01)
+        yield (i, i**2)
+
+def data_producer(q: Queue):
+    for item in yield_numbers():
+        q.put(item)
+
+    q.put(ALL_DONE)  # Sentinel value to signal completion
+
+def fn(index):
+    return index  # Identity function for demo
+
+if __name__ == "__main__":
+    q = Queue(maxsize=100)
+
+    producer = Process(target=data_producer, args=(q,))
+    producer.start()
+
+    ld.optimize(
+        fn=fn,                   # Function to process each item
+        queue=q,                 # 👈 Stream data from this queue
+        output_dir="fast_data",  # Where to store optimized data
+        num_workers=2,
+        chunk_size=100,
+        mode="overwrite",
+    )
+
+    producer.join()
+```
+
+📌 Note: Using queues to optimize your dataset impacts optimization time, not streaming speed.
+
+> Irrespective of number of workers, you only need to put one sentinel value to signal completion.
+>
+> It'll be handled internally by LitData.
+
+</details>
+
+
 <details>
 <summary> ✅ LLM Pre-training </summary>
 &nbsp;
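
The added README note says a single sentinel is enough regardless of `num_workers`, and that LitData handles the rest internally. This diff hunk does not show how. As a rough illustration only, the sketch below shows one common way to get that behaviour: each worker that receives the sentinel puts it back on the queue before exiting, so the next worker sees it too. It uses only the standard library, with threads instead of processes. The names `worker` and `run` and the local `ALL_DONE` object are hypothetical stand-ins, not LitData's API or its actual implementation.

```python
import queue
import threading

# Hypothetical stand-in for a completion sentinel; not LitData's ALL_DONE.
# An identity check works here because threads share memory. Across
# processes a sentinel would need to be compared by value instead.
ALL_DONE = object()


def worker(q, results, lock):
    """Consume items until the sentinel appears, then pass it on and exit."""
    while True:
        item = q.get()
        if item is ALL_DONE:
            q.put(ALL_DONE)  # re-queue so the remaining workers also stop
            return
        with lock:
            results.append(item)


def run(num_items=50, num_workers=3):
    """Feed num_items pairs through num_workers consumers using ONE sentinel."""
    q = queue.Queue()  # unbounded, so re-queuing the sentinel never blocks
    results = []
    lock = threading.Lock()

    workers = [
        threading.Thread(target=worker, args=(q, results, lock))
        for _ in range(num_workers)
    ]
    for w in workers:
        w.start()

    for i in range(num_items):
        q.put((i, i**2))
    q.put(ALL_DONE)  # a single sentinel, regardless of num_workers

    for w in workers:
        w.join()
    return sorted(results)


if __name__ == "__main__":
    print(len(run()), "items processed by 3 workers with one sentinel")
```

In this sketch the producer does not need to know how many consumers exist, which matches the contract the README describes. Whether LitData re-queues the sentinel, broadcasts it from a dispatcher, or does something else is not visible from this commit page.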
