dvf/synapse


Synapse Logo


Build agent swarms that discover each other, share abilities, join conversations, expose custom endpoints, and wake up on schedules like sunrise... across any network.




Synapse is a lightweight peer-to-peer substrate for agent infrastructure. Give each process a Node and Synapse gives that node a name, peers, capabilities, RPC endpoints, shared conversations, agent cards, heartbeats, and periodic tasks.

from synapse_p2p import Node, solar

node = Node(
    name="garden-node",
    swarm="garden.example.com",
    capabilities=["sensors", "watering"],
    mdns=True,
)


@node.periodic(solar("sunrise", latitude=51.5, longitude=-0.1, tz="Europe/London"))
async def morning_check() -> None:
    await node.broadcast("garden.status")


node.run()

A node can wrap an LLM agent, a script, a service, a sensor, or a tool. Synapse is not the agent brain. It is the swarm layer agents stand on.

What makes it fun:

  • Swarms are teams: nodes find teammates by swarm name.
  • Nodes have names and abilities: advertise code-review, weather, memory, watering, anything.
  • Shared conversations: broadcast once; many nodes can wade in, reply, leave, and come back later.
  • Custom endpoints: expose any async function as swarm-callable RPC.
  • Agent cards: publish metadata so peers can understand what you are.
  • Periodic tasks: start work every minute, every weekday, or literally at sunrise.

📦 Install

Using uv or pip:

uv add synapse-p2p
pip install synapse-p2p

Then:

sn --help

🧠 The mental model

Synapse uses these words carefully:

| Word | Meaning |
| --- | --- |
| Node | A running Synapse participant. You create one with Node(...). |
| Peer | Another node this node knows about. |
| Swarm | Nodes with the same swarm name. |
| Agent | Your higher-level logic: an LLM loop, workflow, script, or automation. |
| Capability | A short advertised skill like code-review, web-research, or watering. |
| Endpoint | An async function peers can call over RPC. |

Use node for Synapse networking. Use agent for the logic you put behind a node.

Synapse gives nodes:

  • a name and swarm identity
  • peer discovery over mDNS or seeds
  • advertised capabilities / abilities
  • custom async RPC endpoints
  • direct task delegation
  • shared broadcast conversations
  • agent cards and other advertised metadata
  • periodic tasks on interval, cron, or solar time
  • heartbeats and offline detection
  • a simple MsgPack-over-TCP protocol

✨ Why it feels different

Most agent frameworks start with one agent loop. Synapse starts with the swarm.

planner-node ── broadcasts "who can review this?"
      │
      ├── security-node replies now
      ├── tests-node replies now
      └── docs-node replies later using the same conversation nonce

A node can join a swarm, advertise what it can do, expose custom endpoints, publish an agent card, and participate in conversations without a central coordinator.

Periodic tasks make nodes feel alive:

@node.periodic(solar("sunrise", latitude=51.5, longitude=-0.1, tz="Europe/London"))
async def wake_up() -> None:
    await node.broadcast("daily.start")

πŸ› οΈ What can you build?

  • Research swarms: a planner node broadcasts a question; specialist nodes wade into the same conversation.
  • Code-review teams: nodes advertise security, tests, or docs; a coordinator delegates to the right peer.
  • Local-first automations: laptop, server, and Raspberry Pi nodes discover each other with mDNS.
  • Sunrise agents: a node starts work at sunrise, sunset, or civil twilight.
  • Internal agent APIs: put an LLM agent behind a custom endpoint so other nodes can discover and call it.
  • Self-describing tools: publish an agent card with name, role, abilities, input modes, and output modes.
  • Live dashboards: watch joins, heartbeats, messages, replies, and offline events with sn watch.

Synapse handles the substrate. You bring the behavior.


βš–οΈ Synapse vs A2A

A2A is a full agent interoperability protocol. Synapse is much smaller: a peer-to-peer substrate for nodes that need to find each other and talk.

Use A2A when you need a formal cross-vendor agent protocol with task lifecycle, message parts, artifacts, streaming, push notifications, and enterprise-style integration points.

Use Synapse when you want to build a swarm quickly:

| A2A | Synapse |
| --- | --- |
| Agent protocol | Swarm substrate |
| HTTP / JSON-RPC oriented | Length-prefixed MsgPack over TCP |
| Formal task lifecycle | Simple RPC, ask, and broadcast |
| Agent cards are central | Agent cards are optional artifacts |
| More concepts to implement | One main primitive: Node |
| Best for interoperability | Best for local-first swarms and fast experimentation |

Synapse is intentionally less bloated:

  • no required task state machine
  • no required message/part/artifact object model
  • no server/client role ceremony inside a swarm
  • no central coordinator
  • no hosted registry requirement
  • no opinion about how agents think

The core idea is simple: start nodes, advertise abilities, call endpoints, broadcast into shared conversations, and run periodic jobs.


⚡ Quickstart: RPC

Create a node with an endpoint:

from synapse_p2p import Node

node = Node(name="calculator", port=9999)


@node.endpoint("sum", description="Add two numbers")
async def add(a: int, b: int) -> int:  # endpoint name stays "sum"; avoids shadowing the built-in sum()
    return a + b


node.run()

Call it:

import asyncio

from synapse_p2p import Client


async def main() -> None:
    result = await Client("127.0.0.1", 9999).call("sum", 1, 2)
    print(result)


asyncio.run(main())

🐝 Swarms and discovery

A swarm is a group of nodes with the same swarm name. Nodes only join and heartbeat peers in their own swarm.

node = Node(
    name="coder",
    swarm="foo.electron.network",
    capabilities=["python", "tests"],
)

Use a domain-style swarm name to avoid collisions.

📡 Local discovery with mDNS

Use mDNS for zero-config discovery on a LAN:

node = Node(
    name="reviewer",
    swarm="foo.electron.network",
    capabilities=["code-review"],
    mdns=True,
)

await node.start()
await node.join()

Any node on the same LAN with the same swarm and mdns=True can discover it.

🌍 Remote discovery with seeds

Use seeds when nodes are not on the same LAN:

node = Node(
    name="planner",
    swarm="foo.electron.network",
    seeds=["bootstrap.foo.electron.network:9999"],
)

await node.start()
await node.join()

A seed is just another Synapse node. It is a first contact point, not a coordinator. Once joined, nodes exchange known peers and can talk directly.
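
The peer exchange described above can be pictured as a merge of peer lists. The following is a minimal sketch using only the standard library, not Synapse's implementation: `PeerRecord`, its fields, and `merge_peers` are hypothetical names chosen for illustration, and the real join handshake may carry different data.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PeerRecord:
    """Hypothetical peer record; Synapse's real Peer type may differ."""
    name: str
    host: str
    port: int
    swarm: str


def merge_peers(
    known: dict[tuple[str, int], PeerRecord],
    learned: list[PeerRecord],
    swarm: str,
) -> dict[tuple[str, int], PeerRecord]:
    """Return known peers plus newly learned peers from the same swarm."""
    merged = dict(known)
    for peer in learned:
        if peer.swarm != swarm:
            continue  # peers from other swarms are ignored
        # Keep the existing record if this address is already known.
        merged.setdefault((peer.host, peer.port), peer)
    return merged


# What a seed might report back after a join (illustrative data only).
seed_view = [
    PeerRecord("coder", "10.0.0.2", 9999, "foo.electron.network"),
    PeerRecord("stranger", "10.0.0.9", 9999, "other.example.com"),
]
peers = merge_peers({}, seed_view, swarm="foo.electron.network")
print(sorted(p.name for p in peers.values()))
```

After the merge, the joining node holds direct addresses for its swarm-mates, which is why the seed does not need to stay in the request path.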


🎯 Capabilities

Capabilities tell peers what a node can do.

node = Node(capabilities=["python", "code-review"])

Use structured capabilities when you want descriptions and schemas:

from synapse_p2p import Capability, Node

node = Node(
    name="researcher",
    capabilities=[
        Capability(
            name="web-research",
            description="Find and summarize sources.",
            input_schema={"query": "string"},
            output_schema={"summary": "string", "sources": "array"},
        )
    ],
)

Inspect a peer:

client = Client.from_peer(peer)  # peer: a Peer this node already knows

info = await client.call("_node.info")
capabilities = await client.call("_node.capabilities")
methods = await client.call("_synapse.methods")
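
Once capability listings have been fetched, picking a peer is ordinary filtering. The sketch below assumes a listing is either a plain string or a dictionary with a "name" key, mirroring the two ways of declaring capabilities shown above; the actual shape returned by _node.capabilities is not specified here, and `capability_names` and `peers_with` are hypothetical helpers.

```python
def capability_names(capabilities: list) -> set[str]:
    """Normalize plain strings and {"name": ...} dicts into a set of names."""
    names = set()
    for item in capabilities:
        if isinstance(item, str):
            names.add(item)
        elif isinstance(item, dict) and "name" in item:
            names.add(item["name"])
    return names


def peers_with(capability: str, listings: dict[str, list]) -> list[str]:
    """Return the sorted names of peers that advertise the capability."""
    return sorted(
        name
        for name, caps in listings.items()
        if capability in capability_names(caps)
    )


# Illustrative listings keyed by peer name (assumed shape, not wire data).
listings = {
    "reviewer": ["python", "code-review"],
    "researcher": [
        {"name": "web-research", "description": "Find and summarize sources."}
    ],
}
print(peers_with("code-review", listings))
```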

🤝 Ask: delegate work

Use @node.ask for the default task handler on a node. Synapse provides the transport; your agent code decides whether and how to answer.

from synapse_p2p import Node

node = Node(name="reviewer", capabilities=["code-review"])


@node.ask
async def handle(task: str, context: dict):
    return {"status": "done", "task": task}

There are two ways to use it.

Direct ask

Call one known peer directly with _node.ask:

from synapse_p2p import Client

result = await Client.from_peer(peer).call(
    "_node.ask",
    "Review this diff",
    context={"diff": diff},
)

Swarm ask

Broadcast to the built-in synapse.ask endpoint when you want any interested node to wade in:

broadcast = await node.broadcast(
    "synapse.ask",
    "Review this diff",
    context={"diff": diff},
)

A node with an @node.ask handler will ACK the conversation, run the handler, and reply with the result. Nodes without a handler fail quietly from the caller's point of view, just like any other broadcast recipient that cannot help.

The CLI wraps this flow:

sn ask foo.electron.network "Review this diff" --context url=https://github.com/org/repo/pull/1

💬 Broadcast: shared conversations

Use broadcast when you do not know which node should answer.

broadcast = await node.broadcast("team.question", "Who can review this diff?")

Every receiver gets the same conversation nonce. Any node can reply:

from synapse_p2p import Broadcast


@node.endpoint("team.question")
async def answer(question: str, broadcast: Broadcast) -> dict:
    await node.reply(broadcast, {"answer": "I can help"})
    return {"accepted": True}

The origin node can read all replies:

for reply in node.replies(broadcast):
    print(reply.peer.name, reply.result)

Synapse also keeps a lightweight conversation event log. A broadcast creates a message event whose conversation_id is the broadcast nonce. Nodes may opt into the conversation with ack or other events; Synapse does not decide who should answer.

@node.endpoint("team.question")
async def answer(question: str, broadcast: Broadcast) -> dict:
    # ACK means "I saw this and am choosing to wade in".
    # It does not mean Synapse assigned this node the work.
    await node.ack(broadcast, {"seen": True})
    await node.reply(broadcast, {"answer": "I can help"})
    return {"accepted": True}

Listen for conversation events:

from synapse_p2p import ConversationEvent


@node.on("conversation.ack")
async def on_ack(event: ConversationEvent) -> None:
    print(event.peer.name, "acked", event.conversation_id)


for event in node.conversation(broadcast):
    print(event.kind, event.peer.name, event.payload)

Built-in conversation event kinds are intentionally small conventions:

  • message: a broadcast or conversation message was seen
  • ack: a node chose to acknowledge / enter the conversation
  • reply: a node replied with a result

Higher-level agent frameworks can layer routing, claiming, status, artifacts, or task semantics on top by emitting their own event kinds with node.emit_conversation_event(...).
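
To make the event-log idea concrete, here is a small in-memory sketch of events grouped by conversation id, including one custom kind. It is a standalone model, not Synapse's log: `Event`, `ConversationLog`, and the "claim" kind are hypothetical, and the peer is reduced to a name string.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Event:
    """Hypothetical stand-in for a conversation event."""
    conversation_id: str
    kind: str
    peer: str
    payload: dict = field(default_factory=dict)


class ConversationLog:
    """Append-only log that groups events by conversation id."""

    def __init__(self) -> None:
        self._events: dict[str, list[Event]] = defaultdict(list)

    def record(self, event: Event) -> None:
        self._events[event.conversation_id].append(event)

    def events(self, conversation_id: str, kind: Optional[str] = None) -> list[Event]:
        """Return events for one conversation, optionally filtered by kind."""
        found = self._events.get(conversation_id, [])
        return [e for e in found if kind is None or e.kind == kind]


log = ConversationLog()
nonce = "conv-1"  # placeholder; a real nonce would be a UUID
log.record(Event(nonce, "message", "planner", {"question": "Who can review this diff?"}))
log.record(Event(nonce, "ack", "reviewer"))
log.record(Event(nonce, "claim", "reviewer", {"eta_seconds": 30}))  # custom kind
log.record(Event(nonce, "reply", "reviewer", {"answer": "I can help"}))
print([e.kind for e in log.events(nonce)])
```

Because the log only groups and filters, a custom kind such as "claim" needs no support from the substrate; its meaning lives entirely in the framework that emits and reads it.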

Why this is useful:

  • one broadcast creates one shared conversation
  • every node sees the same nonce
  • nodes can wade in or stay silent
  • ACK is opt-in, not automatic assignment
  • replies and events group without a central coordinator
  • UUIDv7 nonces keep conversations roughly time-ordered when the runtime supports them
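
The last point can be sketched with the standard library. UUIDv7 places a millisecond timestamp in its leading bits, so sorting nonces sorts conversations roughly by creation time. The helper below is an assumption about how such a fallback could look, not Synapse's code; `new_nonce` is a hypothetical name.

```python
import uuid


def new_nonce() -> str:
    """Return a UUIDv7 string when available, else a random UUIDv4 string."""
    # uuid.uuid7 exists from Python 3.14; older runtimes fall back to uuid4,
    # which is unique but carries no time ordering.
    make = getattr(uuid, "uuid7", uuid.uuid4)
    return str(make())


nonce = new_nonce()
print(nonce, "version", uuid.UUID(nonce).version)
```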

🌅 Periodic tasks

Nodes can wake up on a schedule: every few seconds, every weekday morning, or when the sun rises.

from synapse_p2p import Node, cron, every, solar

node = Node(name="worker")


@node.periodic(every(seconds=30))
async def refresh_cache() -> None:
    print("refreshing cache")


@node.periodic(cron("0 9 * * mon-fri", tz="Europe/London"))
async def weekday_digest() -> None:
    print("weekday digest")


@node.periodic(solar("sunrise", latitude=51.5, longitude=-0.1, tz="Europe/London"))
async def sunrise_job() -> None:
    print("the sun is up; time to work")


node.run()

A number is shorthand for seconds:

@node.periodic(30)  # equivalent to every(seconds=30)
async def refresh_cache() -> None:
    ...

Built-in schedules:

  • every(seconds=..., minutes=..., hours=..., days=...)
  • cron("*/15 * * * *", tz="UTC")
  • solar("sunrise", latitude=..., longitude=..., tz="UTC")

Solar events include sunrise, sunset, solar_noon, civil_twilight_begin, civil_twilight_end, nautical_twilight_begin, nautical_twilight_end, astronomical_twilight_begin, and astronomical_twilight_end.

Notes:

  • periodic handlers must be async def
  • the first run starts immediately when the node starts
  • later runs follow the schedule
  • exceptions are logged and do not stop future runs
  • long-running tasks can overlap if the next scheduled time arrives first
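
The notes above can be modeled with plain asyncio. This is a simplified sketch for a fixed interval, not Synapse's scheduler: `run_periodic` and `_guarded` are hypothetical names, and the run count is bounded only so the example terminates.

```python
import asyncio
import logging

logger = logging.getLogger("periodic-sketch")


async def _guarded(handler) -> None:
    """Run one invocation; log failures instead of propagating them."""
    try:
        await handler()
    except Exception:
        logger.exception("periodic handler failed")


async def run_periodic(handler, interval: float, runs: int) -> None:
    """Start `runs` invocations `interval` seconds apart, the first immediately."""
    tasks = []
    for i in range(runs):
        if i > 0:
            await asyncio.sleep(interval)
        # create_task does not wait for the previous run to finish,
        # so a slow handler can overlap with the next scheduled run.
        tasks.append(asyncio.create_task(_guarded(handler)))
    await asyncio.gather(*tasks)


calls = []


async def flaky() -> None:
    calls.append(len(calls))
    if len(calls) == 1:
        raise RuntimeError("first run fails")  # logged, later runs still happen


asyncio.run(run_periodic(flaky, interval=0.01, runs=3))
print(len(calls))
```

If overlapping runs are a problem for a given job, one option is to guard the handler body with an asyncio.Lock so a late run waits for the previous one.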

🪪 Artifacts and agent cards

Nodes can advertise small metadata documents that peers can fetch over RPC.

node.artifact(
    "agent-card",
    {
        "name": node.name,
        "description": "Reviews Python PRs and returns concise feedback.",
        "capabilities": ["code-review", "pytest"],
        "input_modes": ["text", "git-diff", "url"],
        "output_modes": ["text/markdown", "text/x-diff"],
    },
    mime_type="application/vnd.synapse.agent-card+json",
    description="Self-description for peers that understand agent cards.",
)

Fetch artifacts from a peer:

from synapse_p2p import Client

client = Client.from_peer(peer)

artifacts = await client.call("_synapse.artifacts")
agent_card = await client.call("_synapse.artifact.get", "agent-card")

Synapse does not interpret artifacts. It serves bytes/JSON plus a MIME type. Your application decides what the artifact means.
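
A rough model of that division of labor: the store keeps opaque bytes plus a MIME type, and only the consumer parses them. `ArtifactStore` and its methods are hypothetical names, and the returned dictionary shape is an assumption rather than the format of _synapse.artifact.get.

```python
import json


class ArtifactStore:
    """Hypothetical registry mapping a name to (mime_type, raw bytes)."""

    def __init__(self) -> None:
        self._items: dict[str, tuple[str, bytes]] = {}

    def publish(self, name: str, document, mime_type: str = "application/json") -> None:
        # Bytes are stored as-is; anything else is serialized to JSON.
        if isinstance(document, bytes):
            body = document
        else:
            body = json.dumps(document).encode("utf-8")
        self._items[name] = (mime_type, body)

    def names(self) -> list[str]:
        return sorted(self._items)

    def get(self, name: str) -> dict:
        mime_type, body = self._items[name]
        return {"name": name, "mime_type": mime_type, "body": body}


store = ArtifactStore()
store.publish(
    "agent-card",
    {"name": "reviewer", "capabilities": ["code-review", "pytest"]},
    mime_type="application/vnd.synapse.agent-card+json",
)

# The consumer, not the store, decides how to read the body.
card = store.get("agent-card")
if card["mime_type"].endswith("+json"):
    print(json.loads(card["body"])["capabilities"])
```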


💓 Heartbeats

Nodes heartbeat known peers and mark stale peers offline.

from synapse_p2p import Node, Peer

node = Node(name="planner", heartbeat_interval=5, peer_timeout=20)


@node.on("peer.joined")
async def joined(peer: Peer) -> None:
    print(f"joined: {peer.name}")


@node.on("peer.offline")
async def offline(peer: Peer) -> None:
    print(f"offline: {peer.name}")

Offline means "not seen within peer_timeout."
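
That rule reduces to a timestamp comparison. The sketch below is a standalone model, not Synapse's internals: `LivenessTable` is a hypothetical name, and the optional `now` argument exists so the example is deterministic.

```python
import time
from typing import Optional


class LivenessTable:
    """Track when each peer was last seen and report the stale ones."""

    def __init__(self, peer_timeout: float) -> None:
        self.peer_timeout = peer_timeout
        self._last_seen: dict[str, float] = {}

    def heartbeat(self, peer: str, now: Optional[float] = None) -> None:
        """Record that a heartbeat from `peer` arrived."""
        self._last_seen[peer] = time.monotonic() if now is None else now

    def offline(self, now: Optional[float] = None) -> list[str]:
        """Return peers not seen within peer_timeout seconds."""
        now = time.monotonic() if now is None else now
        return sorted(
            peer
            for peer, seen in self._last_seen.items()
            if now - seen > self.peer_timeout
        )


table = LivenessTable(peer_timeout=20)
table.heartbeat("reviewer", now=0)
table.heartbeat("coder", now=15)
print(table.offline(now=25))  # reviewer was last seen 25s ago, coder 10s ago
```

With heartbeat_interval=5 and peer_timeout=20 as in the example above, a peer would have to miss several consecutive heartbeats before being marked offline.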


🖥️ CLI

The CLI is sn.

sn --help

👀 Watch a swarm

sn watch foo.electron.network

Useful options:

sn watch foo.electron.network --show-heartbeats
sn watch foo.electron.network --seed 192.168.1.25:9000 --no-mdns
sn watch foo.electron.network --team backend
sn watch foo.electron.network --no-capabilities

🙋 Ask from the terminal

sn ask broadcasts to the built-in synapse.ask endpoint. Nodes with a @node.ask handler can opt in with ACK and reply with their result.

sn ask foo.electron.network "Review this diff"
sn ask foo.electron.network "Review this diff" --context url=https://github.com/org/repo/pull/1
sn ask foo.electron.network "Who can help?" --forever

Example output:

ask: 019e4ab0-1d0d-709a-...
waiting for ACKs and replies... press Ctrl+C to stop
✓ reviewer acked
- reviewer: LGTM after fixing tests

📣 Broadcast from the terminal

sn broadcast foo.electron.network "Who can review this diff?"
sn broadcast foo.electron.network "Who can help?" --forever
sn broadcast foo.electron.network "Ship status?" --discover 2 --timeout 10

📋 List local swarms

sn list-swarms
sn list-swarms --seconds 5

📚 Examples

See examples/. Each example folder has its own README.

| Example | What it demonstrates |
| --- | --- |
| basic_rpc | The smallest direct RPC node/client pair. |
| isolated_agents | One node delegates to another through a known seed. |
| bootstrap_team_trio | Bootstrap discovery, ask handlers, and fetching agent cards. |
| local_mdns_swarm | Zero-config mDNS discovery plus ACKs/replies in one conversation. |
| pydantic_ai_team | Pydantic AI agents behind Synapse nodes. |
| periodic_tasks | Interval, cron, sunrise, and sunset jobs in a garden-caretaker node. |
| stock_trading_team | Analyst/news/trader swarm with a dumb paper exchange API and market-hours periodic scans. |

The agent examples use synapse.ask, opt-in ACKs, shared conversation replies, and advertised agent-card artifacts. The stock example shows a periodic job that checks market hours before asking the swarm, so agent/model work only happens when the paper market is open.


🔌 Protocol details

Built-in endpoints:

| Endpoint | Purpose |
| --- | --- |
| _synapse.ping | health check |
| _synapse.info | node identity and swarm metadata |
| _synapse.methods | published RPC methods |
| _synapse.peers | known peers |
| _synapse.join | join through a seed |
| _synapse.heartbeat | update peer liveness |
| _synapse.broadcast.reply | reply to a broadcast nonce |
| _synapse.conversation.event | gossip a shared conversation event |
| _synapse.artifacts | list advertised artifacts |
| _synapse.artifact.get | fetch one advertised artifact |
| _node.info | name, role, description, capabilities |
| _node.capabilities | machine-readable capabilities |
| _node.ask | delegate directly to the node ask handler |
| synapse.ask | swarm-facing ask endpoint used by sn ask |

Wire format:

  1. 4-byte unsigned big-endian payload length
  2. MsgPack payload bytes
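
The two-part frame above can be sketched with the standard library alone. The payload is treated as opaque bytes here; in practice it would be MsgPack-encoded, which needs a third-party encoder. `encode_frame` and `decode_frames` are hypothetical helper names, not part of the synapse_p2p API.

```python
import struct

HEADER = struct.Struct(">I")  # 4-byte unsigned big-endian length


def encode_frame(payload: bytes) -> bytes:
    """Prefix the payload with its length."""
    return HEADER.pack(len(payload)) + payload


def decode_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """Split a buffer into complete payloads plus any leftover bytes."""
    frames = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # incomplete frame: wait for more bytes
        frames.append(buffer[offset + HEADER.size:end])
        offset = end
    return frames, buffer[offset:]


# Two complete frames followed by a partial header, as a TCP read might deliver.
stream = encode_frame(b"first") + encode_frame(b"second") + b"\x00\x00"
frames, rest = decode_frames(stream)
print(frames, rest)
```

Returning the leftover bytes matters because TCP is a byte stream: a single read can end in the middle of a header or payload, and the remainder has to be kept until more data arrives.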

Request:

{
    "type": "request",
    "id": "request-id",
    "endpoint": "sum",
    "args": [1, 2],
    "kwargs": {},
}

Response:

{
    "type": "response",
    "id": "request-id",
    "ok": True,
    "result": 3,
    "error": None,
}
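
To show how the request and response envelopes relate, here is a minimal dispatcher sketch in plain asyncio. It reuses the field names shown above, but everything else is assumed: the `ENDPOINTS` registry, the `endpoint` decorator, `dispatch`, and the use of a plain string in the error field are illustrative choices, not the library's implementation.

```python
import asyncio

ENDPOINTS = {}  # hypothetical registry: endpoint name -> async function


def endpoint(name: str):
    """Register an async function under an endpoint name."""
    def register(func):
        ENDPOINTS[name] = func
        return func
    return register


@endpoint("sum")
async def add(a: int, b: int) -> int:
    return a + b


async def dispatch(request: dict) -> dict:
    """Build a response envelope that echoes the request id."""
    response = {"type": "response", "id": request["id"],
                "ok": True, "result": None, "error": None}
    handler = ENDPOINTS.get(request["endpoint"])
    if handler is None:
        response.update(ok=False, error=f"unknown endpoint: {request['endpoint']}")
        return response
    try:
        response["result"] = await handler(*request.get("args", []),
                                           **request.get("kwargs", {}))
    except Exception as exc:  # report handler failures to the caller
        response.update(ok=False, error=str(exc))
    return response


ok = asyncio.run(dispatch({"type": "request", "id": "request-id",
                           "endpoint": "sum", "args": [1, 2], "kwargs": {}}))
missing = asyncio.run(dispatch({"type": "request", "id": "r2",
                                "endpoint": "nope", "args": [], "kwargs": {}}))
print(ok["result"], missing["ok"])
```

Echoing the request id in the response is what lets a caller match replies to requests when several are in flight on one connection.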

Useful low-level exports:

from synapse_p2p import (
    AdvertisedArtifact,
    Broadcast,
    BroadcastReply,
    Capability,
    Client,
    ConversationEvent,
    Node,
    Peer,
    RPCError,
    RPCRequest,
    RPCResponse,
    ServedArtifact,
)

Enable logs when debugging:

from loguru import logger

logger.enable("synapse_p2p")

🚫 What Synapse is not

Synapse does not implement planning, memory, consensus, auth policy, NAT traversal, hosted registries, or UX.

Those belong above Synapse.

Synapse is the substrate:

nodes + discovery + capabilities + conversations + artifacts + heartbeats + schedules + a tiny protocol


🔎 Keywords

swarm substrate, agent substrate, node discovery, local mDNS discovery, agent-to-agent networking, LLM agent RPC, multi-agent systems, capability discovery, language-agnostic RPC, Python RPC, asyncio RPC, peer-to-peer Python, P2P networking, MsgPack RPC, TCP RPC, distributed agents.
