Skip to content

Commit

Permalink
remove flag
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv committed Dec 30, 2023
1 parent 3185aad commit d5230fa
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 207 deletions.
1 change: 0 additions & 1 deletion python/cugraph-pyg/cugraph_pyg/sampler/cugraph_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def __neighbor_sample(
# conversion required by cugraph api
list(num_neighbors),
replace,
with_edge_properties=True,
)

if self.__graph_store._is_delayed:
Expand Down
172 changes: 65 additions & 107 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,58 +94,44 @@ def create_empty_df_with_edge_props(indices_t, weight_t, return_offsets=False):
return df


def convert_to_cudf(cp_arrays, weight_t, with_edge_properties, return_offsets=False):
def convert_to_cudf(cp_arrays, return_offsets=False):
"""
Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper
"""
df = cudf.DataFrame()

if with_edge_properties:
(
sources,
destinations,
weights,
edge_ids,
edge_types,
batch_ids,
offsets,
hop_ids,
) = cp_arrays

df[src_n] = sources
df[dst_n] = destinations
df[weight_n] = weights
df[edge_id_n] = edge_ids
df[edge_type_n] = edge_types
df[hop_id_n] = hop_ids

if return_offsets:
offsets_df = cudf.DataFrame(
{
batch_id_n: batch_ids,
offsets_n: offsets[:-1],
}
)
return df, offsets_df
else:
if len(batch_ids) > 0:
batch_ids = cudf.Series(batch_ids).repeat(cp.diff(offsets))
batch_ids.reset_index(drop=True, inplace=True)
(
sources,
destinations,
weights,
edge_ids,
edge_types,
batch_ids,
offsets,
hop_ids,
) = cp_arrays

df[src_n] = sources
df[dst_n] = destinations
df[weight_n] = weights
df[edge_id_n] = edge_ids
df[edge_type_n] = edge_types
df[hop_id_n] = hop_ids

df[batch_id_n] = batch_ids
return df
if return_offsets:
offsets_df = cudf.DataFrame(
{
batch_id_n: batch_ids,
offsets_n: offsets[:-1],
}
)
return df, offsets_df
else:
cupy_sources, cupy_destinations, cupy_indices = cp_arrays

df[src_n] = cupy_sources
df[dst_n] = cupy_destinations
df[indices_n] = cupy_indices

if weight_t == "int32":
df.indices = df.indices.astype("int32")
elif weight_t == "int64":
df.indices = df.indices.astype("int64")
if len(batch_ids) > 0:
batch_ids = cudf.Series(batch_ids).repeat(cp.diff(offsets))
batch_ids.reset_index(drop=True, inplace=True)

df[batch_id_n] = batch_ids
return df


Expand All @@ -157,8 +143,6 @@ def _call_plc_uniform_neighbor_sample(
label_to_output_comm_rank,
fanout_vals,
with_replacement,
weight_t,
with_edge_properties,
random_state=None,
return_offsets=False,
):
Expand All @@ -173,12 +157,11 @@ def _call_plc_uniform_neighbor_sample(
h_fan_out=fanout_vals,
with_replacement=with_replacement,
do_expensive_check=False,
with_edge_properties=with_edge_properties,
batch_id_list=batch_id_list_x,
random_state=random_state,
)
return convert_to_cudf(
cp_arrays, weight_t, with_edge_properties, return_offsets=return_offsets
cp_arrays, return_offsets=return_offsets
)


