Build agent swarms that discover each other, share abilities, join conversations, expose custom endpoints, and wake up on schedules like sunrise... across any network.
Synapse is a lightweight peer-to-peer substrate for agent infrastructure. Give each process a Node and Synapse gives that node a name, peers, capabilities, RPC endpoints, shared conversations, agent cards, heartbeats, and periodic tasks.
from synapse_p2p import Node, solar
node = Node(
name="garden-node",
swarm="garden.example.com",
capabilities=["sensors", "watering"],
mdns=True,
)
@node.periodic(solar("sunrise", latitude=51.5, longitude=-0.1, tz="Europe/London"))
async def morning_check() -> None:
await node.broadcast("garden.status")
node.run()A node can wrap an LLM agent, a script, a service, a sensor, or a tool. Synapse is not the agent brain. It is the swarm layer agents stand on.
What makes it fun:
- Swarms are teamsβnodes find teammates by swarm name.
- Nodes have names and abilitiesβadvertise
code-review,weather,memory,watering, anything. - Shared conversationsβbroadcast once; many nodes can wade in, reply, leave, and come back later.
- Custom endpointsβexpose any async function as swarm-callable RPC.
- Agent cardsβpublish metadata so peers can understand what you are.
- Periodic tasksβstart work every minute, every weekday, or literally at sunrise.
Using uv or pip:
uv add synapse-p2ppip install synapse-p2pThen:
sn --helpSynapse uses these words carefully:
| Word | Meaning |
|---|---|
| Node | A running Synapse participant. You create one with Node(...). |
| Peer | Another node this node knows about. |
| Swarm | Nodes with the same swarm name. |
| Agent | Your higher-level logic: an LLM loop, workflow, script, or automation. |
| Capability | A short advertised skill like code-review, web-research, or watering. |
| Endpoint | An async function peers can call over RPC. |
Use node for Synapse networking. Use agent for the logic you put behind a node.
Synapse gives nodes:
- a name and swarm identity
- peer discovery over mDNS or seeds
- advertised capabilities / abilities
- custom async RPC endpoints
- direct task delegation
- shared broadcast conversations
- agent cards and other advertised metadata
- periodic tasks on interval, cron, or solar time
- heartbeats and offline detection
- a simple MsgPack-over-TCP protocol
Most agent frameworks start with one agent loop. Synapse starts with the swarm.
planner-node ββ broadcasts "who can review this?"
β
βββ security-node replies now
βββ tests-node replies now
βββ docs-node replies later using the same conversation nonce
A node can join a swarm, advertise what it can do, expose custom endpoints, publish an agent card, and participate in conversations without a central coordinator.
Periodic tasks make nodes feel alive:
@node.periodic(solar("sunrise", latitude=51.5, longitude=-0.1, tz="Europe/London"))
async def wake_up() -> None:
await node.broadcast("daily.start")- Research swarms β a planner node broadcasts a question; specialist nodes wade into the same conversation.
- Code-review teams β nodes advertise
security,tests, ordocs; a coordinator delegates to the right peer. - Local-first automations β laptop, server, and Raspberry Pi nodes discover each other with mDNS.
- Sunrise agents β a node starts work at sunrise, sunset, or civil twilight.
- Internal agent APIs β put an LLM agent behind a custom endpoint so other nodes can discover and call it.
- Self-describing tools β publish an agent card with name, role, abilities, input modes, and output modes.
- Live dashboards β watch joins, heartbeats, messages, replies, and offline events with
sn watch.
Synapse handles the substrate. You bring the behavior.
A2A is a full agent interoperability protocol. Synapse is much smaller: a peer-to-peer substrate for nodes that need to find each other and talk.
Use A2A when you need a formal cross-vendor agent protocol with task lifecycle, message parts, artifacts, streaming, push notifications, and enterprise-style integration points.
Use Synapse when you want to build a swarm quickly:
| A2A | Synapse |
|---|---|
| Agent protocol | Swarm substrate |
| HTTP / JSON-RPC oriented | Length-prefixed MsgPack over TCP |
| Formal task lifecycle | Simple RPC, ask, and broadcast |
| Agent cards are central | Agent cards are optional artifacts |
| More concepts to implement | One main primitive: Node |
| Best for interoperability | Best for local-first swarms and fast experimentation |
Synapse is intentionally less bloated:
- no required task state machine
- no required message/part/artifact object model
- no server/client role ceremony inside a swarm
- no central coordinator
- no hosted registry requirement
- no opinion about how agents think
The core idea is simple: start nodes, advertise abilities, call endpoints, broadcast into shared conversations, and run periodic jobs.
Create a node with an endpoint:
from synapse_p2p import Node
node = Node(name="calculator", port=9999)
@node.endpoint("sum", description="Add two numbers")
async def sum(a: int, b: int) -> int:
return a + b
node.run()Call it:
import asyncio
from synapse_p2p import Client
async def main() -> None:
result = await Client("127.0.0.1", 9999).call("sum", 1, 2)
print(result)
asyncio.run(main())A swarm is a group of nodes with the same swarm name. Nodes only join and heartbeat peers in their own swarm.
node = Node(
name="coder",
swarm="foo.electron.network",
capabilities=["python", "tests"],
)Use a domain-style swarm name to avoid collisions.
Use mDNS for zero-config discovery on a LAN:
node = Node(
name="reviewer",
swarm="foo.electron.network",
capabilities=["code-review"],
mdns=True,
)
await node.start()
await node.join()Any node on the same LAN with the same swarm and mdns=True can discover it.
Use seeds when nodes are not on the same LAN:
node = Node(
name="planner",
swarm="foo.electron.network",
seeds=["bootstrap.foo.electron.network:9999"],
)
await node.start()
await node.join()A seed is just another Synapse node. It is a first contact point, not a coordinator. Once joined, nodes exchange known peers and can talk directly.
Capabilities tell peers what a node can do.
node = Node(capabilities=["python", "code-review"])Use structured capabilities when you want descriptions and schemas:
from synapse_p2p import Capability, Node
node = Node(
name="researcher",
capabilities=[
Capability(
name="web-research",
description="Find and summarize sources.",
input_schema={"query": "string"},
output_schema={"summary": "string", "sources": "array"},
)
],
)Inspect a peer:
info = await client.call("_node.info")
capabilities = await client.call("_node.capabilities")
methods = await client.call("_synapse.methods")Use @node.ask for the default task handler on a node. Synapse provides the transport; your agent code decides whether and how to answer.
from synapse_p2p import Node
node = Node(name="reviewer", capabilities=["code-review"])
@node.ask
async def handle(task: str, context: dict):
return {"status": "done", "task": task}There are two ways to use it.
Call one known peer directly with _node.ask:
from synapse_p2p import Client
result = await Client.from_peer(peer).call(
"_node.ask",
"Review this diff",
context={"diff": diff},
)Broadcast to the built-in synapse.ask endpoint when you want any interested node to wade in:
broadcast = await node.broadcast(
"synapse.ask",
"Review this diff",
context={"diff": diff},
)A node with an @node.ask handler will ACK the conversation, run the handler, and reply with the result. Nodes without a handler fail quietly from the caller's point of view, just like any other broadcast recipient that cannot help.
The CLI wraps this flow:
sn ask foo.electron.network "Review this diff" --context url=https://github.com/org/repo/pull/1Use broadcast when you do not know which node should answer.
broadcast = await node.broadcast("team.question", "Who can review this diff?")Every receiver gets the same conversation nonce. Any node can reply:
from synapse_p2p import Broadcast
@node.endpoint("team.question")
async def answer(question: str, broadcast: Broadcast) -> dict:
await node.reply(broadcast, {"answer": "I can help"})
return {"accepted": True}The origin node can read all replies:
for reply in node.replies(broadcast):
print(reply.peer.name, reply.result)Synapse also keeps a lightweight conversation event log. A broadcast creates a message event whose conversation_id is the broadcast nonce. Nodes may opt into the conversation with ack or other events; Synapse does not decide who should answer.
@node.endpoint("team.question")
async def answer(question: str, broadcast: Broadcast) -> dict:
# ACK means "I saw this and am choosing to wade in".
# It does not mean Synapse assigned this node the work.
await node.ack(broadcast, {"seen": True})
await node.reply(broadcast, {"answer": "I can help"})
return {"accepted": True}Listen for conversation events:
from synapse_p2p import ConversationEvent
@node.on("conversation.ack")
async def on_ack(event: ConversationEvent) -> None:
print(event.peer.name, "acked", event.conversation_id)
for event in node.conversation(broadcast):
print(event.kind, event.peer.name, event.payload)Built-in conversation event kinds are intentionally small conventions:
messageβ a broadcast or conversation message was seenackβ a node chose to acknowledge / enter the conversationreplyβ a node replied with a result
Higher-level agent frameworks can layer routing, claiming, status, artifacts, or task semantics on top by emitting their own event kinds with node.emit_conversation_event(...).
Why this is useful:
- one broadcast creates one shared conversation
- every node sees the same nonce
- nodes can wade in or stay silent
- ACK is opt-in, not automatic assignment
- replies and events group without a central coordinator
- UUIDv7 nonces keep conversations roughly time-ordered when the runtime supports them
Nodes can wake up on a schedule: every few seconds, every weekday morning, or when the sun rises.
from synapse_p2p import Node, cron, every, solar
node = Node(name="worker")
@node.periodic(every(seconds=30))
async def refresh_cache() -> None:
print("refreshing cache")
@node.periodic(cron("0 9 * * mon-fri", tz="Europe/London"))
async def weekday_digest() -> None:
print("weekday digest")
@node.periodic(solar("sunrise", latitude=51.5, longitude=-0.1, tz="Europe/London"))
async def sunrise_job() -> None:
print("the sun is up; time to work")
node.run()A number is shorthand for seconds:
@node.periodic(30) # equivalent to every(seconds=30)
async def refresh_cache() -> None:
...Built-in schedules:
every(seconds=..., minutes=..., hours=..., days=...)cron("*/15 * * * *", tz="UTC")solar("sunrise", latitude=..., longitude=..., tz="UTC")
Solar events include sunrise, sunset, solar_noon, civil_twilight_begin, civil_twilight_end, nautical_twilight_begin, nautical_twilight_end, astronomical_twilight_begin, and astronomical_twilight_end.
Notes:
- periodic handlers must be
async def - the first run starts immediately when the node starts
- later runs follow the schedule
- exceptions are logged and do not stop future runs
- long-running tasks can overlap if the next scheduled time arrives first
Nodes can advertise small metadata documents that peers can fetch over RPC.
node.artifact(
"agent-card",
{
"name": node.name,
"description": "Reviews Python PRs and returns concise feedback.",
"capabilities": ["code-review", "pytest"],
"input_modes": ["text", "git-diff", "url"],
"output_modes": ["text/markdown", "text/x-diff"],
},
mime_type="application/vnd.synapse.agent-card+json",
description="Self-description for peers that understand agent cards.",
)Fetch artifacts from a peer:
from synapse_p2p import Client
client = Client.from_peer(peer)
artifacts = await client.call("_synapse.artifacts")
agent_card = await client.call("_synapse.artifact.get", "agent-card")Synapse does not interpret artifacts. It serves bytes/JSON plus a MIME type. Your application decides what the artifact means.
Nodes heartbeat known peers and mark stale peers offline.
from synapse_p2p import Node, Peer
node = Node(name="planner", heartbeat_interval=5, peer_timeout=20)
@node.on("peer.joined")
async def joined(peer: Peer) -> None:
print(f"joined: {peer.name}")
@node.on("peer.offline")
async def offline(peer: Peer) -> None:
print(f"offline: {peer.name}")Offline means βnot seen within peer_timeout.β
The CLI is sn.
sn --helpsn watch foo.electron.networkUseful options:
sn watch foo.electron.network --show-heartbeats
sn watch foo.electron.network --seed 192.168.1.25:9000 --no-mdns
sn watch foo.electron.network --team backend
sn watch foo.electron.network --no-capabilitiessn ask broadcasts to the built-in synapse.ask endpoint. Nodes with a @node.ask handler can opt in with ACK and reply with their result.
sn ask foo.electron.network "Review this diff"
sn ask foo.electron.network "Review this diff" --context url=https://github.com/org/repo/pull/1
sn ask foo.electron.network "Who can help?" --foreverExample output:
ask: 019e4ab0-1d0d-709a-...
waiting for ACKs and replies... press Ctrl+C to stop
β reviewer acked
- reviewer: LGTM after fixing tests
sn broadcast foo.electron.network "Who can review this diff?"
sn broadcast foo.electron.network "Who can help?" --forever
sn broadcast foo.electron.network "Ship status?" --discover 2 --timeout 10sn list-swarms
sn list-swarms --seconds 5See examples/. Each example folder has its own README.
| Example | What it demonstrates |
|---|---|
basic_rpc |
The smallest direct RPC node/client pair. |
isolated_agents |
One node delegates to another through a known seed. |
bootstrap_team_trio |
Bootstrap discovery, ask handlers, and fetching agent cards. |
local_mdns_swarm |
Zero-config mDNS discovery plus ACKs/replies in one conversation. |
pydantic_ai_team |
Pydantic AI agents behind Synapse nodes. |
periodic_tasks |
Interval, cron, sunrise, and sunset jobs in a garden-caretaker node. |
stock_trading_team |
Analyst/news/trader swarm with a dumb paper exchange API and market-hours periodic scans. |
The agent examples use synapse.ask, opt-in ACKs, shared conversation replies, and advertised agent-card artifacts. The stock example shows a periodic job that checks market hours before asking the swarm, so agent/model work only happens when the paper market is open.
Built-in endpoints:
| Endpoint | Purpose |
|---|---|
_synapse.ping |
health check |
_synapse.info |
node identity and swarm metadata |
_synapse.methods |
published RPC methods |
_synapse.peers |
known peers |
_synapse.join |
join through a seed |
_synapse.heartbeat |
update peer liveness |
_synapse.broadcast.reply |
reply to a broadcast nonce |
_synapse.conversation.event |
gossip a shared conversation event |
_synapse.artifacts |
list advertised artifacts |
_synapse.artifact.get |
fetch one advertised artifact |
_node.info |
name, role, description, capabilities |
_node.capabilities |
machine-readable capabilities |
_node.ask |
delegate directly to the node ask handler |
synapse.ask |
swarm-facing ask endpoint used by sn ask |
Wire format:
- 4-byte unsigned big-endian payload length
- MsgPack payload bytes
Request:
{
"type": "request",
"id": "request-id",
"endpoint": "sum",
"args": [1, 2],
"kwargs": {},
}Response:
{
"type": "response",
"id": "request-id",
"ok": True,
"result": 3,
"error": None,
}Useful low-level exports:
from synapse_p2p import (
AdvertisedArtifact,
Broadcast,
BroadcastReply,
Capability,
Client,
ConversationEvent,
Node,
Peer,
RPCError,
RPCRequest,
RPCResponse,
ServedArtifact,
)Enable logs when debugging:
from loguru import logger
logger.enable("synapse_p2p")Synapse does not implement planning, memory, consensus, auth policy, NAT traversal, hosted registries, or UX.
Those belong above Synapse.
Synapse is the substrate:
nodes + discovery + capabilities + conversations + artifacts + heartbeats + schedules + a tiny protocol
swarm substrate, agent substrate, node discovery, local mDNS discovery, agent-to-agent networking, LLM agent RPC, multi-agent systems, capability discovery, language-agnostic RPC, Python RPC, asyncio RPC, peer-to-peer Python, P2P networking, MsgPack RPC, TCP RPC, distributed agents.