-
-
Notifications
You must be signed in to change notification settings - Fork 16
feat: add chain sync protocol for block history synchronization #84
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -156,6 +156,50 @@ async def handler(data): | |
| else: | ||
| logger.warning("📥 Received Block #%s — rejected", block.index) | ||
|
|
||
| elif msg_type == "status": | ||
| import json as _json | ||
| peer_height = payload["height"] | ||
| my_height = chain.height | ||
|
|
||
| if peer_height > my_height: | ||
| writer = data.get("_writer") | ||
| if writer: | ||
| request = _json.dumps({ | ||
| "type": "get_blocks", | ||
| "data": { | ||
| "from_height": my_height + 1, | ||
| "to_height": peer_height | ||
| } | ||
| }) + "\n" | ||
| writer.write(request.encode()) | ||
| await writer.drain() | ||
| logger.info("📡 Requesting blocks %d~%d from %s", | ||
| my_height + 1, peer_height, peer_addr) | ||
| elif msg_type == "get_blocks": | ||
| import json as _json | ||
| from_h = payload["from_height"] | ||
| to_h = payload["to_height"] | ||
| blocks = chain.get_blocks_range(from_h, to_h) | ||
|
|
||
| writer = data.get("_writer") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Injecting _writer into the message payload and directly calling writer.write(...) in main.py breaks the abstraction of the P2P layer. If we ever switch the transport layer (e.g., from raw sockets to WebSockets or a library), we will have to rewrite the application logic in main.py. |
||
| if writer and blocks: | ||
| response = _json.dumps({ | ||
| "type": "blocks", | ||
| "data": {"blocks": blocks} | ||
| }) + "\n" | ||
| writer.write(response.encode()) | ||
| await writer.drain() | ||
| logger.info("📤 Sent %d blocks to %s", len(blocks), peer_addr) | ||
|
|
||
| elif msg_type == "blocks": | ||
| received = payload["blocks"] | ||
| success, count = chain.add_blocks_bulk(received) | ||
|
|
||
| if success: | ||
| logger.info("✅ Chain synced: added %d blocks", count) | ||
| else: | ||
| logger.warning("❌ Chain sync failed after %d blocks", count) | ||
|
Comment on lines
+159
to
+201
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. History import assumes a state snapshot invariant that
🤖 Prompt for AI Agents |
||
|
|
||
| return handler | ||
|
|
||
|
|
||
|
|
@@ -324,7 +368,14 @@ async def on_peer_connected(writer): | |
| }) + "\n" | ||
| writer.write(sync_msg.encode()) | ||
| await writer.drain() | ||
| logger.info("🔄 Sent state sync to new peer") | ||
|
|
||
| status_msg = _json.dumps({ | ||
| "type": "status", | ||
| "data": {"height": chain.height} | ||
| }) + "\n" | ||
| writer.write(status_msg.encode()) | ||
| await writer.drain() | ||
| logger.info("🔄 Sent state sync and status to new peer") | ||
|
Comment on lines
+372
to
+378
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Capture This 🤖 Prompt for AI Agents |
||
|
|
||
| network.register_on_peer_connected(on_peer_connected) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -54,6 +54,12 @@ def last_block(self): | |
| with self._lock: # Acquire lock for thread-safe access | ||
| return self.chain[-1] | ||
|
|
||
| @property | ||
| def height(self) -> int: | ||
| """Returns the current chain height (genesis = 0)""" | ||
| with self._lock: | ||
| return len(self.chain) - 1 | ||
|
|
||
| def add_block(self, block): | ||
| """ | ||
| Validates and adds a block to the chain if all transactions succeed. | ||
|
|
@@ -82,3 +88,26 @@ def add_block(self, block): | |
| self.state = temp_state | ||
| self.chain.append(block) | ||
| return True | ||
|
|
||
| def get_blocks_range(self, from_height: int, to_height: int) -> list: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @0u-Y |
||
| """Return serialized blocks from from_height to to_height inclusive.""" | ||
| with self._lock: | ||
| to_height = min(to_height, len(self.chain) - 1) | ||
| if from_height > to_height or from_height < 0: | ||
| return [] | ||
| return [b.to_dict() for b in self.chain[from_height:to_height + 1]] | ||
|
|
||
| def add_blocks_bulk(self, block_dicts: list) -> tuple: | ||
| """Add blocks validating chain linkage only. State relies on sync message.""" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @0u-Y There is a fundamental issue with add_blocks_bulk. The method skips executing transactions against the state and explicitly mentions State relies on sync message. In a blockchain, the state must be deterministically derived from the blocks. If we decouple block synchronization from state execution:
|
||
| added = 0 | ||
| for block_dict in block_dicts: | ||
| block = Block.from_dict(block_dict) | ||
| with self._lock: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here lock is acquired and released on every iteration. Between iterations another thread can append a block, making self.last_block stale for the next call to validate_block_link_and_hash. Move the with self._lock: outside the loop. |
||
| try: | ||
| validate_block_link_and_hash(self.last_block, block) | ||
| except ValueError as exc: | ||
| logger.warning("Block %s rejected: %s", block.index, exc) | ||
| return False, added | ||
| self.chain.append(block) | ||
| added += 1 | ||
| return True, added | ||
|
Comment on lines
+100
to
+113
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make A failure on a later element returns 🔧 Proposed fix def add_blocks_bulk(self, block_dicts: list) -> tuple:
"""Add blocks validating chain linkage only. State relies on sync message."""
- added = 0
- for block_dict in block_dicts:
- block = Block.from_dict(block_dict)
- with self._lock:
- try:
- validate_block_link_and_hash(self.last_block, block)
- except ValueError as exc:
- logger.warning("Block %s rejected: %s", block.index, exc)
- return False, added
- self.chain.append(block)
- added += 1
- return True, added
+ with self._lock:
+ tip = self.chain[-1]
+ new_blocks = []
+
+ for block_dict in block_dicts:
+ block = Block.from_dict(block_dict)
+ try:
+ validate_block_link_and_hash(tip, block)
+ except ValueError as exc:
+ logger.warning("Block %s rejected: %s", block.index, exc)
+ return False, 0
+ new_blocks.append(block)
+ tip = block
+
+ self.chain.extend(new_blocks)
+ return True, len(new_blocks)🤖 Prompt for AI Agents |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
| logger = logging.getLogger(__name__) | ||
|
|
||
| TOPIC = "minichain-global" | ||
| SUPPORTED_MESSAGE_TYPES = {"sync", "tx", "block"} | ||
| SUPPORTED_MESSAGE_TYPES = {"sync", "tx", "block", "status", "get_blocks", "blocks"} | ||
|
|
||
|
|
||
| class P2PNetwork: | ||
|
|
@@ -207,6 +207,37 @@ def _validate_block_payload(self, payload): | |
| for tx_payload in payload["transactions"] | ||
| ) | ||
|
|
||
| def _validate_status_payload(self, payload): | ||
| if not isinstance(payload, dict): | ||
| return False | ||
| if set(payload) != {"height"}: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I noticed that the new payload validators repeat the exact same type-checking logic, specifically checking if the payload is a dictionary and validating the exact keys using set(payload) != {...}. While I see this follows the pattern of older validators in the file, it introduces unnecessary boilerplate. You can consider creating a generic helper method to reduce the duplication, or simply inline the dict check if the logic is very simple |
||
| return False | ||
| if not isinstance(payload["height"], int) or payload["height"] < 0: | ||
| return False | ||
| return True | ||
|
|
||
| def _validate_get_blocks_payload(self, payload): | ||
| if not isinstance(payload, dict): | ||
| return False | ||
| if set(payload) != {"from_height", "to_height"}: | ||
| return False | ||
| fh, th = payload.get("from_height"), payload.get("to_height") | ||
| if not isinstance(fh, int) or not isinstance(th, int): | ||
| return False | ||
| if fh < 0 or fh > th: | ||
| return False | ||
| return True | ||
|
|
||
| def _validate_blocks_payload(self, payload): | ||
| if not isinstance(payload, dict): | ||
| return False | ||
| if set(payload) != {"blocks"}: | ||
| return False | ||
| blocks = payload.get("blocks") | ||
| if not isinstance(blocks, list): | ||
| return False | ||
| return all(isinstance(b, dict) for b in blocks) | ||
|
Comment on lines
+231
to
+239
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Validate block entries inside This currently accepts any 🔧 Proposed fix def _validate_blocks_payload(self, payload):
if not isinstance(payload, dict):
return False
if set(payload) != {"blocks"}:
return False
blocks = payload.get("blocks")
if not isinstance(blocks, list):
return False
- return all(isinstance(b, dict) for b in blocks)
+ return all(self._validate_block_payload(block) for block in blocks)🧰 Tools🪛 Ruff (0.15.7)[warning] 231-231: Missing return type annotation for private function (ANN202) 🤖 Prompt for AI Agents |
||
|
|
||
| def _validate_message(self, message): | ||
| if not isinstance(message, dict): | ||
| return False | ||
|
|
@@ -226,6 +257,9 @@ def _validate_message(self, message): | |
| "sync": self._validate_sync_payload, | ||
| "tx": self._validate_transaction_payload, | ||
| "block": self._validate_block_payload, | ||
| "status": self._validate_status_payload, | ||
| "get_blocks": self._validate_get_blocks_payload, | ||
| "blocks": self._validate_blocks_payload, | ||
| } | ||
| return validators[msg_type](payload) | ||
|
|
||
|
|
@@ -283,6 +317,7 @@ async def _listen_to_peer( | |
| continue | ||
| self._mark_seen(msg_type, payload) | ||
| data["_peer_addr"] = addr | ||
| data["_writer"] = writer | ||
|
|
||
| if self._handler_callback: | ||
| try: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"to_height": peer_height - if a peer lies about its height (e.g. claims height 999999), we request all of it with no cap. The review only flags the responder side in p2p.py, but the requester side in main.py is equally vulnerable.