Merge pull request #61 from johnlees/cpu-progress
Add CPU progress bars and handle interrupts
johnlees authored Jun 8, 2021
2 parents 1b246d0 + c2f75e6 commit 4bc635d
Showing 8 changed files with 234 additions and 117 deletions.
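
The changes share one pattern on the CPU code paths: a shared interrupt flag plus a PyErr_CheckSignals() poll inside each OpenMP loop, a progress tick issued by thread 0 only, and a throw back into Python once the loop has drained. A minimal sketch of that pattern, with SimpleProgress standing in for the library's ProgressMeter (declared in src/sketch/progress.hpp, which this diff does not show) and process_all as a purely illustrative function:

```cpp
// Sketch of the CPU progress/interrupt pattern used in this commit.
// Assumptions: pybind11 and OpenMP are available; SimpleProgress is a
// stand-in for the library's ProgressMeter, which is not shown here.
#include <atomic>
#include <cstdio>
#include <string>
#include <vector>

#include <omp.h>
#include <pybind11/pybind11.h>

namespace py = pybind11;

class SimpleProgress {
public:
  explicit SimpleProgress(size_t total) : total_(total), done_(0) {}
  void tick(size_t n) {
    done_ += n;
    fprintf(stderr, "\rProgress: %.1f%%", 100.0 * done_ / total_);
  }
  void finalise() { fprintf(stderr, "\rProgress: 100.0%%\n"); }

private:
  size_t total_;
  std::atomic<size_t> done_;
};

void process_all(const std::vector<std::string> &items, int num_threads) {
  SimpleProgress progress(items.size());
  bool interrupt = false; // shared flag, as in the diff below

#pragma omp parallel for schedule(dynamic, 5) num_threads(num_threads)
  for (size_t i = 0; i < items.size(); i++) {
    // Poll for SIGINT raised on the Python side; once seen, the remaining
    // iterations skip their work so the loop finishes quickly
    if (interrupt || PyErr_CheckSignals() != 0) {
      interrupt = true;
    } else {
      // ... per-item work goes here ...
    }
    if (omp_get_thread_num() == 0) {
      progress.tick(num_threads); // only one thread writes the meter
    }
  }
  progress.finalise();

  if (interrupt) {
    throw py::error_already_set(); // surfaces KeyboardInterrupt in Python
  }
}
```

create_sketches and both branches of query_db in src/api.cpp below follow this skeleton, and the GPU path gets the equivalent treatment through the host-side polling loop in src/gpu/dist.cu.
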
15 changes: 10 additions & 5 deletions CMakeLists.txt
@@ -16,17 +16,19 @@ file(WRITE src/version.h "\#define SKETCH_VERSION \"${sketch_hash}\"\n")
set(TARGET_NAME pp_sketchlib)
add_compile_definitions(PYTHON_EXT)

# Add -O0 to remove optimizations when using gcc
# gcc: Add openmp
# gcc: Add -O0 to remove optimizations when using debug
IF(CMAKE_COMPILER_IS_GNUCC)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0")
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0")
ENDIF(CMAKE_COMPILER_IS_GNUCC)

if(UNIX AND NOT APPLE)
if(CMAKE_CXX_COMPILER STREQUAL "icpc")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp -fast -xCASCADELAKE -DMKL_ILP64 -m64 -static-intel")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fast -xCASCADELAKE -DMKL_ILP64 -m64 -static-intel")
else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D__STDC_LIMIT_MACROS -D__STDC_CONSTANT_MACROS")
set(CMAKE_LD_FLAGS "${CMAKE_LDFLAGS} -Wl,--as-needed")
endif()
endif()
@@ -75,7 +77,7 @@ if(CMAKE_CUDA_COMPILER)

add_compile_definitions(GPU_AVAILABLE)
add_library("${TARGET_NAME}_CUDA" OBJECT src/gpu/dist.cu src/gpu/sketch.cu)
target_include_directories("${TARGET_NAME}_CUDA" PRIVATE "${EIGEN3_INCLUDE_DIR}")
target_include_directories("${TARGET_NAME}_CUDA" PRIVATE "${EIGEN3_INCLUDE_DIR}" "${pybind11_INCLUDE_DIRS}")
set_property(TARGET "${TARGET_NAME}_CUDA"
PROPERTY POSITION_INDEPENDENT_CODE ON
CUDA_SEPARABLE_COMPILATION ON
@@ -104,10 +106,13 @@ target_sources("${TARGET_NAME}" PRIVATE src/sketchlib_bindings.cpp
src/random/random_match.cpp)
set_target_properties("${TARGET_NAME}" PROPERTIES
CXX_VISIBILITY_PRESET "hidden"
INTERPROCEDURAL_OPTIMIZATION TRUE
PREFIX "${PYTHON_MODULE_PREFIX}"
SUFFIX "${PYTHON_MODULE_EXTENSION}"
)
if(UNIX AND (NOT APPLE OR NOT CMAKE_COMPILER_IS_GNUCC))
set_target_properties("${TARGET_NAME}" PROPERTIES
INTERPROCEDURAL_OPTIMIZATION ON)
endif()

# Link libraries
if(CMAKE_CUDA_COMPILER)
2 changes: 1 addition & 1 deletion pp_sketch/__init__.py
@@ -3,4 +3,4 @@

'''PopPUNK sketching functions'''

__version__ = '1.7.1'
__version__ = '1.7.2'
123 changes: 92 additions & 31 deletions src/api.cpp
@@ -8,13 +8,17 @@
#include <limits>

#include <H5Cpp.h>
#include <omp.h>
#include <pybind11/pybind11.h>

#include "api.hpp"
#include "database/database.hpp"
#include "gpu/gpu.hpp"
#include "reference.hpp"
#include "sketch/progress.hpp"

using namespace Eigen;
namespace py = pybind11;

bool same_db_version(const std::string &db1_name, const std::string &db2_name) {
// Open databases
@@ -70,11 +74,43 @@ std::vector<Reference> create_sketches(
}
KmerSeeds kmer_seeds = generate_seeds(kmer_lengths, codon_phased);

#pragma omp parallel for schedule(static) num_threads(num_threads)
ProgressMeter sketch_progress(names.size());
bool interrupt = false;
std::vector<std::runtime_error> errors;
#pragma omp parallel for schedule(dynamic, 5) num_threads(num_threads)
for (unsigned int i = 0; i < names.size(); i++) {
SeqBuf seq_in(files[i], kmer_lengths.back());
sketches[i] = Reference(names[i], seq_in, kmer_seeds, sketchsize64,
codon_phased, use_rc, min_count, exact);
if (interrupt || PyErr_CheckSignals() != 0) {
interrupt = true;
} else {
try {
SeqBuf seq_in(files[i], kmer_lengths.back());
sketches[i] = Reference(names[i], seq_in, kmer_seeds, sketchsize64,
codon_phased, use_rc, min_count, exact);
} catch (const std::runtime_error &e) {
#pragma omp critical
{
errors.push_back(e);
interrupt = true;
}
}
}

if (omp_get_thread_num() == 0) {
sketch_progress.tick(num_threads);
}
}
sketch_progress.finalise();

// Handle errors including Ctrl-C from python
if (interrupt) {
for (auto i = errors.cbegin(); i != errors.cend(); ++i) {
std::cout << i->what() << std::endl;
}
if (errors.size()) {
throw std::runtime_error("Errors during sketching");
} else {
throw py::error_already_set();
}
}

// Save sketches and check for densified sketches
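
An exception cannot safely propagate out of an OpenMP loop iteration, so the hunk above catches each std::runtime_error inside the loop, stores it under #pragma omp critical, and rethrows a single aggregate error after the loop has drained. A stripped-down sketch of that store-and-rethrow idiom (run_jobs and its contents are illustrative names, not library code):

```cpp
// Collect exceptions thrown inside an OpenMP loop and rethrow afterwards.
// Illustrative sketch; the names are not part of pp_sketchlib.
#include <cstdio>
#include <stdexcept>
#include <string>
#include <vector>

#include <omp.h>

void run_jobs(const std::vector<std::string> &jobs, int num_threads) {
  std::vector<std::runtime_error> errors;
  bool failed = false;

#pragma omp parallel for schedule(dynamic) num_threads(num_threads)
  for (size_t i = 0; i < jobs.size(); i++) {
    if (failed) {
      continue; // something already failed; drain the remaining iterations
    }
    try {
      if (jobs[i].empty()) {
        throw std::runtime_error("empty job name at index " + std::to_string(i));
      }
      // ... real per-job work would go here ...
    } catch (const std::runtime_error &e) {
      // The exception must not escape the worker thread, so copy it into a
      // shared vector under a critical section instead
#pragma omp critical
      {
        errors.push_back(e);
        failed = true;
      }
    }
  }

  if (failed) {
    for (const auto &err : errors) {
      fprintf(stderr, "%s\n", err.what()); // the library prints to std::cout
    }
    throw std::runtime_error("Errors during parallel run");
  }
}
```

When the only cause of the interrupt is a Ctrl-C, the diff instead throws py::error_already_set(), so Python sees a KeyboardInterrupt rather than a RuntimeError.
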
@@ -132,6 +168,7 @@ NumpyMatrix query_db(std::vector<Reference> &ref_sketches,
// These could be the same but out of order, which could be dealt with
// using a sort, except the return order of the distances wouldn't be as
// expected. self iff ref_names == query_names as input
bool interrupt = false;
if (ref_sketches == query_sketches) {
// calculate dists
size_t dist_rows = static_cast<size_t>(0.5 * (ref_sketches.size()) *
@@ -140,24 +177,33 @@

arma::mat kmer_mat = kmer2mat<std::vector<size_t>>(kmer_lengths);

// Iterate upper triangle
// Iterate upper triangle
ProgressMeter dist_progress(dist_rows, true);
#pragma omp parallel for simd schedule(guided, 1) num_threads(num_threads)
for (size_t i = 0; i < ref_sketches.size(); i++) {
for (size_t j = i + 1; j < ref_sketches.size(); j++) {
size_t pos = square_to_condensed(i, j, ref_sketches.size());
if (jaccard) {
for (unsigned int kmer_idx = 0; kmer_idx < kmer_lengths.size();
kmer_idx++) {
distMat(pos, kmer_idx) = ref_sketches[i].jaccard_dist(
ref_sketches[j], kmer_lengths[kmer_idx], random_chance);
if (interrupt || PyErr_CheckSignals() != 0) {
interrupt = true;
} else {
for (size_t j = i + 1; j < ref_sketches.size(); j++) {
size_t pos = square_to_condensed(i, j, ref_sketches.size());
if (jaccard) {
for (unsigned int kmer_idx = 0; kmer_idx < kmer_lengths.size();
kmer_idx++) {
distMat(pos, kmer_idx) = ref_sketches[i].jaccard_dist(
ref_sketches[j], kmer_lengths[kmer_idx], random_chance);
}
} else {
std::tie(distMat(pos, 0), distMat(pos, 1)) =
ref_sketches[i].core_acc_dist<RandomMC>(ref_sketches[j], kmer_mat,
random_chance);
}
} else {
std::tie(distMat(pos, 0), distMat(pos, 1)) =
ref_sketches[i].core_acc_dist<RandomMC>(ref_sketches[j], kmer_mat,
random_chance);
}
if (omp_get_thread_num() == 0) {
dist_progress.tick(ref_sketches.size() / 2);
}
}
}
dist_progress.finalise();
} else {
// If ref != query, make a thread queue, with each element one ref
// calculate dists
@@ -168,36 +214,51 @@
arma::mat kmer_mat = kmer2mat<std::vector<size_t>>(kmer_lengths);
std::vector<size_t> query_lengths(query_sketches.size());
std::vector<uint16_t> query_random_idxs(query_sketches.size());

#pragma omp parallel for simd schedule(static) num_threads(num_threads)
for (unsigned int q_idx = 0; q_idx < query_sketches.size(); q_idx++) {
query_lengths[q_idx] = query_sketches[q_idx].seq_length();
query_random_idxs[q_idx] =
random_chance.closest_cluster(query_sketches[q_idx]);
}

ProgressMeter dist_progress(dist_rows, true);
#pragma omp parallel for collapse(2) schedule(static) num_threads(num_threads)
for (unsigned int q_idx = 0; q_idx < query_sketches.size(); q_idx++) {
for (unsigned int r_idx = 0; r_idx < ref_sketches.size(); r_idx++) {
const long dist_row = q_idx * ref_sketches.size() + r_idx;
if (jaccard) {
for (unsigned int kmer_idx = 0; kmer_idx < kmer_lengths.size();
kmer_idx++) {
double jaccard_random = random_chance.random_match(
if (interrupt || PyErr_CheckSignals() != 0) {
interrupt = true;
} else {
const long dist_row = q_idx * ref_sketches.size() + r_idx;
if (jaccard) {
for (unsigned int kmer_idx = 0; kmer_idx < kmer_lengths.size();
kmer_idx++) {
double jaccard_random = random_chance.random_match(
ref_sketches[r_idx], query_random_idxs[q_idx],
query_lengths[q_idx], kmer_lengths[kmer_idx]);
distMat(dist_row, kmer_idx) = query_sketches[q_idx].jaccard_dist(
ref_sketches[r_idx], kmer_lengths[kmer_idx], jaccard_random);
}
} else {
std::vector<double> jaccard_random = random_chance.random_matches(
ref_sketches[r_idx], query_random_idxs[q_idx],
query_lengths[q_idx], kmer_lengths[kmer_idx]);
distMat(dist_row, kmer_idx) = query_sketches[q_idx].jaccard_dist(
ref_sketches[r_idx], kmer_lengths[kmer_idx], jaccard_random);
query_lengths[q_idx], kmer_lengths);
std::tie(distMat(dist_row, 0), distMat(dist_row, 1)) =
query_sketches[q_idx].core_acc_dist<std::vector<double>>(
ref_sketches[r_idx], kmer_mat, jaccard_random);
}
if (omp_get_thread_num() == 0) {
dist_progress.tick(num_threads);
}
} else {
std::vector<double> jaccard_random = random_chance.random_matches(
ref_sketches[r_idx], query_random_idxs[q_idx],
query_lengths[q_idx], kmer_lengths);
std::tie(distMat(dist_row, 0), distMat(dist_row, 1)) =
query_sketches[q_idx].core_acc_dist<std::vector<double>>(
ref_sketches[r_idx], kmer_mat, jaccard_random);
}
}
}
dist_progress.finalise();
}

// Handle Ctrl-C from python
if (interrupt) {
throw py::error_already_set();
}

return (distMat);
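
In the self-comparison branch above, only the upper triangle of the symmetric distance matrix is computed, and square_to_condensed maps each pair (i, j) with i < j onto one of the dist_rows = n(n-1)/2 condensed rows. That helper is not part of this diff; the sketch below shows the usual upper-triangle arithmetic such a mapping performs, and the library's own convention may differ in detail.

```cpp
// Illustrative condensed upper-triangle indexing for an n x n symmetric
// matrix with an implicit diagonal: pairs (i, j) with i < j occupy a flat
// array of length n*(n-1)/2. Not the library's square_to_condensed.
#include <cassert>
#include <cstddef>

size_t condensed_index(size_t i, size_t j, size_t n) {
  assert(i < j && j < n);
  // Rows 0..i-1 contribute (n-1) + (n-2) + ... + (n-i) = i*n - i*(i+1)/2
  // entries; within row i, (j - i - 1) entries precede the pair (i, j).
  return i * n - (i * (i + 1)) / 2 + (j - i - 1);
}

// Example: for n = 4 the stored order is (0,1) (0,2) (0,3) (1,2) (1,3) (2,3),
// so condensed_index(1, 3, 4) == 4 and the final pair (2, 3) lands at index 5.
```
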
25 changes: 23 additions & 2 deletions src/gpu/cuda.cuh
@@ -27,14 +27,35 @@ static void HandleCUDAError(const char *file, int line,

#define CUDA_CALL(err) (HandleCUDAError(__FILE__, __LINE__, err))

// A bit lazy... should be a class and just use the destructor
struct progress_atomics {
volatile int *blocks_complete;
bool *kill_kernel;

void init() {
CUDA_CALL(cudaMallocManaged(&blocks_complete, sizeof(int)));
CUDA_CALL(cudaMallocManaged(&kill_kernel, sizeof(bool)));
*blocks_complete = 0;
*kill_kernel = false;
}

void free() {
CUDA_CALL(cudaFree((void *)blocks_complete));
CUDA_CALL(cudaFree(kill_kernel));
}
};

// Use atomic add to update a counter, so progress works regardless of
// dispatch order
__device__ inline void update_progress(long long dist_idx, long long dist_n,
volatile int *blocks_complete) {
progress_atomics progress) {
// Progress indicator
// The >> progressBitshift is a divide by 1024 - update roughly every 0.1%
if (dist_idx % (dist_n >> progressBitshift) == 0) {
atomicAdd((int *)blocks_complete, 1);
if (*(progress.kill_kernel) == true) {
__trap();
}
atomicAdd((int *)progress.blocks_complete, 1);
__threadfence_system();
}
}
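
The counters in progress_atomics live in CUDA managed memory, so the host can read blocks_complete while the kernel is still running and can set kill_kernel to make the next update_progress call hit __trap() and abort the kernel. A rough host-side polling sketch under those assumptions (the library's real loop is reportDistProgress in src/gpu/dist.cu, shown below):

```cpp
// Host-side sketch: poll the managed-memory counter while the kernel runs
// and request an abort on Ctrl-C. Assumes the progress_atomics struct above
// is visible; poll_progress itself is an illustrative name.
#include <cstdio>
#include <unistd.h>

#include <pybind11/pybind11.h>

namespace py = pybind11;

void poll_progress(progress_atomics progress, long long total_blocks) {
  long long seen = 0;
  while (seen < total_blocks - 1) {
    if (PyErr_CheckSignals() != 0) {
      // Ask the running kernel to __trap() at its next progress update
      *(progress.kill_kernel) = true;
      throw py::error_already_set();
    }
    if (*(progress.blocks_complete) > seen) {
      seen = *(progress.blocks_complete);
      fprintf(stderr, "\rProgress (GPU): %.1f%%", 100.0f * seen / total_blocks);
    } else {
      usleep(1000); // back off instead of busy-spinning
    }
  }
  fprintf(stderr, "\n");
}
```

Keeping both values in managed allocations makes one counter visible to host and device without explicit copies; the struct's own comment above concedes it would be tidier as a class whose destructor did the freeing.
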
29 changes: 18 additions & 11 deletions src/gpu/dist.cu
@@ -19,6 +19,9 @@
#include <unistd.h>
#include <vector>

#include <pybind11/pybind11.h>
namespace py = pybind11;

// memcpy_async
#if __CUDACC_VER_MAJOR__ >= 11
#include <cuda/barrier>
@@ -140,7 +143,7 @@ __global__ void calculate_dists(
const float *random_table, const uint16_t *ref_idx_lookup,
const uint16_t *query_idx_lookup, const SketchStrides ref_strides,
const SketchStrides query_strides, const RandomStrides random_strides,
volatile int *blocks_complete, const bool use_shared) {
progress_atomics progress, const bool use_shared) {
// Calculate indices for query, ref and results
int ref_idx, query_idx, dist_idx;
if (self) {
@@ -280,7 +283,7 @@ __global__ void calculate_dists(
simple_linear_regression(dists + dist_idx, dists + dist_n + dist_idx, xsum,
ysum, xysum, xsquaresum, ysquaresum, kmer_used);

update_progress(dist_idx, dist_n, blocks_complete);
update_progress(dist_idx, dist_n, progress);
}
}

@@ -395,14 +398,18 @@ std::tuple<size_t, size_t> getBlockSize(const size_t ref_samples,

// Writes a progress meter using the device int which keeps
// track of completed jobs
void reportDistProgress(volatile int *blocks_complete, long long dist_rows) {
void reportDistProgress(progress_atomics progress, long long dist_rows) {
long long progress_blocks = 1 << progressBitshift;
int now_completed = 0;
float kern_progress = 0;
if (dist_rows > progress_blocks) {
while (now_completed < progress_blocks - 1) {
if (*blocks_complete > now_completed) {
now_completed = *blocks_complete;
if (PyErr_CheckSignals() != 0) {
*(progress.kill_kernel) = true;
throw py::error_already_set();
}
if (*(progress.blocks_complete) > now_completed) {
now_completed = *(progress.blocks_complete);
kern_progress = now_completed / (float)progress_blocks;
fprintf(stderr, "%cProgress (GPU): %.1lf%%", 13, kern_progress * 100);
} else {
@@ -446,9 +453,8 @@ std::vector<float> dispatchDists(std::vector<Reference> &ref_sketches,
CUDA_CALL(cudaDeviceSetSharedMemConfig(cudaSharedMemBankSizeEightByte));

// Progress meter
volatile int *blocks_complete;
CUDA_CALL(cudaMallocManaged(&blocks_complete, sizeof(int)));
*blocks_complete = 0;
progress_atomics progress;
progress.init();

RandomStrides random_strides = std::get<0>(flat_random);
long long dist_rows;
@@ -494,7 +500,7 @@ std::vector<float> dispatchDists(std::vector<Reference> &ref_sketches,
device_arrays.kmers(), kmer_lengths.size(), device_arrays.dist_mat(),
dist_rows, device_arrays.random_table(), device_arrays.ref_random(),
device_arrays.ref_random(), ref_strides, ref_strides, random_strides,
blocks_complete, use_shared);
progress, use_shared);
} else {
std::tie(blockSize, blockCount) =
getBlockSize(sketch_subsample.ref_size, sketch_subsample.query_size,
@@ -509,13 +515,14 @@
device_arrays.kmers(), kmer_lengths.size(), device_arrays.dist_mat(),
dist_rows, device_arrays.random_table(), device_arrays.ref_random(),
device_arrays.query_random(), ref_strides, query_strides,
random_strides, blocks_complete, use_shared);
random_strides, progress, use_shared);
}

// Check for error in kernel launch
CUDA_CALL(cudaGetLastError());
reportDistProgress(blocks_complete, dist_rows);
reportDistProgress(progress, dist_rows);
fprintf(stderr, "%cProgress (GPU): 100.0%%\n", 13);
progress.free();

// Copy results back to host
CUDA_CALL(cudaDeviceSynchronize());

(Diffs for the remaining changed files are not shown.)
