Merged
2 changes: 2 additions & 0 deletions CMakeLists.txt
@@ -72,6 +72,7 @@ set(CORE_SOURCES
src/distributed/raft_group.cpp
src/distributed/raft_manager.cpp
src/distributed/distributed_executor.cpp
src/common/bloom_filter.cpp
src/storage/columnar_table.cpp
)

@@ -117,6 +118,7 @@ if(BUILD_TESTS)
add_cloudsql_test(catalog_coverage_tests tests/catalog_coverage_tests.cpp)
add_cloudsql_test(transaction_coverage_tests tests/transaction_coverage_tests.cpp)
add_cloudsql_test(utils_coverage_tests tests/utils_coverage_tests.cpp)
add_cloudsql_test(bloom_filter_tests tests/bloom_filter_test.cpp)
add_cloudsql_test(cloudSQL_tests tests/cloudSQL_tests.cpp)
add_cloudsql_test(server_tests tests/server_tests.cpp)
add_cloudsql_test(statement_tests tests/statement_tests.cpp)
42 changes: 41 additions & 1 deletion docs/performance/SQLITE_COMPARISON.md
@@ -39,8 +39,48 @@ We addressed the gaps via the following optimizations:
2. **Pinned Page Iteration**: Modifying our `HeapTable::Iterator` to hold pages pinned across slot iteration avoids repetitive atomic checks and LRU updates per-row.
3. **Batch Insert Mode**: Skipping single-row undo logs and exclusive locks to exploit pure in-memory bump allocation. This drove the `INSERT` speedup well past SQLite limits, as we write raw tuples uninterrupted.

## 6. Distributed Join Optimization: Bloom Filters

### Problem
Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.

### Solution: Bloom Filter Integration
Implemented bloom filters to filter tuples at the source before network transmission:
- **One-sided bloom filter**: Built from the left/build table, applied to filter the right/probe table
- **Distributed construction**: Each data node constructs its local bloom during the left/build scan phase
- **Coordinator broadcast**: the `BloomFilterPush` RPC distributes filter metadata to all nodes before the right/probe shuffle

### Architecture
```text
[Phase 1: Shuffle Left] [Phase 2: Shuffle Right]
| |
v v
Build local bloom Apply bloom filter
from join keys before buffering
| |
+---- BloomFilterPush ----->---+
(filter metadata) |
v
Filtered tuples buffered
```
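
The flow above can be exercised end to end with a toy stand-in for the real filter (the class below is illustrative only; it is not the project's `common::BloomFilter`): Phase 1 builds a filter from the build-side join keys, and Phase 2 drops probe-side tuples that definitely cannot match before they are buffered for transmission.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Toy bloom filter, for illustration only; the real implementation lives
// in include/common/bloom_filter.hpp and uses MurmurHash3.
class ToyBloom {
 public:
  ToyBloom(size_t num_bits, size_t num_hashes)
      : num_bits_(num_bits), num_hashes_(num_hashes), bits_(num_bits, false) {}

  void insert(const std::string& key) {
    for (size_t i = 0; i < num_hashes_; ++i) bits_[position(key, i)] = true;
  }

  // false => key was definitely never inserted
  // true  => key was possibly inserted (false positives allowed)
  bool might_contain(const std::string& key) const {
    for (size_t i = 0; i < num_hashes_; ++i)
      if (!bits_[position(key, i)]) return false;
    return true;
  }

 private:
  size_t position(const std::string& key, size_t i) const {
    // Double hashing: derive k probe positions from two base hashes.
    const size_t h1 = std::hash<std::string>{}(key);
    const size_t h2 = std::hash<std::string>{}(key + "#salt");
    return (h1 + i * h2) % num_bits_;
  }

  size_t num_bits_;
  size_t num_hashes_;
  std::vector<bool> bits_;
};

// Phase 2 at the sender: keep only tuples whose join key might match.
std::vector<std::string> filter_probe_side(const ToyBloom& bloom,
                                           const std::vector<std::string>& probe_keys) {
  std::vector<std::string> survivors;
  for (const auto& key : probe_keys)
    if (bloom.might_contain(key)) survivors.push_back(key);
  return survivors;
}
```

Because a bloom filter has no false negatives, every matching tuple survives the check; false positives only cost a little extra bandwidth, never correctness.
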

### Key Components

| Component | Location | Purpose |
|-----------|----------|---------|
| `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
| `PushData` handler | `src/main.cpp` | Receives and buffers filtered tuples |
| `ShuffleFragment` handler | `src/main.cpp` | Applies bloom filter before sending |
| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |

### Test Coverage
- 10 unit tests covering the `BloomFilter` class, `BloomFilterArgs` serialization, `ClusterManager` storage, and filter-application logic
- Tests located in `tests/bloom_filter_test.cpp`

## 7. Future Roadmap
With the scan gap closed, our focus shifts to higher-level analytical throughput:
* **Stage 1: SIMD-Accelerated Filtering**: Utilize AVX-512/NEON instructions to filter multiple rows in a single CPU cycle.
* **Stage 2: Vectorized Execution**: Move from row-at-a-time `TupleView` to batch-at-a-time `VectorBatch` processing.
* **Stage 3: Columnar Storage**: Transition from row-oriented heap files to columnar persistence for extreme analytical scanning.
* **Stage 4: Distributed Hash Join**: Enhance the single `HashJoinOperator` with parallel partitioned hash join for multi-node execution.
9 changes: 9 additions & 0 deletions docs/phases/PHASE_6_DISTRIBUTED_JOIN.md
@@ -14,6 +14,7 @@ Introduced isolated staging areas for inter-node data movement.
Developed a dedicated binary protocol for efficient data redistribution.
- **ShuffleFragment**: Metadata describing the fragment being pushed (target context, source node, schema).
- **PushData**: High-speed binary payload containing the actual tuple data for the shuffle phase.
- **BloomFilterPush**: Bloom filter metadata broadcast to enable tuple filtering before network transmission.

### 3. Two-Phase Join Orchestration (`distributed/distributed_executor.cpp`)
Implemented the control logic for distributed shuffle joins.
@@ -24,9 +25,17 @@ Implemented the control logic for distributed shuffle joins.
Seamlessly integrated shuffle buffers into the Volcano execution model.
- **Vectorized Buffering**: Optimized the `BufferScanOperator` to handle large volumes of redistributed data with minimal overhead.

### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`)
Added probabilistic filtering to reduce network traffic in shuffle joins.
- **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%) with optimal bit count and hash function calculation.
- **Filter Construction**: Built during Phase 1 scan, stored in `ClusterManager` per context.
- **Filter Application**: `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match.
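
The "optimal bit count and hash function calculation" mentioned above is presumably the textbook sizing, m = ceil(-n * ln p / (ln 2)^2) bits and k = round((m / n) * ln 2) hash functions. The sketch below shows that math under that assumption, since the constructor body is not part of this diff.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>

// Textbook bloom filter sizing for n expected elements at target false
// positive rate p. This is an assumption about the implementation, not
// code taken from src/common/bloom_filter.cpp.
struct BloomParams {
  size_t num_bits;
  size_t num_hashes;
};

BloomParams size_bloom(size_t expected_elements, double false_positive_rate) {
  const double n = static_cast<double>(expected_elements);  // assumed > 0
  const double ln2 = std::log(2.0);
  const double bits = std::ceil(-n * std::log(false_positive_rate) / (ln2 * ln2));
  const double hashes = std::round((bits / n) * ln2);
  return {static_cast<size_t>(bits), static_cast<size_t>(hashes < 1.0 ? 1.0 : hashes)};
}
```

At the default 1% rate this works out to roughly 9.6 bits and 7 hash functions per expected element.
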

## Lessons Learned
- Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins.
- Fine-grained locking in the shuffle buffers is critical for maintaining high throughput during the redistribution phase.
- Bloom filters provide significant network traffic reduction when join selectivity is low, at the cost of a small false positive rate (typically <1%).
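
The selectivity trade-off in the last point can be made concrete with a back-of-envelope model (illustrative numbers, assuming independent keys): with join selectivity s and false positive rate p, the fraction of probe tuples still transmitted is about s + (1 - s) * p.

```cpp
#include <cassert>
#include <cmath>

// Expected fraction of probe-side tuples sent after bloom filtering:
// true matches (selectivity) plus false positives on the non-matches.
double transmitted_fraction(double selectivity, double fp_rate) {
  return selectivity + (1.0 - selectivity) * fp_rate;
}
```

At 5% selectivity and the default 1% false positive rate, about 5.95% of probe tuples are sent, a roughly 94% traffic reduction; at 100% selectivity the filter saves nothing.
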

## Status: 100% Test Pass
Verified the end-to-end shuffle join flow, including multi-node data movement and final result merging, through automated integration tests.
- 10 unit tests for bloom filter implementation and integration (`tests/bloom_filter_test.cpp`)
1 change: 1 addition & 0 deletions docs/phases/README.md
@@ -41,6 +41,7 @@ This directory contains the technical documentation for the lifecycle of the clo
- Context-aware Shuffle infrastructure in `ClusterManager`.
- Implementation of `ShuffleFragment` and `PushData` RPC protocols.
- Two-phase Shuffle Join orchestration in `DistributedExecutor`.
- **Bloom Filter Optimization**: Probabilistic tuple filtering to reduce network traffic in shuffle joins.

### [Phase 7: Replication & High Availability](./PHASE_7_REPLICATION_HA.md)
**Focus**: Fault Tolerance & Data Redundancy.
83 changes: 83 additions & 0 deletions include/common/bloom_filter.hpp
@@ -0,0 +1,83 @@
/**
* @file bloom_filter.hpp
* @brief Bloom filter implementation for distributed join optimization
*/

#ifndef SQL_ENGINE_COMMON_BLOOM_FILTER_HPP
#define SQL_ENGINE_COMMON_BLOOM_FILTER_HPP

#include <cstdint>
#include <cstring>
#include <vector>

#include "value.hpp"

namespace cloudsql {
namespace common {

/**
* @brief Bloom filter for probabilistic membership testing
*
* Used in distributed joins to filter tuples that cannot possibly
* match before network transmission.
*/
class BloomFilter {
public:
/**
* @brief Construct a bloom filter with expected elements and false positive rate
* @param expected_elements Number of elements expected to be inserted
* @param false_positive_rate Target false positive rate (default 0.01 = 1%)
*/
explicit BloomFilter(size_t expected_elements, double false_positive_rate = 0.01);

/**
* @brief Construct from serialized data
*/
BloomFilter(const uint8_t* data, size_t size);

/**
* @brief Insert a value into the bloom filter
*/
void insert(const Value& key);

/**
* @brief Check if a value might be in the bloom filter
* @return true if possibly present, false if definitely not present
*/
[[nodiscard]] bool might_contain(const Value& key) const;

/**
* @brief Serialize the bloom filter for network transmission
*/
[[nodiscard]] std::vector<uint8_t> serialize() const;

/**
* @brief Get the bit array size in bytes
*/
[[nodiscard]] size_t bit_size() const { return (num_bits_ + 7) / 8; }

/**
* @brief Get number of hash functions used
*/
[[nodiscard]] size_t num_hashes() const { return num_hashes_; }

/**
* @brief Get expected elements
*/
[[nodiscard]] size_t expected_elements() const { return expected_elements_; }

private:
size_t num_bits_;
size_t num_hashes_;
size_t expected_elements_;
std::vector<uint8_t> bits_;

size_t get_bit_position(size_t hash, size_t i) const;
size_t murmur3_hash(const Value& key) const;
size_t murmur3_hash(const uint8_t* data, size_t len, size_t seed) const;
};

} // namespace common
} // namespace cloudsql

#endif // SQL_ENGINE_COMMON_BLOOM_FILTER_HPP
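
`get_bit_position(hash, i)` is declared above but defined elsewhere; a common scheme (an assumption on our part, not verified against the .cpp) is Kirsch-Mitzenmacher double hashing, which derives all k probe positions from two base hashes:

```cpp
#include <cassert>
#include <cstddef>

// Kirsch-Mitzenmacher double hashing: position_i = (h1 + i * h2) mod m.
// Two base hashes are enough to drive all k "hash functions" without
// recomputing a fresh hash per probe.
size_t bit_position(size_t h1, size_t h2, size_t i, size_t num_bits) {
  return (h1 + i * h2) % num_bits;
}
```
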
91 changes: 91 additions & 0 deletions include/common/cluster_manager.hpp
@@ -13,6 +13,7 @@
#include <unordered_map>
#include <vector>

#include "common/bloom_filter.hpp"
#include "common/config.hpp"
#include "executor/types.hpp"

@@ -210,7 +211,95 @@ class ClusterManager {
return data;
}

/**
* @brief Store a bloom filter for a shuffle context
*/
void set_bloom_filter(const std::string& context_id, const std::string& build_table,
const std::string& probe_table, const std::string& probe_key_col,
std::vector<uint8_t> filter_data, size_t expected_elements,
size_t num_hashes) {
const std::scoped_lock<std::mutex> lock(mutex_);
auto& entry = bloom_filters_[context_id];
entry.build_table = build_table;
entry.probe_table = probe_table;
entry.probe_key_col = probe_key_col;
entry.filter_data = std::move(filter_data);
entry.expected_elements = expected_elements;
entry.num_hashes = num_hashes;
}

/**
* @brief Check if a bloom filter exists for a context
* @note Returns false if filter_data is empty, so bloom filtering is skipped
*/
[[nodiscard]] bool has_bloom_filter(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = bloom_filters_.find(context_id);
if (it == bloom_filters_.end()) {
return false;
}
// Only consider bloom filter valid if it has actual filter data
return !it->second.filter_data.empty();
}

/**
* @brief Get bloom filter for a context (reconstructs BloomFilter object)
*/
[[nodiscard]] common::BloomFilter get_bloom_filter(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = bloom_filters_.find(context_id);
if (it != bloom_filters_.end() && !it->second.filter_data.empty()) {
return common::BloomFilter(it->second.filter_data.data(),
it->second.filter_data.size());
}
return common::BloomFilter(1); // Empty filter
}

/**
* @brief Get probe table name for a context
*/
[[nodiscard]] std::string get_probe_table(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = bloom_filters_.find(context_id);
if (it != bloom_filters_.end()) {
return it->second.probe_table;
}
return "";
}

/**
* @brief Get probe key column for a context
*/
[[nodiscard]] std::string get_probe_key_col(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = bloom_filters_.find(context_id);
if (it != bloom_filters_.end()) {
return it->second.probe_key_col;
}
return "";
}

/**
* @brief Clear bloom filter for a context
*/
void clear_bloom_filter(const std::string& context_id) {
const std::scoped_lock<std::mutex> lock(mutex_);
bloom_filters_.erase(context_id);
}

private:
/**
* @brief Stored bloom filter data for a context
*/
struct BloomFilterEntry {
std::string build_table;
std::string probe_table;
std::string probe_key_col; // Join key column on probe side
std::vector<uint8_t> filter_data;
size_t expected_elements = 0;
size_t num_hashes = 0;
};

const config::Config* config_;
raft::RaftManager* raft_manager_;
NodeInfo self_node_;
@@ -220,6 +309,8 @@ class ClusterManager {
/* context_id -> table_name -> rows */
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
shuffle_buffers_;
/* context_id -> bloom filter data */
std::unordered_map<std::string, BloomFilterEntry> bloom_filters_;
mutable std::mutex mutex_;
};

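
The empty-data guard in `has_bloom_filter` can be exercised on its own with a stripped-down mimic of the registry (a hypothetical class, not the real `ClusterManager`): an entry whose byte vector is empty is treated as "no filter", so callers fall back to sending all tuples rather than constructing an invalid filter.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Minimal mimic of ClusterManager's bloom registry, keeping only the
// pieces needed to show the empty-data guard.
class BloomRegistry {
 public:
  void set(const std::string& context_id, std::vector<uint8_t> data) {
    const std::scoped_lock<std::mutex> lock(mutex_);
    filters_[context_id] = std::move(data);
  }

  bool has(const std::string& context_id) const {
    const std::scoped_lock<std::mutex> lock(mutex_);
    const auto it = filters_.find(context_id);
    // Present AND non-empty: an empty blob means "skip bloom filtering".
    return it != filters_.end() && !it->second.empty();
  }

 private:
  std::unordered_map<std::string, std::vector<uint8_t>> filters_;
  mutable std::mutex mutex_;
};
```
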
68 changes: 68 additions & 0 deletions include/network/rpc_message.hpp
Expand Up @@ -33,6 +33,7 @@ enum class RpcType : uint8_t {
TxnAbort = 8,
PushData = 9,
ShuffleFragment = 10,
BloomFilterPush = 11,
Error = 255
};

@@ -439,6 +440,73 @@ struct ShuffleFragmentArgs {
}
};

/**
* @brief Arguments for BloomFilterPush RPC
*/
struct BloomFilterArgs {
std::string context_id;
std::string build_table;
std::string probe_table;
std::string probe_key_col; // Join key column on probe side for filtering
std::vector<uint8_t> filter_data;
size_t expected_elements = 0;
size_t num_hashes = 0;

[[nodiscard]] std::vector<uint8_t> serialize() const {
std::vector<uint8_t> out;
Serializer::serialize_string(context_id, out);
Serializer::serialize_string(build_table, out);
Serializer::serialize_string(probe_table, out);
Serializer::serialize_string(probe_key_col, out);

// Serialize filter data (blob)
const auto filter_len = static_cast<uint32_t>(filter_data.size());
const size_t off = out.size();
out.resize(off + Serializer::VAL_SIZE_32);
std::memcpy(out.data() + off, &filter_len, Serializer::VAL_SIZE_32);
out.insert(out.end(), filter_data.begin(), filter_data.end());

// Serialize metadata using fixed-width temporaries
uint64_t tmp_expected = static_cast<uint64_t>(expected_elements);
uint8_t tmp_hashes = static_cast<uint8_t>(num_hashes);
const size_t off2 = out.size();
out.resize(off2 + 9); // 8 bytes for expected_elements + 1 for num_hashes
std::memcpy(out.data() + off2, &tmp_expected, 8);
out[off2 + 8] = tmp_hashes;
return out;
}

static BloomFilterArgs deserialize(const std::vector<uint8_t>& in) {
BloomFilterArgs args;
size_t offset = 0;
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
args.build_table = Serializer::deserialize_string(in.data(), offset, in.size());
args.probe_table = Serializer::deserialize_string(in.data(), offset, in.size());
args.probe_key_col = Serializer::deserialize_string(in.data(), offset, in.size());

uint32_t filter_len = 0;
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
std::memcpy(&filter_len, in.data() + offset, Serializer::VAL_SIZE_32);
offset += Serializer::VAL_SIZE_32;
}
if (offset + filter_len <= in.size()) {
args.filter_data.resize(filter_len);
std::memcpy(args.filter_data.data(), in.data() + offset, filter_len);
offset += filter_len;
}

// Deserialize metadata using fixed-width temporaries
if (offset + 9 <= in.size()) {
uint64_t tmp_expected = 0;
std::memcpy(&tmp_expected, in.data() + offset, 8);
args.expected_elements = static_cast<size_t>(tmp_expected);
offset += 8;
args.num_hashes = static_cast<size_t>(in[offset]);
}
return args;
}
};

/**
* @brief Arguments for TxnPrepare/Commit/Abort RPC
*/
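
The non-string portion of the `BloomFilterArgs` wire format (u32 blob length, blob bytes, u64 expected elements, u8 hash count) round-trips as below; this sketch omits the `Serializer` string fields and mirrors the bounds checks of the real `deserialize`.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Layout sketch: [u32 len][len bytes][u64 expected][u8 num_hashes].
// Host byte order, matching the memcpy-based code above.
std::vector<uint8_t> encode_tail(const std::vector<uint8_t>& filter,
                                 uint64_t expected, uint8_t num_hashes) {
  std::vector<uint8_t> out(4);
  const auto len = static_cast<uint32_t>(filter.size());
  std::memcpy(out.data(), &len, 4);
  out.insert(out.end(), filter.begin(), filter.end());
  const size_t off = out.size();
  out.resize(off + 9);  // 8 bytes expected + 1 byte hash count
  std::memcpy(out.data() + off, &expected, 8);
  out[off + 8] = num_hashes;
  return out;
}

bool decode_tail(const std::vector<uint8_t>& in, std::vector<uint8_t>& filter,
                 uint64_t& expected, uint8_t& num_hashes) {
  size_t offset = 0;
  uint32_t len = 0;
  if (offset + 4 > in.size()) return false;  // truncated input is rejected
  std::memcpy(&len, in.data() + offset, 4);
  offset += 4;
  if (offset + len > in.size()) return false;
  filter.assign(in.begin() + offset, in.begin() + offset + len);
  offset += len;
  if (offset + 9 > in.size()) return false;
  std::memcpy(&expected, in.data() + offset, 8);
  num_hashes = in[offset + 8];
  return true;
}
```

Storing `num_hashes` as a single byte caps it at 255, which comfortably covers practical bloom configurations (7 hashes at the default 1% rate).
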