Skip to content

Commit

Permalink
Improvements in Resolver and some base changes for other features (#73)
Browse files Browse the repository at this point in the history
* Collect the alter table add column info

* add methods to set data members and clear functions

* Remove additional clear method

* Remove unused members in catalog

* Add FilterJoin in the cresolver

* Handle resolving the fused operators

* clean code and rename the table correctly in the benchmark

* throw exception in filter join if hashtable is not precomputed

* Fix resolved tests for the changes

* correct the table names in simplified tatp

* remove old-parser module from the hustle metadata

* remove old-parser code and its headers

* use reset() to delete the assigned obj in shared_ptr

* make the getters and setters inline and name changes for conventions

* remove the log stmts and add TODO

* rename getters in hustle resolver according to conventions

* add comments for convention and add macros for the default size
  • Loading branch information
srsuryadev authored Jan 27, 2021
1 parent faf3430 commit df14c96
Show file tree
Hide file tree
Showing 38 changed files with 547 additions and 1,713 deletions.
2 changes: 0 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ target_link_libraries(hustle_main
hustleDB
hustle_src_catalog
hustle_src_storage
hustle_src_parser
hustle_src_resolver
hustle_src_operators
hustle_src_scheduler_Scheduler
Expand All @@ -46,7 +45,6 @@ target_link_libraries(hustle_main
add_subdirectory(utils)
add_subdirectory(catalog)
add_subdirectory(api)
add_subdirectory(parser)
add_subdirectory(resolver)
add_subdirectory(storage)
add_subdirectory(operators)
Expand Down
8 changes: 4 additions & 4 deletions src/benchmark/tatp_workload.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ void TATP::CreateTable() {

std::shared_ptr<DBTable> s, ai, sf, cf;

s = std::make_shared<hustle::storage::DBTable>("table", s_schema, BLOCK_SIZE);
ai = std::make_shared<hustle::storage::DBTable>("table", ai_schema,
s = std::make_shared<hustle::storage::DBTable>("Subscriber", s_schema, BLOCK_SIZE);
ai = std::make_shared<hustle::storage::DBTable>("Access_Info", ai_schema,
BLOCK_SIZE);
sf = std::make_shared<hustle::storage::DBTable>("table", sf_schema,
sf = std::make_shared<hustle::storage::DBTable>("Special_Facility", sf_schema,
BLOCK_SIZE);
cf = std::make_shared<hustle::storage::DBTable>("table", cf_schema,
cf = std::make_shared<hustle::storage::DBTable>("Call_Forwarding", cf_schema,
BLOCK_SIZE);

hustle_db->createTable(subscriber, s);
Expand Down
1 change: 0 additions & 1 deletion src/benchmark/tatp_workload.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "catalog/catalog.h"
#include "catalog/column_schema.h"
#include "catalog/table_schema.h"
#include "parser/parser.h"
#include "sqlite3/sqlite3.h"
#include "storage/util.h"

Expand Down
10 changes: 0 additions & 10 deletions src/catalog/catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,6 @@
#include <iostream>
#include <string>

extern const int SERIAL_BLOCK_SIZE = 4096;
char tableList[SERIAL_BLOCK_SIZE];
char project[SERIAL_BLOCK_SIZE];
char loopPred[SERIAL_BLOCK_SIZE];
char otherPred[SERIAL_BLOCK_SIZE];
char aggregate[SERIAL_BLOCK_SIZE];
char groupBy[SERIAL_BLOCK_SIZE];
char orderBy[SERIAL_BLOCK_SIZE];
char *currPos = nullptr;

namespace hustle {
namespace catalog {

Expand Down
2 changes: 1 addition & 1 deletion src/execution/execution_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void ExecutionPlan::execute() {

for (std::unique_ptr<Operator> &op : operators_) {
DCHECK(op != nullptr);
const std::size_t op_index = op->getOperatorIndex();
const std::size_t op_index = op->operator_index();
const Continuation op_cont = scheduler->allocateContinuation();

const TaskID op_task_id =
Expand Down
4 changes: 2 additions & 2 deletions src/execution/execution_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ class ExecutionPlan : public Task {

std::size_t addOperator(Operator* op) {
const std::size_t op_index = operators_.size();
op->setOperatorIndex(op_index);
op->set_operator_index(op_index);
operators_.emplace_back(op);
return op_index;
}

std::size_t addOperator(std::unique_ptr<Operator> op) {
const std::size_t op_index = operators_.size();
op->setOperatorIndex(op_index);
op->set_operator_index(op_index);
operators_.emplace_back(std::move(op));
return op_index;
}
Expand Down
1 change: 0 additions & 1 deletion src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include "catalog/catalog.h"
#include "catalog/column_schema.h"
#include "catalog/table_schema.h"
#include "parser/parser.h"
#include "storage/util.h"


Expand Down
17 changes: 17 additions & 0 deletions src/operators/aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,23 @@ void Aggregate::SortResult(std::vector<arrow::Datum>& groups,
}
}

// Resets the per-query state held by this Aggregate operator.
//
// Sets the aggregate counter back to zero, releases the objects owned through
// prev_result_, output_result_ and output_table_ (via reset()), and empties
// every container listed below. Presumably this lets the same operator
// instance be re-populated through the set_*() methods for another query --
// TODO confirm against the caller.
void Aggregate::Clear() {
num_aggs_ = 0;

// Drop this operator's handles on its input, its output and the output table.
prev_result_.reset();
output_result_.reset();
output_table_.reset();

// Empty the reference lists and the intermediate grouping/aggregation data.
aggregate_col_data_.clear();
aggregate_refs_.clear();
group_by_refs_.clear();
order_by_refs_.clear();
group_by_cols_.clear();
group_agg_index_map_.clear();
unique_values_map_.clear();
unique_values_.clear();
}

void Aggregate::Finish() {
arrow::Status status;

Expand Down
28 changes: 26 additions & 2 deletions src/operators/aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ class Aggregate : public BaseAggregate {
Aggregate(const std::size_t query_id,
std::shared_ptr<OperatorResult> prev_result,
std::shared_ptr<OperatorResult> output_result,
std::vector<AggregateReference> aggregate_units,
std::vector<AggregateReference> aggregate_refs,
std::vector<ColumnReference> group_by_refs,
std::vector<ColumnReference> order_by_refs);

Aggregate(const std::size_t query_id,
std::shared_ptr<OperatorResult> prev_result,
std::shared_ptr<OperatorResult> output_result,
std::vector<AggregateReference> aggregate_units,
std::vector<AggregateReference> aggregate_refs,
std::vector<ColumnReference> group_by_refs,
std::vector<ColumnReference> order_by_refs,
std::shared_ptr<OperatorOptions> options);
Expand All @@ -132,6 +132,30 @@ class Aggregate : public BaseAggregate {
*/
void execute(Task* ctx) override;

void Clear() override;

/**
 * Replaces prev_result_ with the given operator result.
 *
 * @param prev_result taken by value and moved into the member, so the
 *   hand-off does not pay for an additional shared_ptr copy.
 */
inline void set_prev_result(std::shared_ptr<OperatorResult> prev_result) {
  prev_result_ = std::move(prev_result);
}

/**
 * Replaces output_result_ with the given operator result.
 *
 * @param output_result taken by value and moved into the member, so the
 *   hand-off does not pay for an additional shared_ptr copy.
 */
inline void set_output_result(std::shared_ptr<OperatorResult> output_result) {
  output_result_ = std::move(output_result);
}

/**
 * Replaces aggregate_refs_ with the given list of aggregate references.
 *
 * @param aggregate_refs taken by value and moved into the member, so the
 *   vector's elements are not copied a second time.
 */
inline void set_aggregate_refs(std::vector<AggregateReference> aggregate_refs) {
  aggregate_refs_ = std::move(aggregate_refs);
}

/**
 * Replaces group_by_refs_ with the given list of column references.
 *
 * @param group_by_refs taken by value and moved into the member, so the
 *   vector's elements are not copied a second time.
 */
inline void set_groupby_refs(std::vector<ColumnReference> group_by_refs) {
  group_by_refs_ = std::move(group_by_refs);
}

/**
 * Replaces order_by_refs_ with the given list of column references.
 *
 * @param order_by_refs taken by value and moved into the member, so the
 *   vector's elements are not copied a second time.
 */
inline void set_orderby_refs(std::vector<ColumnReference> order_by_refs) {
  order_by_refs_ = std::move(order_by_refs);
}

// Intentionally empty: this operator performs no work in Initialize().
void Initialize() {}

private:
// Number of groups to aggregate.
std::size_t num_aggs_;
Expand Down
3 changes: 3 additions & 0 deletions src/operators/fused/filter_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ void FilterJoin::BuildFilters(Task *ctx) {

bloom_filter->set_memory(1);
bloom_filter->set_fact_fk_name(fact_fk_col_names_[table_idx]);
if (dim_tables_[table_idx].hash_table_ == nullptr) {
throw "hash table for the dimension relation not constructed";
}
dim_filters_[table_idx] = {bloom_filter,
dim_tables_[table_idx].hash_table_};
})));
Expand Down
2 changes: 2 additions & 0 deletions src/operators/fused/filter_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class FilterJoin : public Operator {
*/
void execute(Task *ctx) override;

// No-op override: FilterJoin currently resets none of its state in Clear().
void Clear() override {}

private:
// Row indices of the fact table that successfully probed all Bloom filters.
std::vector<std::vector<uint32_t>> lip_indices_;
Expand Down
2 changes: 2 additions & 0 deletions src/operators/fused/select_build_hash.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ class SelectBuildHash : public Select {
*/
void execute(Task *ctx) override;

// No-op override: nothing is reset here.
// NOTE(review): this replaces Select::Clear(), which clears filters_, so the
// inherited filters_ are left untouched when Clear() is called on a
// SelectBuildHash -- confirm that this is intentional.
void Clear() override {}

private:
ColumnReference join_column_;

Expand Down
32 changes: 24 additions & 8 deletions src/operators/hash_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,42 @@ class HashAggregate : public BaseAggregate {
HashAggregate(const std::size_t query_id,
std::shared_ptr<OperatorResult> prev_result,
std::shared_ptr<OperatorResult> output_result,
std::vector<AggregateReference> aggregate_units,
std::vector<AggregateReference> aggregate_refs,
std::vector<ColumnReference> group_by_refs,
std::vector<ColumnReference> order_by_refs);

HashAggregate(const std::size_t query_id,
std::shared_ptr<OperatorResult> prev_result,
std::shared_ptr<OperatorResult> output_result,
std::vector<AggregateReference> aggregate_units,
std::vector<AggregateReference> aggregate_refs,
std::vector<ColumnReference> group_by_refs,
std::vector<ColumnReference> order_by_refs,
std::shared_ptr<OperatorOptions> options);

void execute(Task* ctx) override;

/**
 * Replaces prev_result_ (the result coming from the upstream operator).
 *
 * @param prev_result taken by value and moved into the member, so the
 *   hand-off does not pay for an additional shared_ptr copy.
 */
inline void set_prev_result(std::shared_ptr<OperatorResult> prev_result) {
  prev_result_ = std::move(prev_result);
}

/**
 * Replaces output_result_ (where this operator's output is stored).
 *
 * @param output_result taken by value and moved into the member, so the
 *   hand-off does not pay for an additional shared_ptr copy.
 */
inline void set_output_result(std::shared_ptr<OperatorResult> output_result) {
  output_result_ = std::move(output_result);
}

/**
 * Replaces aggregate_refs_ with the given list of aggregate references.
 *
 * @param aggregate_refs taken by value and moved into the member, so the
 *   vector's elements are not copied a second time.
 */
inline void set_aggregate_refs(std::vector<AggregateReference> aggregate_refs) {
  aggregate_refs_ = std::move(aggregate_refs);
}

/**
 * Replaces group_by_refs_ with the given list of column references.
 *
 * @param group_by_refs taken by value and moved into the member, so the
 *   vector's elements are not copied a second time.
 */
inline void set_groupby_refs(std::vector<ColumnReference> group_by_refs) {
  group_by_refs_ = std::move(group_by_refs);
}

/**
 * Replaces order_by_refs_ with the given list of column references.
 *
 * @param order_by_refs taken by value and moved into the member, so the
 *   vector's elements are not copied a second time.
 */
inline void set_orderby_refs(std::vector<ColumnReference> order_by_refs) {
  order_by_refs_ = std::move(order_by_refs);
}

// No-op override: HashAggregate currently resets none of its state in
// Clear(), including the members assigned through the set_*() methods.
// NOTE(review): confirm whether a real reset is still to be implemented.
void Clear() override {}

private:
// Operator result from an upstream operator and output result will be stored
std::shared_ptr<OperatorResult> prev_result_, output_result_;
Expand Down Expand Up @@ -216,12 +238,6 @@ class HashAggregate : public BaseAggregate {
*/
void SecondPhaseAggregate(Task* internal);







hash_t HashCombine(hash_t seed, hash_t val);

void SortResult(std::vector<arrow::Datum> &groups, arrow::Datum &aggregates);
Expand Down
23 changes: 23 additions & 0 deletions src/operators/join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,29 @@ void Join::BuildHashTable(int join_id,
}
}

// Resets the state held by this Join operator.
//
// Releases prev_result_ and output_result_ (via reset()) and empties every
// container listed below. graph_ is not among the members reset here.
// Presumably this prepares the operator instance for reuse -- TODO confirm
// against the caller.
void Join::Clear() {
prev_result_.reset();
output_result_.reset();

// Per-join inputs: tables, join column names, upstream results, hash tables.
lefts_.clear();
rights_.clear();
left_col_names_.clear();
right_col_names_.clear();
prev_result_vec_.clear();
hash_tables_.clear();

// Index buffers for the left/right sides.
new_left_indices_vector_.clear();
new_right_indices_vector_.clear();

left_index_chunks_vector_.clear();
right_index_chunks_vector_.clear();

// Joined output indices and completion bookkeeping.
joined_indices_.clear();
joined_index_chunks_.clear();
finished_.clear();
}


void Join::ProbeHashTableBlock(
int join_id, const std::shared_ptr<arrow::ChunkedArray> &probe_col,
const std::shared_ptr<arrow::ChunkedArray> &probe_filter, int batch_i,
Expand Down
18 changes: 18 additions & 0 deletions src/operators/join.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ class Join : public Operator {
*/
void execute(Task *ctx) override;

void Clear() override;

/**
 * Replaces prev_result_vec_ with the given list of operator results.
 *
 * @param prev_result taken by value and moved into the member, so the
 *   vector and the shared_ptrs it holds are not copied a second time.
 */
inline void set_prev_result(std::vector<std::shared_ptr<OperatorResult>> prev_result) {
  prev_result_vec_ = std::move(prev_result);
}

/**
 * Replaces output_result_ with the given operator result.
 *
 * @param output_result taken by value and moved into the member, so the
 *   hand-off does not pay for an additional shared_ptr copy.
 */
inline void set_output_result(std::shared_ptr<OperatorResult> output_result) {
  output_result_ = std::move(output_result);
}

/**
 * Replaces graph_ with the given join graph.
 *
 * @param join_graph taken by value and moved into the member to avoid a
 *   second copy of the graph.
 */
inline void set_join_graph(JoinGraph join_graph) {
  graph_ = std::move(join_graph);
}

/**
 * Prepares the operator's working state: installs a fresh, default-constructed
 * OperatorResult as prev_result_ and sizes both joined-index containers to
 * two entries (presumably one per join side -- TODO confirm).
 */
void Initialize() {
  const int num_slots = 2;

  prev_result_ = std::make_shared<OperatorResult>();
  joined_indices_.resize(num_slots);
  joined_index_chunks_.resize(num_slots);
}

private:
// lefts_[i] = the left table in the ith join
// rights_[i] = the right table in the ith join
Expand Down
23 changes: 17 additions & 6 deletions src/operators/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,21 @@ class Operator {
public:
virtual void execute(Task *ctx) = 0;

std::size_t getOperatorIndex() const { return op_index_; }
virtual void Clear() = 0;

void setOperatorIndex(const std::size_t op_index) { op_index_ = op_index; }
inline std::size_t operator_index() const { return op_index_; }

std::size_t getQueryId() const { return query_id_; }
std::shared_ptr<OperatorResult> result_;
inline void set_operator_index(const std::size_t op_index) { op_index_ = op_index; }

// Overwrites the query id stored in query_id_.
inline void set_query_id(std::size_t query_id) {
query_id_ = query_id;
}

/**
 * Replaces options_ with the given operator options.
 *
 * @param options taken by value and moved into the member, so the hand-off
 *   does not pay for an additional shared_ptr copy.
 */
inline void set_operator_options(std::shared_ptr<OperatorOptions> options) {
  options_ = std::move(options);
}

inline std::size_t query_id() const { return query_id_; }

// TODO(nicholas): Make private
Task *createTask() {
Expand All @@ -52,8 +61,10 @@ class Operator {
});
}

std::shared_ptr<OperatorResult> result_;

protected:
explicit Operator(const std::size_t query_id) : query_id_(query_id) {
explicit Operator(std::size_t query_id) : query_id_(query_id) {
options_ = std::make_shared<OperatorOptions>();
}

Expand All @@ -62,7 +73,7 @@ class Operator {
: query_id_(query_id), options_(options) {}

std::shared_ptr<OperatorOptions> options_;
const std::size_t query_id_;
std::size_t query_id_;

private:
std::size_t op_index_;
Expand Down
4 changes: 4 additions & 0 deletions src/operators/select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ arrow::Datum Select::Filter(const std::shared_ptr<Block> &block,
}
}

// Empties filters_. No other member of Select is touched here.
// NOTE(review): confirm that leaving the remaining members as they are is
// intentional.
void Select::Clear() {
filters_.clear();
}

template <typename T, typename Op>
arrow::Datum Select::Filter(const std::shared_ptr<Block> &block,
const ColumnReference &col_ref, const T &value,
Expand Down
18 changes: 18 additions & 0 deletions src/operators/select.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,24 @@ class Select : public Operator {
*/
void execute(Task *ctx) override;

/**
 * Replaces output_result_ with the given operator result.
 *
 * @param output_result taken by value and moved into the member, so the
 *   hand-off does not pay for an additional shared_ptr copy.
 */
inline void set_output_result(std::shared_ptr<OperatorResult> output_result) {
  output_result_ = std::move(output_result);
}

/**
 * Replaces table_ with the given table.
 *
 * @param table taken by value and moved into the member, so the hand-off
 *   does not pay for an additional shared_ptr copy.
 */
inline void set_table(std::shared_ptr<DBTable> table) {
  table_ = std::move(table);
}

/**
 * Replaces tree_ with the given predicate tree.
 *
 * @param tree taken by value and moved into the member, so the hand-off
 *   does not pay for an additional shared_ptr copy.
 */
inline void set_tree(std::shared_ptr<PredicateTree> tree) {
  tree_ = std::move(tree);
}

/**
 * Sizes filters_ to the number of blocks reported by table_.
 *
 * table_ is dereferenced unconditionally, so it must already point to a
 * table when this is called.
 */
void Initialize() {
  const auto block_count = table_->get_num_blocks();
  filters_.resize(block_count);
}

void Clear() override;

protected:
std::shared_ptr<DBTable> table_;
std::shared_ptr<OperatorResult> output_result_;
Expand Down
2 changes: 2 additions & 0 deletions src/operators/utils/lip.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class LIP : public Operator {
*/
void execute(Task *ctx) override;

// No-op override: LIP currently resets none of its state in Clear().
void Clear() override {}

private:
std::unordered_map<std::string, std::vector<std::vector<int64_t>>>
out_fk_cols_;
Expand Down
Loading

0 comments on commit df14c96

Please sign in to comment.