2 changes: 1 addition & 1 deletion doc/source/_templates/index.html
@@ -85,7 +85,7 @@ <h3>Scale with Ray</h3>
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
# Step 3: Map the Predictor over the Dataset to get predictions.
-predictions = ds.map_batches(HuggingFacePredictor, concurrency=2)
+predictions = ds.map_batches(HuggingFacePredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 4: Show one prediction output.
predictions.show(limit=1)
''') }}
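Taken together, this first hunk moves the landing-page snippet off the deprecated `concurrency` keyword. Below is a minimal sketch of the new call shape; the `HuggingFacePredictor` class and the toy dataset are illustrative stand-ins for what the template defines, and the `compute` keyword follows the API this PR introduces:

```python
import ray
from transformers import pipeline  # assumed dependency, as in the template


class HuggingFacePredictor:
    # Illustrative stand-in for the predictor class the template defines.
    def __init__(self):
        self.model = pipeline("text-generation", model="gpt2")

    def __call__(self, batch):
        # Generate one completion per input row.
        outputs = self.model(list(batch["text"]), max_length=20)
        batch["output"] = [out[0]["generated_text"] for out in outputs]
        return batch


ds = ray.data.from_items([{"text": "Scale your workload"}] * 4)

# New API shape: an explicit actor pool of 2 workers instead of concurrency=2.
predictions = ds.map_batches(
    HuggingFacePredictor,
    compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)
```

The same substitution applies verbatim at every `concurrency=` call site in the hunks that follow.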
14 changes: 7 additions & 7 deletions doc/source/data/batch_inference.rst
@@ -76,7 +76,7 @@ For how to configure batch inference, see :ref:`the configuration guide<batch_in
# Step 2: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
-predictions = ds.map_batches(HuggingFacePredictor, concurrency=2)
+predictions = ds.map_batches(HuggingFacePredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 3: Show one prediction output.
predictions.show(limit=1)

@@ -126,7 +126,7 @@ For how to configure batch inference, see :ref:`the configuration guide<batch_in
# Step 2: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
-predictions = ds.map_batches(TorchPredictor, concurrency=2)
+predictions = ds.map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 3: Show one prediction output.
predictions.show(limit=1)

@@ -171,7 +171,7 @@ For how to configure batch inference, see :ref:`the configuration guide<batch_in
# Step 2: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
-predictions = ds.map_batches(TFPredictor, concurrency=2)
+predictions = ds.map_batches(TFPredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 3: Show one prediction output.
predictions.show(limit=1)

@@ -280,7 +280,7 @@ The remaining is the same as the :ref:`Quickstart <batch_inference_quickstart>`.
# Increase this for larger datasets.
batch_size=1,
# Set the concurrency to the number of GPUs in your cluster.
-concurrency=2,
+compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)

@@ -328,7 +328,7 @@ The remaining is the same as the :ref:`Quickstart <batch_inference_quickstart>`.
# Increase this for larger datasets.
batch_size=1,
# Set the concurrency to the number of GPUs in your cluster.
-concurrency=2,
+compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)

@@ -375,7 +375,7 @@ The remaining is the same as the :ref:`Quickstart <batch_inference_quickstart>`.
# Increase this for larger datasets.
batch_size=1,
# Set the concurrency to the number of GPUs in your cluster.
-concurrency=2,
+compute=ray.data.ActorPoolStrategy(size=2),
)
predictions.show(limit=1)

@@ -465,6 +465,6 @@ Suppose your cluster has 4 nodes, each with 16 CPUs. To limit to at most
# Require 5 CPUs per actor (so at most 3 can fit per 16 CPU node).
num_cpus=5,
# 3 actors per node, with 4 nodes in the cluster means concurrency of 12.
-concurrency=12,
+compute=ray.data.ActorPoolStrategy(size=12),
)
predictions.show(limit=1)
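As a check on the arithmetic in the comments above (floor(16 / 5) = 3 actors per 16-CPU node, and 3 x 4 nodes = 12), here is a self-contained sketch of the full call under those assumptions; the `Predictor` class is a placeholder, not the one from the guide:

```python
import ray


class Predictor:
    # Placeholder for the model wrapper used in the guide.
    def __call__(self, batch):
        batch["label"] = ["ok"] * len(batch["id"])
        return batch


ds = ray.data.range(1000)  # rows of {"id": 0..999}

predictions = ds.map_batches(
    Predictor,
    # Each actor reserves 5 CPUs, so at most floor(16 / 5) = 3 fit per 16-CPU node.
    num_cpus=5,
    # 3 actors per node across 4 nodes gives a fixed pool of 12.
    compute=ray.data.ActorPoolStrategy(size=12),
)
predictions.show(limit=1)
```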
@@ -655,14 +655,14 @@
},
{
"cell_type": "code",
"execution_count": 19,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ds = ds.map_batches(\n",
" ObjectDetectionModel,\n",
" # Use 4 model replicas. Change this number based on the number of GPUs in your cluster.\n",
" concurrency=4,\n",
" compute=ray.data.ActorPoolStrategy(size=4),\n",
" batch_size=4, # Use the largest batch size that can fit in GPU memory.\n",
" # Specify 1 GPU per model replica. Set to 0 if you are doing CPU inference.\n",
" num_gpus=1,\n",
@@ -375,15 +375,15 @@
},
{
"cell_type": "code",
"execution_count": 7,
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"predictions = ds.map_batches(\n",
" ImageClassifier,\n",
" concurrency=4, # Use 4 model replicas. Change this number based on the number of GPUs in your cluster.\n",
" compute=ray.data.ActorPoolStrategy(size=4), # Use 4 model replicas. Change this number based on the number of GPUs in your cluster.\n",
" num_gpus=1 if torch.cuda.is_available() else 0, # Specify GPUs per model replica (use 0 for CPU inference)\n",
" batch_size=BATCH_SIZE # Use batch size from above.\n",
")"
@@ -456,15 +456,15 @@
},
{
"cell_type": "code",
"execution_count": 12,
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"predictions = transformed_ds.map_batches(\n",
" ResnetModel,\n",
" concurrency=4, # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" compute=ray.data.ActorPoolStrategy(size=4), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" num_gpus=1, # Specify 1 GPU per model replica.\n",
" batch_size=720, # Use the largest batch size that can fit on our GPUs.\n",
")\n"
2 changes: 1 addition & 1 deletion doc/source/data/working-with-images.rst
@@ -230,7 +230,7 @@ Finally, call :meth:`Dataset.map_batches() <ray.data.Dataset.map_batches>`.

predictions = ds.map_batches(
ImageClassifier,
-concurrency=2,
+compute=ray.data.ActorPoolStrategy(size=2),
batch_size=4
)
predictions.show(3)
2 changes: 1 addition & 1 deletion doc/source/data/working-with-pytorch.rst
@@ -276,7 +276,7 @@ With Ray Datasets, you can do scalable offline batch inference with Torch models
# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
-predictions = ds.map_batches(TorchPredictor, concurrency=2)
+predictions = ds.map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
# Step 4: Show one prediction output.
predictions.show(limit=1)

2 changes: 1 addition & 1 deletion doc/source/data/working-with-text.rst
@@ -156,7 +156,7 @@ that sets up and invokes a model. Then, call

ds = (
ray.data.read_text("s3://anonymous@ray-example-data/this.txt")
-.map_batches(TextClassifier, concurrency=2)
+.map_batches(TextClassifier, compute=ray.data.ActorPoolStrategy(size=2))
)

ds.show(3)
8 changes: 4 additions & 4 deletions doc/source/ray-overview/examples/e2e-audio/README.md
@@ -124,7 +124,7 @@ class WhisperPreprocessor:
return {"input_features": [arr for arr in extracted_features], "id": batch["id"]}


-ds = ds.map_batches(WhisperPreprocessor, batch_size=2, batch_format="pandas", concurrency=1)
+ds = ds.map_batches(WhisperPreprocessor, batch_size=2, batch_format="pandas", compute=ray.data.ActorPoolStrategy(size=1))
# ds.show(1)
```

@@ -166,12 +166,12 @@ class Transcriber:

# Transcribe audio to text tokens using Whisper.
# Use 2 workers, each with 1 GPU.
-ds = ds.map_batches(Transcriber, batch_size=2, batch_format="numpy", concurrency=2, num_gpus=1)
+ds = ds.map_batches(Transcriber, batch_size=2, batch_format="numpy", compute=ray.data.ActorPoolStrategy(size=2), num_gpus=1)
```

Now decode the tokens into actual transcriptions. This step is decoupled from the previous one so that the GPU doesn't block on CPU work and sit idle. The separation also lets you scale the number of decoders independently of the number of Whisper replicas.

-Separating the GPU work from CPU work eliminates GPU idle. The `concurrency=5` and `batch_size=32` parameters show how to use more CPU workers and bigger batch sizes than GPU workers.
+Separating the GPU work from the CPU work eliminates GPU idle time. The `compute=ray.data.ActorPoolStrategy(size=5)` and `batch_size=16` parameters show how to use more CPU workers and a bigger batch size than the GPU stage.


```python
Expand All @@ -186,7 +186,7 @@ class Decoder:
return batch


-ds = ds.map_batches(Decoder, batch_size=16, concurrency=5, batch_format="pandas") # CPU only
+ds = ds.map_batches(Decoder, batch_size=16, compute=ray.data.ActorPoolStrategy(size=5), batch_format="pandas") # CPU only

# ds.take(1)
```
@@ -1,4 +1,5 @@
import json
+import ray

from doggos.embed import EmbedImages

@@ -34,7 +35,7 @@ def transform(self, ds):
"device": "cuda",
}, # class kwargs
fn_kwargs={},
-concurrency=4,
+compute=ray.data.ActorPoolStrategy(size=4),
batch_size=64,
num_gpus=1,
accelerator_type="T4",
@@ -104,7 +104,7 @@ def display_top_matches(url, matches):
"device": "cuda",
}, # class kwargs
fn_kwargs={},
-concurrency=4,
+compute=ray.data.ActorPoolStrategy(size=4),
batch_size=64,
num_gpus=1,
accelerator_type="T4",
@@ -323,7 +323,7 @@
"ds = ds.map(add_class,\n",
" num_cpus=1,\n",
" num_gpus=0,\n",
" concurrency=4)\n"
" compute=ray.data.TaskPoolStrategy(size=4))\n"
]
},
{
@@ -435,7 +435,7 @@
" \"device\": \"cuda\",\n",
" }, # class kwargs\n",
" fn_kwargs={}, # __call__ kwargs\n",
" concurrency=4,\n",
" compute=ray.data.ActorPoolStrategy(size=4),\n",
" batch_size=64,\n",
" num_gpus=1,\n",
" accelerator_type=\"T4\",\n",
@@ -211,7 +211,7 @@
"\n",
"Like the previous tutorials, We use the same Sentence Transformer model to convert each user request into an embedding. This allows the later retrieval step to find relevant context for each prompt. \n",
"\n",
"**Note**: since we only have 64 user requests for evaluation, we only use CPU to handle the embedding generation process, instead of using GPU. We also set the concurrency=1 which only use one CPU node. If you have a large volume of user requests, then you can consider to use multiple CPU nodes or enabel GPU for acceleration. \n"
"**Note**: since we only have 64 user requests for evaluation, we only use CPU to handle the embedding generation process, instead of using GPU. We also set the compute=ray.data.ActorPoolStrategy(size=1) which only use one CPU node. If you have a large volume of user requests, then you can consider to use multiple CPU nodes or enabel GPU for acceleration. \n"
]
},
{
Expand Down Expand Up @@ -242,7 +242,7 @@
" return batch\n",
"\n",
"# Use the Embedder class to process the batch and generate embeddings.\n",
"ds = ds.map_batches(UserRequestEmbedder, concurrency=1, batch_size=64)\n"
"ds = ds.map_batches(UserRequestEmbedder, compute=ray.data.ActorPoolStrategy(size=1), batch_size=64)\n"
]
},
{
@@ -5,6 +5,7 @@
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"

import pandas as pd
+import ray
import xgboost
from sklearn.metrics import confusion_matrix

@@ -66,7 +67,7 @@ def main():
test_predictions = test_dataset.map_batches(
Validator,
fn_constructor_kwargs={"loader": load_model_and_preprocessor},
-concurrency=4, # Number of model replicas
+compute=ray.data.ActorPoolStrategy(size=4), # Number of model replicas
batch_format="pandas",
)

@@ -347,7 +347,7 @@
"inference_dataset = test_dataset.map_batches(\n",
" BatchObjectDetectionModel,\n",
" batch_size=4,\n",
" concurrency=2,\n",
" compute=ray.data.ActorPoolStrategy(size=2),\n",
" num_gpus=1\n",
")\n",
"results = inference_dataset.take_all()"
@@ -541,4 +541,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
-}
+}
@@ -309,7 +309,7 @@
"# Apply object detection model.\n",
"ds_predicted = ds_frames.map_batches(\n",
" BatchObjectDetectionModel, \n",
" concurrency=2, # Specify 2 workers.\n",
" compute=ray.data.ActorPoolStrategy(size=2), # Specify 2 workers.\n",
" batch_size=8,\n",
" num_gpus=1 # Each worker uses 1 GPU. In total Ray Data uses 2 GPUs.\n",
")"
4 changes: 2 additions & 2 deletions doc/source/templates/01_batch_inference/start.ipynb
@@ -450,15 +450,15 @@
},
{
"cell_type": "code",
"execution_count": 12,
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"predictions = transformed_ds.map_batches(\n",
" ResnetModel,\n",
" concurrency=4, # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" compute=ray.data.ActorPoolStrategy(size=4), # Use 4 GPUs. Change this number based on the number of GPUs in your cluster.\n",
" num_gpus=1, # Specify 1 GPU per model replica.\n",
" batch_size=720, # Use the largest batch size that can fit on our GPUs\n",
")\n"
4 changes: 2 additions & 2 deletions doc/source/train/examples/lightgbm/lightgbm_example.ipynb
@@ -164,7 +164,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"id": "3f1d0c19",
"metadata": {},
"outputs": [],
@@ -191,7 +191,7 @@
" scores = test_dataset.map_batches(\n",
" Predict, \n",
" fn_constructor_args=[result.checkpoint], \n",
" concurrency=1, \n",
" compute=ray.data.ActorPoolStrategy(size=1), \n",
" batch_format=\"pandas\"\n",
" )\n",
" \n",
2 changes: 1 addition & 1 deletion python/ray/data/dataset.py
@@ -562,7 +562,7 @@ def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
.map_batches(
TorchPredictor,
# Two workers with one GPU each
-concurrency=2,
+compute=ray.data.ActorPoolStrategy(size=2),
# Batch size is required if you're using GPUs.
batch_size=4,
num_gpus=1
35 changes: 13 additions & 22 deletions python/ray/data/grouped_data.py
@@ -161,7 +161,18 @@ def map_groups(
batch of zero or more records, similar to map_batches().
zero_copy_batch: If True, each group of rows (batch) will be provided w/o
making an additional copy.
-compute: This argument is deprecated. Use ``concurrency`` argument.
+compute: The compute strategy to use for the map operation.
+
+    * If ``compute`` is not specified, Ray Data uses ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and the number of input blocks.
+
+    * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.
+
+    * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed-size actor pool of ``n`` workers.
+
+    * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.
+
+    * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

batch_format: Specify ``"default"`` to use the default block format
(NumPy), ``"pandas"`` to select ``pandas.DataFrame``, "pyarrow" to
select ``pyarrow.Table``, or ``"numpy"`` to select
@@ -186,27 +197,7 @@
to initializing the worker. Args returned from this dict will always
override the args in ``ray_remote_args``. Note: this is an advanced,
experimental feature.
-concurrency: The semantics of this argument depend on the type of ``fn``:
-
-    * If ``fn`` is a function and ``concurrency`` isn't set (default), the
-      actual concurrency is implicitly determined by the available
-      resources and number of input blocks.
-
-    * If ``fn`` is a function and ``concurrency`` is an int ``n``, Ray Data
-      launches *at most* ``n`` concurrent tasks.
-
-    * If ``fn`` is a class and ``concurrency`` is an int ``n``, Ray Data
-      uses an actor pool with *exactly* ``n`` workers.
-
-    * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n)``, Ray
-      Data uses an autoscaling actor pool from ``m`` to ``n`` workers.
-
-    * If ``fn`` is a class and ``concurrency`` is a tuple ``(m, n, initial)``, Ray
-      Data uses an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.
-
-    * If ``fn`` is a class and ``concurrency`` isn't set (default), this
-      method raises an error.
-
+concurrency: This argument is deprecated. Use the ``compute`` argument.
ray_remote_args: Additional resource requirements to request from
Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
:func:`ray.remote` for details.
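To make the new docstring concrete, here is a hedged sketch that exercises each strategy it lists on a toy `map_groups` workload. The dataset and functions are illustrative, and the `TaskPoolStrategy(size=n)` and `ActorPoolStrategy(..., initial_size=...)` signatures are assumed from the docstring text above rather than verified against a released Ray version:

```python
import ray


def normalize(batch):
    # Center each group's values around zero.
    batch["value"] = batch["value"] - batch["value"].mean()
    return batch


class Normalizer:
    def __call__(self, batch):
        return normalize(batch)


ds = ray.data.from_items(
    [{"group": i % 3, "value": float(i)} for i in range(30)]
)

# Default: a TaskPoolStrategy sized from available resources and input blocks.
ds.groupby("group").map_groups(normalize).materialize()

# At most 4 concurrent Ray tasks.
ds.groupby("group").map_groups(
    normalize, compute=ray.data.TaskPoolStrategy(size=4)
).materialize()

# Fixed-size actor pool of 4 workers (requires a callable class).
ds.groupby("group").map_groups(
    Normalizer, compute=ray.data.ActorPoolStrategy(size=4)
).materialize()

# Autoscaling actor pool between 2 and 8 workers, starting at 4.
ds.groupby("group").map_groups(
    Normalizer,
    compute=ray.data.ActorPoolStrategy(min_size=2, max_size=8, initial_size=4),
).materialize()
```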
Expand Down