Merged
Changes from 8 commits
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -72,6 +72,7 @@ set(CORE_SOURCES
src/distributed/raft_group.cpp
src/distributed/raft_manager.cpp
src/distributed/distributed_executor.cpp
src/common/bloom_filter.cpp
src/storage/columnar_table.cpp
)

@@ -117,6 +118,7 @@ if(BUILD_TESTS)
add_cloudsql_test(catalog_coverage_tests tests/catalog_coverage_tests.cpp)
add_cloudsql_test(transaction_coverage_tests tests/transaction_coverage_tests.cpp)
add_cloudsql_test(utils_coverage_tests tests/utils_coverage_tests.cpp)
add_cloudsql_test(bloom_filter_tests tests/bloom_filter_test.cpp)
add_cloudsql_test(cloudSQL_tests tests/cloudSQL_tests.cpp)
add_cloudsql_test(server_tests tests/server_tests.cpp)
add_cloudsql_test(statement_tests tests/statement_tests.cpp)
41 changes: 40 additions & 1 deletion docs/performance/SQLITE_COMPARISON.md
@@ -39,8 +39,47 @@ We addressed the gaps via the following optimizations:
2. **Pinned Page Iteration**: Modifying our `HeapTable::Iterator` to hold pages pinned across slot iteration avoids repetitive atomic checks and LRU updates per-row.
3. **Batch Insert Mode**: Skipping single-row undo logs and exclusive locks to exploit pure in-memory bump allocation. This drove the `INSERT` speedup well past SQLite limits, as we write raw tuples uninterrupted.

## 6. Future Roadmap
## 6. Distributed Join Optimization: Bloom Filters

### Problem
Distributed shuffle joins send **all tuples** across the network to partitioned nodes, even when many will never match. This causes unnecessary network traffic and buffer memory usage.

### Solution: Bloom Filter Integration
Bloom filters prune tuples at the source, before network transmission:
- **One-sided bloom filter**: Built from the left/build table and applied to filter the right/probe table
- **Distributed construction**: Each data node builds its local filter during the left/build scan phase
- **Coordinator broadcast**: A `BloomFilterPush` RPC sends filter metadata to all nodes before the right/probe shuffle
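The sizing math behind such a filter can be sketched in a minimal, self-contained form. Note this is an illustration only: `MiniBloom` and the `std::hash`-based double hashing are stand-ins, not the engine's MurmurHash3-based `BloomFilter`.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Minimal illustrative bloom filter, sized from expected elements n and
// target false-positive rate p using the standard formulas:
//   m = -n * ln(p) / (ln 2)^2 bits,  k = (m / n) * ln 2 hash functions.
class MiniBloom {
 public:
  MiniBloom(std::size_t n, double p) {
    const double ln2 = std::log(2.0);
    num_bits_ = static_cast<std::size_t>(
        std::ceil(-static_cast<double>(n) * std::log(p) / (ln2 * ln2)));
    num_hashes_ = static_cast<std::size_t>(
        std::round(static_cast<double>(num_bits_) / static_cast<double>(n) * ln2));
    if (num_hashes_ == 0) num_hashes_ = 1;
    bits_.assign((num_bits_ + 7) / 8, 0);
  }

  void insert(const std::string& key) {
    for (std::size_t i = 0; i < num_hashes_; ++i) {
      const std::size_t bit = position(key, i);
      bits_[bit / 8] |= static_cast<std::uint8_t>(1U << (bit % 8));
    }
  }

  // false => definitely absent; true => possibly present (false positives allowed)
  bool might_contain(const std::string& key) const {
    for (std::size_t i = 0; i < num_hashes_; ++i) {
      const std::size_t bit = position(key, i);
      if ((bits_[bit / 8] & (1U << (bit % 8))) == 0) return false;
    }
    return true;
  }

 private:
  // Double hashing: h_i(x) = h1(x) + i * h2(x), a standard way to derive
  // k hash positions from two base hashes.
  std::size_t position(const std::string& key, std::size_t i) const {
    const std::size_t h1 = std::hash<std::string>{}(key);
    const std::size_t h2 = std::hash<std::string>{}(key + "#salt");
    return (h1 + i * h2) % num_bits_;
  }

  std::size_t num_bits_ = 0;
  std::size_t num_hashes_ = 0;
  std::vector<std::uint8_t> bits_;
};
```

In the join flow, the build side inserts its join keys and the probe side calls `might_contain()` before buffering each tuple; any key the filter rejects is guaranteed not to match.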

### Architecture
```text
[Phase 1: Shuffle Left] [Phase 2: Shuffle Right]
| |
v v
Build local bloom Apply bloom filter
from join keys before buffering
| |
+---- BloomFilterPush ----->---+
(filter metadata) |
v
Filtered tuples buffered
```

### Key Components

| Component | Location | Purpose |
|-----------|----------|---------|
| `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
| `PushData` handler | `src/main.cpp` | Applies bloom filter before buffering |
| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |

### Test Coverage
- 10 unit tests covering the `BloomFilter` class, `BloomFilterArgs` serialization, `ClusterManager` storage, and filter application logic
- Tests located in `tests/bloom_filter_test.cpp`
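The wire-format round-trip those tests exercise can be sketched standalone. The layout below (u32 length prefix, blob bytes, u64 element count, u8 hash count) mirrors the style of `BloomFilterArgs`, but `FilterBlob`, `encode`, and `decode` are illustrative names, not engine APIs.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Illustrative round-trip for a length-prefixed blob plus fixed-width metadata.
struct FilterBlob {
  std::vector<std::uint8_t> data;
  std::uint64_t expected_elements = 0;
  std::uint8_t num_hashes = 0;
};

std::vector<std::uint8_t> encode(const FilterBlob& f) {
  std::vector<std::uint8_t> out;
  const auto len = static_cast<std::uint32_t>(f.data.size());
  out.resize(4);
  std::memcpy(out.data(), &len, 4);               // u32 blob length
  out.insert(out.end(), f.data.begin(), f.data.end());
  const std::size_t off = out.size();
  out.resize(off + 9);                            // u64 + u8 metadata
  std::memcpy(out.data() + off, &f.expected_elements, 8);
  out[off + 8] = f.num_hashes;
  return out;
}

FilterBlob decode(const std::vector<std::uint8_t>& in) {
  FilterBlob f;
  std::size_t offset = 0;
  std::uint32_t len = 0;
  if (offset + 4 <= in.size()) {                  // guard every read, as the
    std::memcpy(&len, in.data() + offset, 4);     // real deserializer does
    offset += 4;
  }
  if (offset + len <= in.size()) {
    f.data.assign(in.begin() + offset, in.begin() + offset + len);
    offset += len;
  }
  if (offset + 9 <= in.size()) {
    std::memcpy(&f.expected_elements, in.data() + offset, 8);
    f.num_hashes = in[offset + 8];
  }
  return f;
}
```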

## 7. Future Roadmap
With the scan gap closed, our focus shifts to higher-level analytical throughput:
* **Stage 1: SIMD-Accelerated Filtering**: Utilize AVX-512/NEON instructions to filter multiple rows in a single CPU cycle.
* **Stage 2: Vectorized Execution**: Move from row-at-a-time `TupleView` to batch-at-a-time `VectorBatch` processing.
* **Stage 3: Columnar Storage**: Transition from row-oriented heap files to columnar persistence for extreme analytical scanning.
* **Stage 4: Distributed Hash Join**: Enhance the single `HashJoinOperator` with parallel partitioned hash join for multi-node execution.
9 changes: 9 additions & 0 deletions docs/phases/PHASE_6_DISTRIBUTED_JOIN.md
@@ -14,6 +14,7 @@ Introduced isolated staging areas for inter-node data movement.
Developed a dedicated binary protocol for efficient data redistribution.
- **ShuffleFragment**: Metadata describing the fragment being pushed (target context, source node, schema).
- **PushData**: High-speed binary payload containing the actual tuple data for the shuffle phase.
- **BloomFilterPush**: Bloom filter metadata broadcast to enable tuple filtering before network transmission.

### 3. Two-Phase Join Orchestration (`distributed/distributed_executor.cpp`)
Implemented the control logic for distributed shuffle joins.
Expand All @@ -24,9 +25,17 @@ Implemented the control logic for distributed shuffle joins.
Seamlessly integrated shuffle buffers into the Volcano execution model.
- **Vectorized Buffering**: Optimized the `BufferScanOperator` to handle large volumes of redistributed data with minimal overhead.

### 5. Bloom Filter Optimization (`common/bloom_filter.hpp`)
Added probabilistic filtering to reduce network traffic in shuffle joins.
- **MurmurHash3-based BloomFilter**: Configurable false positive rate (default 1%); the bit count and number of hash functions are derived from the expected element count and target rate.
- **Filter Construction**: Built during Phase 1 scan, stored in `ClusterManager` per context.
- **Filter Application**: `PushData` handler checks `might_contain()` before buffering, skipping tuples that will definitely not match.
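The application step amounts to a simple gate in front of the buffer. The sketch below uses hypothetical names (`Tuple`, `filter_before_buffering`); the real logic lives in the `PushData` handler in `src/main.cpp`.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for a shuffled row; only the probe-side join key matters here.
struct Tuple {
  std::string join_key;
  // ... other columns ...
};

// Drop tuples whose join key definitely cannot match before buffering.
// Filter is any type exposing might_contain(), such as a bloom filter.
template <typename Filter>
std::vector<Tuple> filter_before_buffering(const std::vector<Tuple>& incoming,
                                           const Filter& bloom) {
  std::vector<Tuple> buffered;
  for (const auto& t : incoming) {
    if (bloom.might_contain(t.join_key)) {  // false => definitely no match
      buffered.push_back(t);
    }
  }
  return buffered;
}
```

False positives only cost a wasted buffer slot, which the join itself later discards; false negatives cannot occur, so the result is unchanged.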

## Lessons Learned
- Shuffle joins significantly reduce network traffic compared to broadcast joins for large-to-large table joins.
- Fine-grained locking in the shuffle buffers is critical for maintaining high throughput during the redistribution phase.
- Bloom filters provide significant network traffic reduction when join selectivity is low, at the cost of a small false positive rate (typically <1%).
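The <1% figure follows from the standard bloom-filter analysis (a textbook result, not something derived from this codebase): for $m$ bits, $k$ hash functions, and $n$ inserted keys,

```math
p \approx \left(1 - e^{-kn/m}\right)^{k},
\qquad k_{\mathrm{opt}} = \frac{m}{n}\ln 2,
\qquad m = -\frac{n \ln p}{(\ln 2)^2}.
```

At $p = 0.01$ this works out to roughly 9.6 bits per key and $k \approx 7$ hash functions, which is why a 1% default is cheap to transmit relative to the tuples it saves.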

## Status: 100% Test Pass
Verified the end-to-end shuffle join flow, including multi-node data movement and final result merging, through automated integration tests.
- 10 unit tests for bloom filter implementation and integration (`tests/bloom_filter_test.cpp`)
1 change: 1 addition & 0 deletions docs/phases/README.md
@@ -41,6 +41,7 @@ This directory contains the technical documentation for the lifecycle of the clo
- Context-aware Shuffle infrastructure in `ClusterManager`.
- Implementation of `ShuffleFragment` and `PushData` RPC protocols.
- Two-phase Shuffle Join orchestration in `DistributedExecutor`.
- **Bloom Filter Optimization**: Probabilistic tuple filtering to reduce network traffic in shuffle joins.

### [Phase 7: Replication & High Availability](./PHASE_7_REPLICATION_HA.md)
**Focus**: Fault Tolerance & Data Redundancy.
83 changes: 83 additions & 0 deletions include/common/bloom_filter.hpp
@@ -0,0 +1,83 @@
/**
* @file bloom_filter.hpp
* @brief Bloom filter implementation for distributed join optimization
*/

#ifndef SQL_ENGINE_COMMON_BLOOM_FILTER_HPP
#define SQL_ENGINE_COMMON_BLOOM_FILTER_HPP

#include <cstdint>
#include <cstring>
#include <vector>

#include "value.hpp"

namespace cloudsql {
namespace common {

/**
* @brief Bloom filter for probabilistic membership testing
*
* Used in distributed joins to filter tuples that cannot possibly
* match before network transmission.
*/
class BloomFilter {
public:
/**
* @brief Construct a bloom filter with expected elements and false positive rate
* @param expected_elements Number of elements expected to be inserted
* @param false_positive_rate Target false positive rate (default 0.01 = 1%)
*/
explicit BloomFilter(size_t expected_elements, double false_positive_rate = 0.01);

/**
* @brief Construct from serialized data
*/
BloomFilter(const uint8_t* data, size_t size);

/**
* @brief Insert a value into the bloom filter
*/
void insert(const Value& key);

/**
* @brief Check if a value might be in the bloom filter
* @return true if possibly present, false if definitely not present
*/
[[nodiscard]] bool might_contain(const Value& key) const;

/**
* @brief Serialize the bloom filter for network transmission
*/
[[nodiscard]] std::vector<uint8_t> serialize() const;

/**
* @brief Get the bit array size in bytes
*/
[[nodiscard]] size_t bit_size() const { return (num_bits_ + 7) / 8; }

/**
* @brief Get number of hash functions used
*/
[[nodiscard]] size_t num_hashes() const { return num_hashes_; }

/**
* @brief Get expected elements
*/
[[nodiscard]] size_t expected_elements() const { return expected_elements_; }

private:
size_t num_bits_;
size_t num_hashes_;
size_t expected_elements_;
std::vector<uint8_t> bits_;

size_t get_bit_position(size_t hash, size_t i) const;
size_t murmur3_hash(const Value& key) const;
size_t murmur3_hash(const uint8_t* data, size_t len, size_t seed) const;
};

} // namespace common
} // namespace cloudsql

#endif // SQL_ENGINE_COMMON_BLOOM_FILTER_HPP
85 changes: 85 additions & 0 deletions include/common/cluster_manager.hpp
@@ -13,6 +13,7 @@
#include <unordered_map>
#include <vector>

#include "common/bloom_filter.hpp"
#include "common/config.hpp"
#include "executor/types.hpp"

@@ -210,7 +211,89 @@ class ClusterManager {
return data;
}

/**
* @brief Store a bloom filter for a shuffle context
*/
void set_bloom_filter(const std::string& context_id, const std::string& build_table,
const std::string& probe_table, const std::string& probe_key_col,
std::vector<uint8_t> filter_data, size_t expected_elements,
size_t num_hashes) {
const std::scoped_lock<std::mutex> lock(mutex_);
auto& entry = bloom_filters_[context_id];
entry.build_table = build_table;
entry.probe_table = probe_table;
entry.probe_key_col = probe_key_col;
entry.filter_data = std::move(filter_data);
entry.expected_elements = expected_elements;
entry.num_hashes = num_hashes;
}

/**
* @brief Check if a bloom filter exists for a context
*/
[[nodiscard]] bool has_bloom_filter(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
return bloom_filters_.count(context_id) != 0U;
}

/**
* @brief Get bloom filter for a context (reconstructs BloomFilter object)
*/
[[nodiscard]] common::BloomFilter get_bloom_filter(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = bloom_filters_.find(context_id);
if (it != bloom_filters_.end() && !it->second.filter_data.empty()) {
return common::BloomFilter(it->second.filter_data.data(),
it->second.filter_data.size());
}
return common::BloomFilter(1); // Empty filter
}

/**
* @brief Get probe table name for a context
*/
[[nodiscard]] std::string get_probe_table(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = bloom_filters_.find(context_id);
if (it != bloom_filters_.end()) {
return it->second.probe_table;
}
return "";
}

/**
* @brief Get probe key column for a context
*/
[[nodiscard]] std::string get_probe_key_col(const std::string& context_id) const {
const std::scoped_lock<std::mutex> lock(mutex_);
auto it = bloom_filters_.find(context_id);
if (it != bloom_filters_.end()) {
return it->second.probe_key_col;
}
return "";
}

/**
* @brief Clear bloom filter for a context
*/
void clear_bloom_filter(const std::string& context_id) {
const std::scoped_lock<std::mutex> lock(mutex_);
bloom_filters_.erase(context_id);
}

private:
/**
* @brief Stored bloom filter data for a context
*/
struct BloomFilterEntry {
std::string build_table;
std::string probe_table;
std::string probe_key_col; // Join key column on probe side
std::vector<uint8_t> filter_data;
size_t expected_elements = 0;
size_t num_hashes = 0;
};

const config::Config* config_;
raft::RaftManager* raft_manager_;
NodeInfo self_node_;
Expand All @@ -220,6 +303,8 @@ class ClusterManager {
/* context_id -> table_name -> rows */
std::unordered_map<std::string, std::unordered_map<std::string, std::vector<executor::Tuple>>>
shuffle_buffers_;
/* context_id -> bloom filter data */
std::unordered_map<std::string, BloomFilterEntry> bloom_filters_;
mutable std::mutex mutex_;
};

68 changes: 68 additions & 0 deletions include/network/rpc_message.hpp
Expand Up @@ -33,6 +33,7 @@ enum class RpcType : uint8_t {
TxnAbort = 8,
PushData = 9,
ShuffleFragment = 10,
BloomFilterPush = 11,
Error = 255
};

@@ -439,6 +440,73 @@ struct ShuffleFragmentArgs {
}
};

/**
* @brief Arguments for BloomFilterPush RPC
*/
struct BloomFilterArgs {
std::string context_id;
std::string build_table;
std::string probe_table;
std::string probe_key_col; // Join key column on probe side for filtering
std::vector<uint8_t> filter_data;
size_t expected_elements = 0;
size_t num_hashes = 0;

[[nodiscard]] std::vector<uint8_t> serialize() const {
std::vector<uint8_t> out;
Serializer::serialize_string(context_id, out);
Serializer::serialize_string(build_table, out);
Serializer::serialize_string(probe_table, out);
Serializer::serialize_string(probe_key_col, out);

// Serialize filter data (blob)
const auto filter_len = static_cast<uint32_t>(filter_data.size());
const size_t off = out.size();
out.resize(off + Serializer::VAL_SIZE_32);
std::memcpy(out.data() + off, &filter_len, Serializer::VAL_SIZE_32);
out.insert(out.end(), filter_data.begin(), filter_data.end());

// Serialize metadata using fixed-width temporaries
uint64_t tmp_expected = static_cast<uint64_t>(expected_elements);
uint8_t tmp_hashes = static_cast<uint8_t>(num_hashes);
const size_t off2 = out.size();
out.resize(off2 + 9); // 8 bytes for expected_elements + 1 for num_hashes
std::memcpy(out.data() + off2, &tmp_expected, 8);
out[off2 + 8] = tmp_hashes;
return out;
}

static BloomFilterArgs deserialize(const std::vector<uint8_t>& in) {
BloomFilterArgs args;
size_t offset = 0;
args.context_id = Serializer::deserialize_string(in.data(), offset, in.size());
args.build_table = Serializer::deserialize_string(in.data(), offset, in.size());
args.probe_table = Serializer::deserialize_string(in.data(), offset, in.size());
args.probe_key_col = Serializer::deserialize_string(in.data(), offset, in.size());

uint32_t filter_len = 0;
if (offset + Serializer::VAL_SIZE_32 <= in.size()) {
std::memcpy(&filter_len, in.data() + offset, Serializer::VAL_SIZE_32);
offset += Serializer::VAL_SIZE_32;
}
if (offset + filter_len <= in.size()) {
args.filter_data.resize(filter_len);
std::memcpy(args.filter_data.data(), in.data() + offset, filter_len);
offset += filter_len;
}

// Deserialize metadata using fixed-width temporaries
if (offset + 9 <= in.size()) {
uint64_t tmp_expected = 0;
std::memcpy(&tmp_expected, in.data() + offset, 8);
args.expected_elements = static_cast<size_t>(tmp_expected);
offset += 8;
args.num_hashes = static_cast<size_t>(in[offset]);
}
return args;
}
};

/**
* @brief Arguments for TxnPrepare/Commit/Abort RPC
*/