Expand All @@ -193,7 +176,6 @@ def _mg_call_plc_uniform_neighbor_sample(
with_replacement,
weight_t,
indices_t,
with_edge_properties,
random_state,
return_offsets=False,
):
Expand All @@ -207,8 +189,6 @@ def _mg_call_plc_uniform_neighbor_sample(
label_to_output_comm_rank,
fanout_vals,
with_replacement,
weight_t=weight_t,
with_edge_properties=with_edge_properties,
# FIXME accept and properly transmute a numpy/cupy random state.
random_state=hash((random_state, i)),
workers=[w],
Expand All @@ -223,8 +203,6 @@ def _mg_call_plc_uniform_neighbor_sample(
create_empty_df_with_edge_props(
indices_t, weight_t, return_offsets=return_offsets
)
if with_edge_properties
else create_empty_df(indices_t, weight_t)
)

if return_offsets:
Expand All @@ -251,7 +229,6 @@ def uniform_neighbor_sample(
start_list: Sequence,
fanout_vals: List[int],
with_replacement: bool = True,
with_edge_properties: bool = False,
batch_id_list: Sequence = None,
label_list: Sequence = None,
label_to_output_comm_rank: bool = None,
Expand Down Expand Up @@ -279,13 +256,8 @@ def uniform_neighbor_sample(
with_replacement: bool, optional (default=True)
Flag to specify if the random sampling is done with replacement
with_edge_properties: bool, optional (default=False)
Flag to specify whether to return edge properties (weight, edge id,
edge type, batch id, hop id) with the sampled edges.
batch_id_list: cudf.Series or dask_cudf.Series (int32), optional (default=None)
List of batch ids that will be returned with the sampled edges if
with_edge_properties is set to True.
List of batch ids that will be returned with the sampled edges.
label_list: cudf.Series or dask_cudf.Series (int32), optional (default=None)
List of unique batch id labels. Used along with
Expand Down Expand Up @@ -314,51 +286,40 @@ def uniform_neighbor_sample(
-------
result : dask_cudf.DataFrame or Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]
GPU distributed data frame containing several dask_cudf.Series
If with_edge_properties=True:
ddf['sources']: dask_cudf.Series
If return_offsets=False:
df['sources']: dask_cudf.Series
Contains the source vertices from the sampling result
ddf['destinations']: dask_cudf.Series
df['destinations']: dask_cudf.Series
Contains the destination vertices from the sampling result
ddf['indices']: dask_cudf.Series
Contains the indices from the sampling result for path
reconstruction
If with_edge_properties=False:
If return_offsets=False:
df['sources']: dask_cudf.Series
Contains the source vertices from the sampling result
df['destinations']: dask_cudf.Series
Contains the destination vertices from the sampling result
df['edge_weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: dask_cudf.Series
Contains the edge types from the sampling result
df['batch_id']: dask_cudf.Series
Contains the batch ids from the sampling result
df['hop_id']: dask_cudf.Series
Contains the hop ids from the sampling result
If return_offsets=True:
df['sources']: cudf.Series
Contains the source vertices from the sampling result
df['destinations']: cudf.Series
Contains the destination vertices from the sampling result
df['edge_weight']: cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: cudf.Series
Contains the edge types from the sampling result
df['hop_id']: cudf.Series
Contains the hop ids from the sampling result
offsets_df['batch_id']: cudf.Series
Contains the batch ids from the sampling result
offsets_df['offsets']: cudf.Series
Contains the offsets of each batch in the sampling result
df['edge_weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: dask_cudf.Series
Contains the edge types from the sampling result
df['batch_id']: dask_cudf.Series
Contains the batch ids from the sampling result
df['hop_id']: dask_cudf.Series
Contains the hop ids from the sampling result
If return_offsets=True:
df['sources']: cudf.Series
Contains the source vertices from the sampling result
df['destinations']: cudf.Series
Contains the destination vertices from the sampling result
df['edge_weight']: cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: cudf.Series
Contains the edge types from the sampling result
df['hop_id']: cudf.Series
Contains the hop ids from the sampling result
offsets_df['batch_id']: cudf.Series
Contains the batch ids from the sampling result
offsets_df['offsets']: cudf.Series
Contains the offsets of each batch in the sampling result
"""

if isinstance(start_list, int):
Expand All @@ -371,8 +332,7 @@ def uniform_neighbor_sample(
input_graph.renumber_map.renumbered_src_col_name
].dtype,
)

elif with_edge_properties and batch_id_list is None:
elif batch_id_list is None:
batch_id_list = cudf.Series(cp.zeros(len(start_list), dtype="int32"))

# fanout_vals must be a host array!
Expand Down Expand Up @@ -450,7 +410,6 @@ def uniform_neighbor_sample(
with_replacement=with_replacement,
weight_t=weight_t,
indices_t=indices_t,
with_edge_properties=with_edge_properties,
random_state=random_state,
return_offsets=return_offsets,
)
Expand All @@ -472,7 +431,6 @@ def uniform_neighbor_sample(
with_replacement=with_replacement,
weight_t=weight_t,
indices_t=indices_t,
with_edge_properties=with_edge_properties,
random_state=random_state,
return_offsets=return_offsets,
)
Expand Down
Loading

0 comments on commit d5230fa

Please sign in to comment.