面向 AI Agent 的执行内核 | An agent-first execution kernel and action harness
ExecGo 的核心模块(github.com/iammm0/execgo)仅依赖 Go 标准库;可选能力(SQLite 持久化、Redis 读穿缓存)放在独立子模块 contrib/* 中,避免强绑第三方依赖。
ExecGo’s core module uses only the Go standard library. Optional features (SQLite persistence, Redis read-through cache) live in separate contrib/* submodules so consumers can opt in without pulling drivers they do not need.
定位补充 | Positioning note
ExecGo 更适合被理解为一个面向 AI Agent 的执行内核(execution kernel / action harness),而不是一个纯通用工作流引擎。 它的职责是把上层 agent 的决策,可靠、安全、可观测地映射到真实工具与运行环境。
| 特性 Feature | 描述 Description |
|---|---|
| Task DSL | 严格的任务契约:支持 id, type, params, depends_on, retry, timeout |
| DAG Scheduling | 基于依赖图的任务编排,Kahn 算法环检测 |
| Concurrent Execution | goroutine + channel 并发模型,信号量控制最大并发 |
| Pluggable Executors V2 | 内置 os / mcp / cli-skills 三大类;os 内含 shell/file/dns/tcp/sleep/noop/http 工具 |
| Retry & Timeout | 指数退避重试 + context 超时控制 |
| State Persistence | 内存存储 + JSON 文件定期持久化,崩溃恢复 |
| Observability | 结构化 JSON 日志 (slog) + traceID 追踪 + /metrics 端点 |
| Graceful Shutdown | 信号监听 → HTTP 关闭 → 调度器停止 → 状态持久化 |
┌─────────────────────────────────────────────────────┐
│ AI Agent (secbot) │
│ POST /tasks ←→ GET /tasks/{id} │
└─────────────────────┬───────────────────────────────┘
│ HTTP/JSON
┌─────────────────────▼───────────────────────────────┐
│ API Layer (net/http) │
│ POST /tasks │ GET /tasks/{id} │ DELETE │ /health │
├─────────────────────┬───────────────────────────────┤
│ Scheduler (DAG) │
│ readyQueue(chan) │ semaphore │ dependency counter │
├──────────┬──────────┬──────────┬────────────────────┤
│ OS │ MCP │ CLI+Skill│ ... (extensible) │
│ Category │ Category │ Category │ │
├──────────┴──────────┴──────────┴────────────────────┤
│ Store (store.Store) │
│ jsonfile (default) │ sqlite │ + Redis (contrib) │
├─────────────────────────────────────────────────────┤
│ Observability │
│ slog/JSON │ traceID │ /metrics │
└─────────────────────────────────────────────────────┘
# 构建 / Build
go build -o execgo ./cmd/execgo
# 运行 / Run (默认监听 :8080)
./execgo
# 自定义配置 / Custom config
./execgo -addr :9090 -max-concurrency 20 -data-dir ./mydata
# 环境变量 / Environment variables
EXECGO_ADDR=:9090 EXECGO_MAX_CONCURRENCY=20 ./execgo单个任务 | Single task:
curl -X POST http://localhost:8080/tasks \
-H "Content-Type: application/json" \
-d '{
"tasks": [
{
"id": "check-host",
"type": "shell",
"params": {"command": "hostname"},
"retry": 2,
"timeout": 5000
}
]
}'DAG 工作流 | DAG workflow:
curl -X POST http://localhost:8080/tasks \
-H "Content-Type: application/json" \
-d '{
"tasks": [
{
"id": "fetch-data",
"type": "http",
"params": {"url": "https://httpbin.org/json", "method": "GET"},
"timeout": 10000
},
{
"id": "save-result",
"type": "file",
"params": {"action": "write", "path": "output.txt", "content": "fetched!"},
"depends_on": ["fetch-data"]
},
{
"id": "verify",
"type": "file",
"params": {"action": "read", "path": "output.txt"},
"depends_on": ["save-result"]
}
]
}'# 列出所有任务 / List all tasks
curl http://localhost:8080/tasks
# 查询单个任务 / Get single task
curl http://localhost:8080/tasks/fetch-data
# 删除任务 / Delete task
curl -X DELETE http://localhost:8080/tasks/fetch-data
# 健康检查 / Health check
curl http://localhost:8080/health
# 指标 / Metrics
curl http://localhost:8080/metrics- CI(
.github/workflows/ci.yml):在mainpush 和 PR 上执行根模块与子模块全量测试。 - Release(
.github/workflows/release.yml):当推送v*tag 时自动执行测试、跨平台构建、生成校验和,并发布 GitHub Release 资产。 - 发布说明(Release Notes):默认读取
.github/release-notes/<tag>.md(例如.github/release-notes/v1.0.0.md);若不存在则回退到CHANGELOG.md。
示例(发布 v1.0.0):
git tag -a v1.0.0 -m "Release v1.0.0"
git push origin v1.0.0完整文档位于仓库下的 docs/ 目录,包含:
- 上层编排层(Orchestrator/Agent)如何把你的 DAG 映射成 ExecGo 的
TaskGraph - Docker Compose 与 Kubernetes 集群部署示范
- Go/Java/Python 的 HTTP 接入示例
- API / Task DSL / 执行器参数等参考手册
快速入口:
- 中文总入口:
docs/zh/README.md - 中文 HTTP API 入门:
docs/zh/integration/http-api-getting-started.md - English total entry:
docs/en/README.md
- 我是上层编排层,应该怎么把工作流映射成 ExecGo 的
TaskGraph? 见:映射:DAG -> TaskGraph depends_on到底表达什么?为什么下游不会自动拿到上游结果? 见:映射:DAG -> TaskGraph- 为什么任务失败后下游会变成
skipped? 见:失败语义:failed vs skipped - 提交后如何拿到最终结果?是同步还是异步? 见:轮询与幂等:稳定提交与读取结果
- 为什么会收到
400 Bad Request?TaskGraph.Validate()校验失败是什么意思? 见:Task DSL 参考(索引) - 如何设置
retry和timeout? 见:任务 DSL 参考(索引) result/error怎么解析、数据长什么样? 见:HTTP API 参考(索引)- 如何部署到自己的 Docker Compose? 见:Docker Compose 部署示范
- 如何部署到 Kubernetes(Deployment/Service/PVC)? 见:Kubernetes 部署示范
- Kubernetes 多副本能不能
replicas > 1? 见:Kubernetes 多副本注意事项 - 我想用 Go 调用 ExecGo,怎么做? 见:Go(HTTP)接入示例
- 我想用 Java 调用 ExecGo,怎么做? 见:Java(HTTP)接入示例
- 我想用 Python 调用 ExecGo,怎么做? 见:Python(HTTP)接入示例
- 我想用 Node.js + TypeScript 调用 ExecGo,怎么做? 见:Node.js + TypeScript(HTTP)接入示例
- shell 执行器是否安全?怎么避免任意命令执行风险? 见:执行器与参数参考(索引),进一步:Shell 执行器参数
- 任务状态存储在哪里?如何持久化/恢复? 见:数据持久化策略
- 怎么扩展执行器或实现自定义执行器? 见:执行器与参数参考(索引)
- 遇到幂等/重复提交问题怎么办? 见:轮询与幂等:稳定提交与读取结果
- See
docs/en/faqs.md
ExecGo 内置的 gRPC 服务使用 proto 路径:execgo.v1.ExecGo,默认监听端口 50051(由环境变量 EXECGO_GRPC_ADDR 控制;默认 :50051)。
SubmitTasks:提交任务 DAG(异步执行)GetTask:查询单个任务ListTasks:列出所有任务DeleteTask:删除任务Health:健康检查Metrics:指标
paramsJson 与 resultJson 采用“字符串形式的 JSON”(即把原本 params/result 的 JSON 内容用字符串包起来)。
timeoutMs 单位为毫秒。
(需要你本地安装 grpcurl;下面演示使用 plaintext)
- 提交任务
grpcurl -plaintext localhost:50051 execgo.v1.ExecGo/SubmitTasks -d '{
"tasks": [
{
"id": "check-host",
"type": "shell",
"paramsJson": "{\"command\":\"hostname\"}",
"retry": 2,
"timeoutMs": 5000,
"dependsOn": []
}
]
}'- 查询任务
grpcurl -plaintext localhost:50051 execgo.v1.ExecGo/GetTask -d '{
"id": "check-host"
}'项目采用分层测试结构:保留必要的包内测试(用于私有实现细节),并在根目录 tests/ 下集中维护跨包测试。
# 全量(含包内 + tests 分层)
go test ./...
# 单元测试(导出 API、边界与错误路径)
go test ./tests/unit/...
# 模块测试(跨组件但非完整端到端)
go test ./tests/module/...
# 集成测试(HTTP 提交 -> 调度执行 -> 状态查询)
go test ./tests/integration/...execgo/
├── cmd/execgo/main.go # 默认二进制:JSON 文件存储 / default binary: JSON file store
├── pkg/
│ ├── models/ # Task DSL / DTO(可被其他模块导入)/ importable DTOs
│ ├── store/ # Store 接口 / store interface
│ ├── store/jsonfile/ # 默认:内存 + state.json / default persistence
│ ├── executor/ # 执行器注册表与内置实现 / executor registry
│ ├── scheduler/ # DAG 调度器(依赖 store.Store)/ DAG scheduler
│ ├── httpserver/ # Engine + 中间件链 + 路由 / HTTP engine & middleware
│ ├── config/ # Config + Provider(类 Viper 可插拔配置源)/ pluggable config
│ └── observability/ # slog、trace、指标 / logging, trace, metrics
├── tests/
│ ├── unit/ # 单元测试(黑盒)/ unit tests
│ ├── module/ # 模块级测试 / module tests
│ ├── integration/ # 集成测试 / integration tests
│ └── testutil/ # 复用测试夹具 / shared test helpers
├── contrib/sqlite/ # 子模块:SQLite 版 store.Store / SQLite store submodule
├── contrib/rediscache/ # 子模块:Redis 读穿缓存装饰器 / Redis cache submodule
├── examples/fullserver/ # 子模块:组合 jsonfile|sqlite + 可选 Redis / full stack example
├── go.work # 仅本仓库开发用;下游项目不需要 / dev-only; consumers do not need this
├── data/
├── go.mod # 核心模块无第三方依赖 / core module: no third-party deps
└── README.md
{
"id": "unique-task-id",
"type": "os | mcp | cli-skills",
"tool_name": "shell | file | dns | tcp | sleep | noop | http | ...",
"params": { /* 工具相关参数 / tool-specific params */ },
"depends_on": ["other-task-id"],
"retry": 3,
"timeout": 5000,
"status": "pending | running | success | failed | skipped"
}V2 分类执行建议使用 type=os + tool_name=*。同时保留 legacy 输入兼容:type=shell/file/... 会自动映射到 os 分类。
HTTP Executor:
{"url": "https://example.com", "method": "GET", "headers": {"Authorization": "Bearer xxx"}, "body": "..."}Shell Executor(跨平台双模式 | cross-platform dual modes):
{"command": "echo", "args": ["hello", "world"], "dir": "/tmp"}或脚本模式 | Script mode:
{"runner": "auto", "script": "echo hello from script", "dir": "/tmp"}runner 支持 | Supported runners: auto | direct | powershell | cmd | sh。
auto默认:Windows 使用powershell -NoProfile -NonInteractive -Command,Linux/macOS 使用/bin/sh -c。script与command同时给出时,优先执行script。direct仅用于command + args直连执行(不是脚本 runner)。
允许的命令 | Allowed commands: echo, cat, ls, date, whoami, hostname, uname, pwd, curl, wget, ping, dig, grep, awk, sed, head, tail, wc, sort, uniq, find, dir, where, type
可选开放模式 | Optional open mode: 设置环境变量 EXECGO_SHELL_POLICY=open 后,shell 执行器将跳过 direct 模式的命令白名单校验;script 模式同样可执行任意脚本(仍受进程权限与 OS 约束)。
安全建议 | Security note: open 模式仅建议在可信的 Agent 编排层 + 已有鉴权/网络隔离场景启用;若 API 对公网暴露,不建议启用该模式。
File Executor:
{"action": "read | write | append | delete | stat", "path": "/tmp/data.txt", "content": "..."}Sleep Executor(编排延时,可被任务 timeout / context 取消 | delay for orchestration, cancellable):
{"duration_ms": 2000}单次上限 | Max duration_ms: 600000(10 分钟 | 10 minutes)。
DNS Executor:
{"name": "example.com", "record": "ip"}record 可选:ip(默认,A/AAAA 地址列表)、txt、cname。
TCP Executor(端口连通性 | TCP dial probe):
{"address": "example.com:443", "timeout_ms": 5000}timeout_ms 可选,默认 5000;上限 | Max timeout_ms: 60000。
Noop Executor(占位 / 测试,无外部 IO | placeholder, no I/O):
{"message": "optional"}| Flag | 环境变量 Env Var | 默认值 Default | 描述 Description |
|---|---|---|---|
-addr |
EXECGO_ADDR |
:8080 |
HTTP 监听地址 / listen address |
-data-dir |
EXECGO_DATA_DIR |
data |
数据目录 / data directory |
-max-concurrency |
EXECGO_MAX_CONCURRENCY |
10 |
最大并发数 / max concurrency |
-shutdown-timeout |
EXECGO_SHUTDOWN_TIMEOUT |
15 |
关闭超时(秒) / shutdown timeout (seconds) |
优先级 / Priority: flag > env > default
import (
"context"
"encoding/json"
"github.com/iammm0/execgo/pkg/executor"
"github.com/iammm0/execgo/pkg/models"
)
type MyExecutor struct{}
func (e *MyExecutor) Name() string { return "my_type" }
func (e *MyExecutor) Category() string { return "custom" }
func (e *MyExecutor) Execute(ctx context.Context, task *models.Task) (*executor.Result, error) {
out, _ := json.Marshal(map[string]any{"result": "ok"})
return &executor.Result{Status: "success", Output: out}, nil
}
func (e *MyExecutor) ListTools(ctx context.Context) ([]executor.Tool, error) { return nil, nil }
func (e *MyExecutor) HealthCheck() error { return nil }
func (e *MyExecutor) Shutdown(ctx context.Context) error { return nil }
func init() {
executor.Register(&MyExecutor{})
}GET /mcp/tools: list MCP tools.POST /mcp/call: call MCP tool, returnshandle_id.GET /mcp/tasks/{id}: poll MCP task handle status/result.
- 存储 | Storage:实现或使用
pkg/store.Store。默认使用pkg/store/jsonfile。SQLite 与 Redis 见下文子模块。 - HTTP(类 Gin 的 Use 链) | HTTP (Gin-style
Use):httpserver.NewEngine(store, scheduler, metrics, logger),按需engine.Use(mw),engine.Handler();挂载到已有ServeMux时用httpserver.Mount(parentMux, "/execgo", engine)。 - 配置(类 Viper 的 Provider) | Config (Viper-style
Provider):config.Load(config.NewFlagEnvProvider())或实现config.Provider(GetString/GetInt),将 Viper / 自有配置源适配到该接口即可。
不需要配置 go.work,也不需要为集成 ExecGo 单独写特殊 CI 工作流。在业务项目的 go.mod 里按需 require 已发布版本(或伪版本),照常 go build / go test 即可。
不需要 | No need for: go.work, custom pipelines just for ExecGo.
需要 | Typical go.mod:
require (
github.com/iammm0/execgo v1.x.y // 核心:scheduler、httpserver、jsonfile 等
github.com/iammm0/execgo/contrib/sqlite v1.x.y // 可选
github.com/iammm0/execgo/contrib/rediscache v1.x.y // 可选
)仅在同时改 ExecGo 源码与你的应用时,才在业务 go.mod 里用 replace 指向本地路径;这与 go.work 无关,也不是常规集成方式。
go.work 仅用于 clone 本仓库后同时开发核心、contrib、examples 多模块;维护者本地或本仓库 CI 可用,不是使用方的义务。
模块路径:github.com/iammm0/execgo/contrib/sqlite(子目录独立 go.mod)。对外集成:在业务 go.mod 中 require 该模块即可;clone 本仓库做开发时可在仓库根使用 go.work 方便联编。
import "github.com/iammm0/execgo/contrib/sqlite"
st, err := sqlite.Open("/path/to/execgo.db")
defer st.Close()模块路径:github.com/iammm0/execgo/contrib/rediscache。包装任意 store.Store;Get 走缓存,Put / UpdateStatus / Delete 会失效对应键;GetAll 始终直读底层存储。
import "github.com/iammm0/execgo/contrib/rediscache"
st = rediscache.Wrap(st, redisClient, rediscache.Options{TTL: 5 * time.Minute})在 examples/fullserver 子模块中:EXECGO_STORE=sqlite 使用 SQLite(默认路径为 data-dir/execgo.db,也可用 EXECGO_SQLITE_PATH);设置 EXECGO_REDIS_URL 时在存储之上叠加 Redis 缓存。构建:
cd examples/fullserver && go build -o fullserver .- 核心无第三方依赖 | Core module uses only the standard library — optional drivers live in
contrib/* - 分层架构 | Layered architecture — API → Scheduler → Executor → State
- 并发安全 | Concurrency safe —
sync.RWMutex+ channel 保护所有共享状态 - 可扩展 | Extensible — 注册表模式,添加新执行器无需修改核心代码
- 可观测 | Observable — 结构化日志 + traceID + 指标端点
- 韧性 | Resilient — 重试、超时、崩溃恢复、优雅关闭
MIT