diff --git a/CHANGELOG.md b/CHANGELOG.md index 72fb69a82..0e0d6f073 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - Use a JSON-RPC `ping` (instead of `initialize`) for the OAuth auth-discovery probe, so the probe POST is never counted as a real handshake by servers or tests that track requests by method name. +- Add AWS Bedrock provider using the native `Converse`/`ConverseStream` APIs with bearer-token auth, supporting models not available on Bedrock's OpenAI-compatible endpoint (e.g. Claude inference profiles). #254 ## 0.134.1 diff --git a/docs/config.json b/docs/config.json index f05429571..c2077c677 100644 --- a/docs/config.json +++ b/docs/config.json @@ -508,7 +508,8 @@ "enum": [ "openai-responses", "openai-chat", - "anthropic" + "anthropic", + "bedrock" ] }, "fetchModels": { diff --git a/docs/config/models.md b/docs/config/models.md index 08c9ee68a..1fbe53c35 100644 --- a/docs/config/models.md +++ b/docs/config/models.md @@ -168,6 +168,34 @@ ECA support lots of providers via its supported APIs (openai-chat, openai-respon } ``` +=== "AWS Bedrock" + + Uses the native Bedrock `Converse`/`ConverseStream` APIs, so models that are not exposed on Bedrock's OpenAI-compatible endpoint (e.g. Claude inference profiles) are reachable. + + Authentication uses a bearer token (`AWS_BEARER_TOKEN_BEDROCK`); the region is part of the runtime URL. `models` are Bedrock model ids or inference-profile ids. + + 1. Login via the chat command `/login`. + 2. Type 'bedrock' and send it. + 3. Specify your API key, runtime URL and at least one model. + 4. Done, it should be saved to your global config. + + or manually via config: + + ```javascript title="~/.config/eca/config.json" + { + "providers": { + "bedrock": { + "api": "bedrock", + "url": "https://bedrock-runtime.us-east-1.amazonaws.com", + "key": "${env:AWS_BEARER_TOKEN_BEDROCK}", + "models": { + "us.anthropic.claude-sonnet-4-5-20250929-v1:0": {} + } + } + } + } + ``` + === "Mistral" The property `max_completion_token` is set to `null`, because Mistral does not support it. diff --git a/src/eca/features/providers.clj b/src/eca/features/providers.clj index 5852c2572..dd71c365d 100644 --- a/src/eca/features/providers.clj +++ b/src/eca/features/providers.clj @@ -17,6 +17,7 @@ "github-copilot" "GitHub Copilot" "google" "Google" "azure" "Azure" + "bedrock" "AWS Bedrock" "deepseek" "DeepSeek" "litellm" "LiteLLM" "lmstudio" "LM Studio" @@ -35,6 +36,7 @@ "github-copilot" [{:key "device" :label "GitHub device flow"}] "google" [{:key "api-key" :label "Enter API key"}] "azure" [{:key "api-key" :label "Enter API key, URL & models"}] + "bedrock" [{:key "api-key" :label "Enter API key, URL & models"}] "deepseek" [{:key "api-key" :label "Enter API key & models"}] "litellm" [{:key "api-key" :label "Enter API key, URL & models"}] "lmstudio" [{:key "api-key" :label "Enter models"}] @@ -61,7 +63,10 @@ {:key "models" :label "Model names (comma-separated)" :type "text"}] "azure" [{:key "api-key" :label "API Key" :type "secret"} {:key "url" :label "API URL" :type "text"} - {:key "models" :label "Model names (comma-separated)" :type "text"}]}) + {:key "models" :label "Model names (comma-separated)" :type "text"}] + "bedrock" [{:key "api-key" :label "API Key (AWS_BEARER_TOKEN_BEDROCK)" :type "secret"} + {:key "url" :label "Runtime URL (e.g. https://bedrock-runtime.us-east-1.amazonaws.com)" :type "text"} + {:key "models" :label "Model/inference-profile ids (comma-separated)" :type "text"}]}) (def ^:private provider-configs {"deepseek" {:api "openai-chat" :url "https://api.deepseek.com"} @@ -73,7 +78,8 @@ "moonshot" {:api "openai-chat" :url "https://api.kimi.com/coding/v1"} "openrouter" {:api "openai-chat" :url "https://openrouter.ai/api/v1"} "z-ai" {:api "anthropic" :url "https://api.z.ai/api/anthropic"} - "azure" {:api "openai-chat"}}) + "azure" {:api "openai-chat"} + "bedrock" {:api "bedrock"}}) ;; --- Auth resolution --- diff --git a/src/eca/llm_api.clj b/src/eca/llm_api.clj index 9e086598e..cbfba9c67 100644 --- a/src/eca/llm_api.clj +++ b/src/eca/llm_api.clj @@ -6,6 +6,7 @@ [eca.features.prompt :as f.prompt] [eca.llm-providers.anthropic :as llm-providers.anthropic] [eca.llm-providers.azure] + [eca.llm-providers.bedrock :as llm-providers.bedrock] [eca.llm-providers.copilot] [eca.llm-providers.deepseek] [eca.llm-providers.errors :as llm-providers.errors] @@ -143,6 +144,8 @@ :handler llm-providers.anthropic/chat!} "openai-chat" {:api :openai-chat :handler llm-providers.openai-chat/chat-completion!} + "bedrock" {:api :bedrock + :handler llm-providers.bedrock/chat!} nil))) (def ^:private reasoning-keys-by-api diff --git a/src/eca/llm_providers/bedrock.clj b/src/eca/llm_providers/bedrock.clj new file mode 100644 index 000000000..0defaf06f --- /dev/null +++ b/src/eca/llm_providers/bedrock.clj @@ -0,0 +1,491 @@ +(ns eca.llm-providers.bedrock + "AWS Bedrock provider using the native Converse / ConverseStream APIs. + + Bedrock exposes an OpenAI-compatible surface too, but it only serves the + `openai.*` models — Claude (and other) inference profiles are reachable + only through Converse, hence this dedicated handler. + + Auth is the bearer-token flow (`Authorization: Bearer `), which + sidesteps SigV4 and is the simplest path for a local desktop client." + (:require + [cheshire.core :as json] + [clojure.string :as string] + [eca.client-http :as client] + [eca.llm-util :as llm-util] + [eca.logger :as logger] + [eca.shared :as shared :refer [assoc-some join-api-url]] + [hato.client :as http]) + (:import + [java.io InputStream] + [java.net URLEncoder])) + +(set! *warn-on-reflection* true) + +(def ^:private logger-tag "[BEDROCK]") + +(def ^:private default-max-output-tokens + "Floor for `maxTokens` used only when the caller omits `:max-output-tokens`. + It is not a per-model limit — model-aware caps come from the caller." + 32000) + +(def ^:private default-reasoning-budget-tokens 2048) + +;; --- Message normalization (ECA <-> Bedrock Converse blocks) --- + +(defn ^:private image-format + "Maps an `image/*` media type to the format string Bedrock expects. + Warns and falls back to `png` for unrecognized types." + [media-type] + (case media-type + "image/png" "png" + "image/jpeg" "jpeg" + "image/gif" "gif" + "image/webp" "webp" + (do (logger/warn logger-tag (format "Unknown image media type '%s', defaulting to png" media-type)) + "png"))) + +(defn ^:private ->image-block [{:keys [media-type base64]}] + {:image {:format (image-format media-type) + :source {:bytes base64}}}) + +(defn ^:private ->tool-config + "Builds the Converse `toolConfig` value from ECA tools, or nil when there + are none." + [tools] + (when (seq tools) + {:tools (mapv (fn [tool] + {:toolSpec {:name (:full-name tool) + :description (:description tool) + :inputSchema {:json (:parameters tool)}}}) + tools)})) + +(defn ^:private normalize-messages + "Converts ECA's internal message format into Converse `{:role :content}` + maps where content is always a vector of Converse content blocks." + [messages supports-image?] + (keep + (fn [{:keys [role content]}] + (case role + "tool_call" + {:role "assistant" + :content [{:toolUse {:toolUseId (:id content) + :name (:full-name content) + :input (or (:arguments content) {})}}]} + + "tool_call_output" + (let [contents (-> content :output :contents) + image-contents (when supports-image? + (seq (filter #(= :image (:type %)) contents))) + text (llm-util/stringfy-tool-result content)] + {:role "user" + :content [{:toolResult + {:toolUseId (:id content) + :content (into [{:text (if (string/blank? text) "(no content)" text)}] + (map ->image-block) + image-contents) + :status (if (:error content) "error" "success")}}]}) + + "reason" + {:role "assistant" + :content [(if (:redacted? content) + {:reasoningContent {:redactedContent (:data content)}} + {:reasoningContent {:reasoningText (assoc-some {:text (:text content)} + :signature (:external-id content))}})]} + + ;; "user" / "assistant" + {:role role + :content (if (string? content) + [{:text (string/trim content)}] + (vec (keep (fn [block] + (case (some-> (:type block) name) + "text" {:text (:text block)} + "image" (when supports-image? (->image-block block)) + nil)) + content)))})) + messages)) + +(defn ^:private group-parallel-tool-calls + "Reorders consecutive tool_call/tool_call_output messages so all tool_calls + precede all tool_call_outputs, keeping parallel tool turns grouped." + [messages] + (let [tool-msg? #(contains? #{"tool_call" "tool_call_output"} (:role %))] + (->> messages + (partition-by tool-msg?) + (mapcat (fn [group] + (if-not (tool-msg? (first group)) + group + (let [{calls "tool_call" outputs "tool_call_output"} (group-by :role group) + call-id->pos (into {} (map-indexed (fn [i m] [(get-in m [:content :id]) i]) calls))] + (concat calls (sort-by #(call-id->pos (get-in % [:content :id])) outputs))))))))) + +(defn ^:private merge-adjacent-by-role + "Merges consecutive messages of `role` into one, concatenating their content + vectors. Converse rejects consecutive same-role messages." + [role messages] + (reduce + (fn [acc msg] + (let [prev (peek acc)] + (if (and (= role (:role prev)) (= role (:role msg))) + (conj (pop acc) {:role role :content (into (:content prev) (:content msg))}) + (conj acc msg)))) + [] + messages)) + +(defn ^:private normalize-conversation [past-messages user-messages supports-image?] + (->> (concat past-messages user-messages) + group-parallel-tool-calls + (#(normalize-messages % supports-image?)) + (merge-adjacent-by-role "assistant") + (merge-adjacent-by-role "user") + vec)) + +;; --- Request body --- + +(def ^:private allowed-extra-payload-keys + "Top-level members the Converse / ConverseStream APIs accept (minus + `:messages` and `:modelId`, which the handler owns). extraPayload is + filtered to these so provider/model variant payloads aimed at other APIs + (e.g. Anthropic's `:thinking`) don't reach Bedrock and trigger a 400." + #{:additionalModelRequestFields :additionalModelResponseFieldPaths + :guardrailConfig :inferenceConfig :outputConfig :performanceConfig + :promptVariables :requestMetadata :serviceTier :system :toolConfig}) + +(defn ^:private build-body + [{:keys [messages instructions max-output-tokens tools reason? extra-payload]}] + (shared/deep-merge + (assoc-some + {:messages messages + :inferenceConfig {:maxTokens (or max-output-tokens default-max-output-tokens)}} + :system (when-not (string/blank? instructions) [{:text instructions}]) + :toolConfig (->tool-config tools) + :additionalModelRequestFields (when reason? + {:reasoning_config {:type "enabled" + :budget_tokens default-reasoning-budget-tokens}})) + (select-keys extra-payload allowed-extra-payload-keys))) + +;; --- AWS event-stream (vnd.amazon.eventstream) binary decoder --- + +(defn ^:private u32 + "Reads a big-endian unsigned 32-bit integer from `b` at `off`." + ^long [^bytes b ^long off] + (bit-or (bit-shift-left (bit-and (long (aget b off)) 0xff) 24) + (bit-shift-left (bit-and (long (aget b (+ off 1))) 0xff) 16) + (bit-shift-left (bit-and (long (aget b (+ off 2))) 0xff) 8) + (bit-and (long (aget b (+ off 3))) 0xff))) + +(defn ^:private read-fully + "Reads exactly `n` bytes from `is`. Returns the byte-array, nil on a clean + EOF before any byte was read, or throws on a truncated frame." + ^bytes [^InputStream is ^long n] + (let [buf (byte-array n) + first-read (.read is buf 0 n)] + (cond + (neg? first-read) nil + :else (loop [off (long first-read)] + (if (< off n) + (let [r (.read is buf off (- n off))] + (if (neg? r) + (throw (ex-info "Unexpected EOF in Bedrock event-stream frame" {})) + (recur (+ off (long r))))) + buf))))) + +(defn ^:private parse-headers + "Parses event-stream frame headers. Only the string header type (7) is + handled — every header Bedrock emits for Converse stream events is a + string. Parsing stops at the first non-string header, which assumes + `:event-type` is never preceded by one (true for Bedrock today)." + [^bytes b] + (loop [pos 0 + acc {}] + (if (>= pos (alength b)) + acc + (let [name-len (bit-and (long (aget b pos)) 0xff) + header-name (String. b (int (inc pos)) (int name-len) "UTF-8") + type-pos (+ pos 1 name-len) + header-type (bit-and (long (aget b type-pos)) 0xff)] + (if (= 7 header-type) + (let [value-len (bit-or (bit-shift-left (bit-and (long (aget b (+ type-pos 1))) 0xff) 8) + (bit-and (long (aget b (+ type-pos 2))) 0xff)) + value-start (+ type-pos 3)] + (recur (+ value-start value-len) + (assoc acc header-name (String. b (int value-start) (int value-len) "UTF-8")))) + acc))))) + +(defn ^:private event-stream-seq + "Lazily decodes an AWS event-stream into `[event-type data]` pairs. + `event-type` falls back to the exception/message type header for error + frames. Prelude and message CRCs are not validated." + [^InputStream is] + (lazy-seq + (when-let [prelude (read-fully is 12)] + (let [total-len (u32 prelude 0) + headers-len (u32 prelude 4) + payload-len (- total-len headers-len 16)] + (when (neg? payload-len) + (throw (ex-info "Malformed Bedrock event-stream frame: negative payload length" + {:total-len total-len :headers-len headers-len}))) + (let [headers (parse-headers (read-fully is headers-len)) + payload (read-fully is payload-len) + _crc (read-fully is 4) + event-type (or (get headers ":event-type") + (get headers ":exception-type") + (get headers ":message-type")) + data (json/parse-string (String. ^bytes payload "UTF-8") true)] + (cons [event-type data] (event-stream-seq is))))))) + +;; --- Non-streaming response parsing --- + +(defn ^:private parse-usage [usage] + {:input-tokens (or (:inputTokens usage) 0) + :output-tokens (or (:outputTokens usage) 0) + :input-cache-read-tokens (or (:cacheReadInputTokens usage) 0) + :input-cache-creation-tokens (or (:cacheWriteInputTokens usage) 0)}) + +(def ^:private terminal-stop-reasons #{"end_turn" "tool_use" "stop_sequence"}) + +(defn ^:private response->result [body on-tools-called-wrapper] + (let [content (-> body :output :message :content) + stop-reason (:stopReason body) + reason-block (some :reasoningContent content) + tools-to-call (->> content + (keep :toolUse) + (mapv (fn [{:keys [toolUseId name input]}] + {:id toolUseId + :full-name name + :arguments (or input {})})))] + ;; The non-streaming sync path has no :limit-reached channel, so a + ;; truncated/filtered response would otherwise look like a clean finish. + (when (and stop-reason (not (contains? terminal-stop-reasons stop-reason))) + (logger/warn logger-tag (format "Response ended with non-terminal stop reason '%s'" stop-reason))) + (assoc-some + {:output-text (string/join (keep :text content)) + :usage (parse-usage (:usage body))} + :reason-text (get-in reason-block [:reasoningText :text]) + :reason-id (when reason-block (str (random-uuid))) + :tools-to-call (not-empty tools-to-call) + :call-tools-fn (when (seq tools-to-call) + (fn [on-tools-called] + (on-tools-called-wrapper tools-to-call on-tools-called)))))) + +;; --- HTTP request (shared by stream / non-stream) --- + +(defn ^:private base-request! + [{:keys [rid body model api-url api-key extra-headers http-client cancelled? + content-block* on-error on-stream on-tools-called-wrapper]}] + (let [path (str "/model/" (URLEncoder/encode ^String model "UTF-8") + (if on-stream "/converse-stream" "/converse")) + url (join-api-url api-url path) + headers (client/merge-llm-headers + (merge {"Authorization" (str "Bearer " api-key) + "Content-Type" "application/json"} + extra-headers)) + response* (atom nil) + on-error (if on-stream + on-error + (fn [error-data] + (llm-util/log-response logger-tag rid "response-error" error-data) + (reset! response* {:error error-data})))] + (llm-util/log-request logger-tag rid url body headers) + (try + (let [{:keys [status body]} (http/post url + {:headers headers + :body (json/generate-string body) + :throw-exceptions? false + :http-client (client/merge-with-global-http-client http-client) + :as (if on-stream :stream :json)})] + (if (not= 200 status) + (let [body-str (if on-stream (slurp body) (json/generate-string body))] + (logger/warn logger-tag (format "[%s] Unexpected response status: %s body: %s" rid status body-str)) + (on-error {:message (format "Bedrock response status: %s body: %s" status body-str) + :status status + :body body-str})) + (if on-stream + (let [{:keys [touch-fn set-reading-fn stop-fn reason*]} + (llm-util/start-stream-watchdog! body cancelled? {}) + completed?* (atom false)] + (try + (with-open [^InputStream is body] + (doseq [[event data] (event-stream-seq is)] + (set-reading-fn false) + (touch-fn) + (llm-util/log-response logger-tag rid event data) + ;; on-stream reports whether the event terminates the + ;; turn — messageStop, but also a modeled error frame, + ;; which can end the stream without a messageStop. + (when (on-stream event data content-block*) + (reset! completed?* true)) + (set-reading-fn true))) + (when-not (or @completed?* (cancelled?)) + (logger/warn logger-tag "Stream ended without messageStop, retrying") + (on-error {:message "Stream ended without completion signal" + :error/type :premature-stop})) + (catch java.io.IOException e + (case @reason* + :cancelled (throw (ex-info "Stream cancelled" {:silent? true})) + :idle-timeout (on-error {:message "Stream idle timeout: no data received" :exception e}) + (on-error {:exception e :message (llm-util/connection-error-message e)}))) + (finally + (stop-fn)))) + (do + (llm-util/log-response logger-tag rid "response" body) + (reset! response* (response->result body on-tools-called-wrapper)))))) + (catch Exception e + (on-error {:exception e :message (llm-util/connection-error-message e)}))) + @response*)) + +(defn ^:private reissue-after-tools! + "Rebuilds the request body from the post-tool-call history and re-issues it + via `base-request!`. Shared by the streaming and non-streaming tool loops. + + The tool loop recurses per turn (mirroring `anthropic.clj`); termination + relies on the model and the chat layer's subagent step cap rather than a + provider-side ceiling." + [{:keys [base-opts body supports-image?]} + {:keys [new-messages tools fresh-api-key]} + extra-opts] + (base-request! + (merge base-opts + {:rid (llm-util/gen-rid) + :body (assoc-some (assoc body :messages (normalize-conversation new-messages nil supports-image?)) + :toolConfig (->tool-config tools)) + :api-key (or fresh-api-key (:api-key base-opts))} + extra-opts))) + +;; --- Streaming event handler --- + +(def ^:private known-stream-events + #{"messageStart" "contentBlockStart" "contentBlockDelta" "contentBlockStop" + "messageStop" "metadata"}) + +(defn ^:private handle-stream + "Drives ECA callbacks from a single ConverseStream event. `ctx` carries the + callbacks plus what `reissue-after-tools!` needs to continue the tool loop. + + Returns true when the event terminates the turn: `messageStop`, or an + unknown event — modeled error frames (validationException, + throttlingException, modelStreamErrorException) end the stream without a + messageStop, so they must count as completion to avoid a spurious + premature-stop error after the real one." + [event data content-block* + {:keys [on-message-received on-error on-reason on-prepare-tool-call + on-tools-called on-usage-updated] + :as ctx}] + (case event + "messageStart" nil + + "contentBlockStart" + (when-let [tool-use (-> data :start :toolUse)] + (swap! content-block* assoc (:contentBlockIndex data) + {:type :tool-use + :toolUseId (:toolUseId tool-use) + :name (:name tool-use) + :input-json ""}) + (on-prepare-tool-call {:full-name (:name tool-use) + :id (:toolUseId tool-use) + :arguments-text ""})) + + "contentBlockDelta" + (let [idx (:contentBlockIndex data) + delta (:delta data)] + (cond + (:text delta) + (on-message-received {:type :text :text (:text delta)}) + + (:toolUse delta) + (let [chunk (-> delta :toolUse :input)] + (swap! content-block* update-in [idx :input-json] str chunk) + (let [block (get @content-block* idx)] + (on-prepare-tool-call {:full-name (:name block) + :id (:toolUseId block) + :arguments-text (or chunk "")}))) + + (:reasoningContent delta) + (let [reasoning (:reasoningContent delta) + reason-id (or (get-in @content-block* [idx :reason-id]) + (let [new-id (str (random-uuid))] + (swap! content-block* assoc idx {:type :reasoning :reason-id new-id}) + (on-reason {:status :started :id new-id}) + new-id))] + (cond + (:text reasoning) + (on-reason {:status :thinking :id reason-id :text (:text reasoning)}) + + (:signature reasoning) + (do (on-reason {:status :finished :id reason-id :external-id (:signature reasoning)}) + (swap! content-block* assoc-in [idx :signature-done?] true)))))) + + "contentBlockStop" + (let [block (get @content-block* (:contentBlockIndex data))] + (when (and (= :reasoning (:type block)) + (not (:signature-done? block))) + (on-reason {:status :finished :id (:reason-id block)}))) + + "messageStop" + (case (:stopReason data) + "tool_use" + (let [tool-calls (->> (vals @content-block*) + (filter #(= :tool-use (:type %))) + (mapv (fn [{:keys [toolUseId name input-json]}] + {:id toolUseId + :full-name name + :arguments (json/parse-string input-json)})))] + (when-let [tools-result (on-tools-called tool-calls)] + (reissue-after-tools! ctx tools-result + {:content-block* (atom {}) + :on-error on-error + :on-stream (fn [e d cb] (handle-stream e d cb ctx))}))) + + "max_tokens" + (on-message-received {:type :limit-reached}) + + ;; "end_turn" / "stop_sequence" / "content_filtered" / "guardrail_intervened" + (on-message-received {:type :finish :finish-reason (:stopReason data)})) + + "metadata" + (when-let [usage (:usage data)] + (on-usage-updated (parse-usage usage))) + + ;; exception / error frames + (on-error {:message (format "Bedrock stream error (%s): %s" event data)})) + (or (= "messageStop" event) + (not (contains? known-stream-events event)))) + +;; --- Entry point --- + +(defn chat! + [{:keys [model user-messages instructions max-output-tokens api-url api-key + reason? past-messages tools extra-payload extra-headers supports-image? + http-client cancelled?]} + {:keys [on-error] :as callbacks}] + (let [stream? (boolean callbacks) + cancelled? (or cancelled? (constantly false)) + body (build-body {:messages (normalize-conversation past-messages user-messages supports-image?) + :instructions instructions + :max-output-tokens max-output-tokens + :tools tools + :reason? reason? + :extra-payload extra-payload}) + base-opts {:model model + :api-url api-url + :api-key api-key + :extra-headers extra-headers + :http-client http-client + :cancelled? cancelled?} + reissue-ctx {:base-opts base-opts :body body :supports-image? supports-image?} + ;; Non-streaming tool loop: re-issue with the updated history, which + ;; yields another result map the sync caller drives. + on-tools-called-wrapper + (fn on-tools-called-wrapper [tools-to-call on-tools-called] + (when-let [tools-result (on-tools-called tools-to-call)] + (reissue-after-tools! reissue-ctx tools-result + {:on-tools-called-wrapper on-tools-called-wrapper}))) + stream-ctx (merge callbacks reissue-ctx)] + (base-request! (merge base-opts + {:rid (llm-util/gen-rid) + :body body + :content-block* (atom {}) + :on-error (or on-error identity) + :on-tools-called-wrapper on-tools-called-wrapper + :on-stream (when stream? + (fn [event data content-block*] + (handle-stream event data content-block* stream-ctx)))})))) diff --git a/test/eca/llm_providers/bedrock_test.clj b/test/eca/llm_providers/bedrock_test.clj new file mode 100644 index 000000000..122a0410e --- /dev/null +++ b/test/eca/llm_providers/bedrock_test.clj @@ -0,0 +1,340 @@ +(ns eca.llm-providers.bedrock-test + (:require + [cheshire.core :as json] + [clojure.test :refer [deftest is testing]] + [eca.client-test-helpers :refer [with-client-proxied]] + [eca.llm-providers.bedrock :as llm-providers.bedrock] + [hato.client :as http] + [matcher-combinators.test :refer [match?]]) + (:import + [java.io ByteArrayInputStream ByteArrayOutputStream])) + +;; --- event-stream binary frame helpers --- + +(defn ^:private write-u32! [^ByteArrayOutputStream baos n] + (doseq [shift [24 16 8 0]] + (.write baos (bit-and (bit-shift-right (long n) shift) 0xff)))) + +(defn ^:private string-header [name value] + (let [name-bytes (.getBytes ^String name "UTF-8") + value-bytes (.getBytes ^String value "UTF-8") + baos (ByteArrayOutputStream.)] + (.write baos (alength name-bytes)) + (.write baos name-bytes) + (.write baos 7) ;; string header type + (.write baos (bit-and (bit-shift-right (alength value-bytes) 8) 0xff)) + (.write baos (bit-and (alength value-bytes) 0xff)) + (.write baos value-bytes) + (.toByteArray baos))) + +(defn ^:private make-frame + "Builds a single AWS event-stream frame. CRCs are written as zero since the + decoder does not validate them." + [event-type payload-map] + (let [headers (string-header ":event-type" event-type) + payload (.getBytes ^String (json/generate-string payload-map) "UTF-8") + baos (ByteArrayOutputStream.)] + (write-u32! baos (+ 12 (alength headers) (alength payload) 4)) + (write-u32! baos (alength headers)) + (write-u32! baos 0) ;; prelude crc + (.write baos headers) + (.write baos payload) + (write-u32! baos 0) ;; message crc + (.toByteArray baos))) + +(defn ^:private frames-stream [& frames] + (ByteArrayInputStream. (byte-array (mapcat seq frames)))) + +;; --- decoder --- + +(deftest event-stream-seq-test + (testing "decodes consecutive event-stream frames into [event-type data] pairs" + (is (= [["messageStart" {:role "assistant"}] + ["contentBlockDelta" {:contentBlockIndex 0 :delta {:text "hi"}}] + ["messageStop" {:stopReason "end_turn"}]] + (vec (#'llm-providers.bedrock/event-stream-seq + (frames-stream (make-frame "messageStart" {:role "assistant"}) + (make-frame "contentBlockDelta" {:contentBlockIndex 0 :delta {:text "hi"}}) + (make-frame "messageStop" {:stopReason "end_turn"}))))))) + + (testing "returns empty seq on a clean EOF" + (is (= [] (vec (#'llm-providers.bedrock/event-stream-seq (ByteArrayInputStream. (byte-array 0))))))) + + (testing "throws on a truncated frame" + (is (thrown-with-msg? clojure.lang.ExceptionInfo #"Unexpected EOF" + (vec (#'llm-providers.bedrock/event-stream-seq + (ByteArrayInputStream. (byte-array 6))))))) + + (testing "throws on a frame with negative payload length" + (let [baos (ByteArrayOutputStream.)] + (write-u32! baos 20) ;; total-len < headers-len + 16 + (write-u32! baos 8) ;; headers-len + (write-u32! baos 0) + (.write baos (byte-array 8)) + (is (thrown-with-msg? clojure.lang.ExceptionInfo #"negative payload length" + (vec (#'llm-providers.bedrock/event-stream-seq + (ByteArrayInputStream. (.toByteArray baos))))))))) + +;; --- normalization / body --- + +(deftest normalize-conversation-test + (testing "string content becomes a single text block" + (is (match? [{:role "user" :content [{:text "hello"}]}] + (#'llm-providers.bedrock/normalize-conversation + [] [{:role "user" :content "hello"}] false)))) + + (testing "tool call + output become Converse toolUse / toolResult blocks" + (is (match? [{:role "assistant" + :content [{:toolUse {:toolUseId "t1" :name "get_weather" :input {:city "Paris"}}}]} + {:role "user" + :content [{:toolResult {:toolUseId "t1" + :status "success" + :content [{:text string?}]}}]}] + (#'llm-providers.bedrock/normalize-conversation + [{:role "tool_call" :content {:id "t1" :full-name "get_weather" :arguments {:city "Paris"}}} + {:role "tool_call_output" :content {:id "t1" :output {:contents [{:type :text :text "sunny"}]}}}] + nil false)))) + + (testing "consecutive same-role messages are merged" + (is (match? [{:role "user" :content [{:text "a"} {:text "b"}]}] + (#'llm-providers.bedrock/normalize-conversation + [{:role "user" :content "a"}] [{:role "user" :content "b"}] false))))) + +(deftest build-body-test + (testing "filters extra-payload to Converse-valid keys and deep-merges inferenceConfig" + (let [body (#'llm-providers.bedrock/build-body + {:messages [] + :instructions "sys" + :max-output-tokens 500 + :extra-payload {:thinking {:type "adaptive"} + :inferenceConfig {:temperature 0.5}}})] + (is (nil? (:thinking body))) + (is (match? {:inferenceConfig {:maxTokens 500 :temperature 0.5} + :system [{:text "sys"}]} + body)))) + + (testing "reasoning adds reasoning_config to additionalModelRequestFields" + (is (match? {:additionalModelRequestFields {:reasoning_config {:type "enabled"}}} + (#'llm-providers.bedrock/build-body {:messages [] :reason? true}))))) + +;; --- chat! non-streaming --- + +(deftest chat!-non-streaming-test + (testing "constructs a Converse request and parses the response" + (let [req* (atom nil)] + (with-client-proxied {} + (fn handler [req] + (reset! req* req) + {:status 200 + :body {:output {:message {:role "assistant" :content [{:text "Hello!"}]}} + :stopReason "end_turn" + :usage {:inputTokens 10 :outputTokens 3}}}) + + (let [result (llm-providers.bedrock/chat! + {:model "us.anthropic.claude-sonnet-4-5-20250929-v1:0" + :api-url "http://localhost:1" + :api-key "fake-token" + :instructions "be terse" + :user-messages [{:role "user" :content [{:type :text :text "hi"}]}] + :past-messages [] + :max-output-tokens 1000} + nil)] + (is (match? {:method "POST" + :uri "/model/us.anthropic.claude-sonnet-4-5-20250929-v1%3A0/converse"} + (select-keys @req* [:method :uri]))) + (is (= "Bearer fake-token" (get-in @req* [:headers "Authorization"]))) + (is (match? {:messages [{:role "user" :content [{:text "hi"}]}] + :system [{:text "be terse"}] + :inferenceConfig {:maxTokens 1000}} + (:body @req*))) + (is (match? {:output-text "Hello!" + :usage {:input-tokens 10 :output-tokens 3}} + result))))))) + +(deftest chat!-non-streaming-tool-loop-test + (testing "a toolUse response yields tools-to-call and call-tools-fn re-issues the request" + (let [call-count* (atom 0)] + (with-client-proxied {} + (fn handler [_] + (swap! call-count* inc) + {:status 200 + :body (if (= 1 @call-count*) + {:output {:message {:content [{:toolUse {:toolUseId "t1" + :name "get_weather" + :input {:city "Paris"}}}]}} + :stopReason "tool_use" + :usage {:inputTokens 5 :outputTokens 2}} + {:output {:message {:content [{:text "It is sunny."}]}} + :stopReason "end_turn" + :usage {:inputTokens 8 :outputTokens 4}})}) + + (let [result (llm-providers.bedrock/chat! + {:model "model-x" :api-url "http://localhost:1" :api-key "k" + :user-messages [{:role "user" :content "weather?"}] :past-messages []} + nil)] + (is (match? [{:id "t1" :full-name "get_weather" :arguments {:city "Paris"}}] + (:tools-to-call result))) + (let [next-result ((:call-tools-fn result) + (constantly {:new-messages [{:role "user" :content "weather?"}] + :tools nil}))] + (is (= 2 @call-count*)) + (is (match? {:output-text "It is sunny."} next-result)))))))) + +;; --- chat! streaming --- + +(defn ^:private collecting-callbacks [events*] + {:on-message-received #(swap! events* conj [:msg %]) + :on-error #(swap! events* conj [:error %]) + :on-reason #(swap! events* conj [:reason %]) + :on-prepare-tool-call #(swap! events* conj [:prepare %]) + :on-tools-called (constantly nil) + :on-usage-updated #(swap! events* conj [:usage %])}) + +(deftest chat!-streaming-test + (testing "decodes a streamed response and drives text + finish + usage callbacks" + (let [events* (atom []) + frames (frames-stream + (make-frame "messageStart" {:role "assistant"}) + (make-frame "contentBlockDelta" {:contentBlockIndex 0 :delta {:text "Hello!"}}) + (make-frame "messageStop" {:stopReason "end_turn"}) + (make-frame "metadata" {:usage {:inputTokens 7 :outputTokens 2}}))] + (with-redefs [http/post (fn [_ _] {:status 200 :body frames})] + (llm-providers.bedrock/chat! + {:model "model-x" :api-url "http://localhost:1" :api-key "k" + :user-messages [{:role "user" :content "hi"}] :past-messages []} + (collecting-callbacks events*))) + (is (= [[:msg {:type :text :text "Hello!"}] + [:msg {:type :finish :finish-reason "end_turn"}] + [:usage {:input-tokens 7 :output-tokens 2 :input-cache-read-tokens 0 :input-cache-creation-tokens 0}]] + @events*))))) + +(deftest chat!-streaming-tool-loop-test + (testing "a streamed tool_use turn re-issues the request with the updated history" + (let [events* (atom []) + call-count* (atom 0) + tool-frames (frames-stream + (make-frame "messageStart" {:role "assistant"}) + (make-frame "contentBlockStart" {:contentBlockIndex 0 + :start {:toolUse {:toolUseId "t1" :name "get_weather"}}}) + (make-frame "contentBlockDelta" {:contentBlockIndex 0 + :delta {:toolUse {:input "{\"city\": \"Paris\"}"}}}) + (make-frame "contentBlockStop" {:contentBlockIndex 0}) + (make-frame "messageStop" {:stopReason "tool_use"})) + final-frames (frames-stream + (make-frame "messageStart" {:role "assistant"}) + (make-frame "contentBlockDelta" {:contentBlockIndex 0 :delta {:text "done"}}) + (make-frame "messageStop" {:stopReason "end_turn"})) + callbacks (assoc (collecting-callbacks events*) + :on-tools-called (fn [tool-calls] + (swap! events* conj [:tools tool-calls]) + {:new-messages [{:role "user" :content "weather?"}] + :tools nil}))] + (with-redefs [http/post (fn [_ _] + (swap! call-count* inc) + {:status 200 :body (if (= 1 @call-count*) tool-frames final-frames)})] + (llm-providers.bedrock/chat! + {:model "model-x" :api-url "http://localhost:1" :api-key "k" + :user-messages [{:role "user" :content "weather?"}] :past-messages []} + callbacks)) + (is (= 2 @call-count*)) + (is (match? [[:prepare {:full-name "get_weather" :id "t1" :arguments-text ""}] + [:prepare {:full-name "get_weather" :id "t1" :arguments-text "{\"city\": \"Paris\"}"}] + [:tools [{:id "t1" :full-name "get_weather" :arguments {"city" "Paris"}}]] + [:msg {:type :text :text "done"}] + [:msg {:type :finish :finish-reason "end_turn"}]] + @events*))))) + +(deftest chat!-error-paths-test + (testing "non-200 response is surfaced as an error result" + (with-client-proxied {} + (fn [_] {:status 400 :body {:message "ValidationException"}}) + (let [result (llm-providers.bedrock/chat! + {:model "model-x" :api-url "http://localhost:1" :api-key "k" + :user-messages [{:role "user" :content "hi"}] :past-messages []} + nil)] + (is (match? {:error {:status 400}} result))))) + + (testing "a transport exception is surfaced as an error result" + (with-redefs [http/post (fn [_ _] (throw (java.io.IOException. "connection refused")))] + (let [result (llm-providers.bedrock/chat! + {:model "model-x" :api-url "http://localhost:1" :api-key "k" + :user-messages [{:role "user" :content "hi"}] :past-messages []} + nil)] + (is (match? {:error {:exception some?}} result))))) + + (testing "a stream that ends without messageStop reports a premature stop" + (let [events* (atom []) + frames (frames-stream + (make-frame "messageStart" {:role "assistant"}) + (make-frame "contentBlockDelta" {:contentBlockIndex 0 :delta {:text "partial"}}))] + (with-redefs [http/post (fn [_ _] {:status 200 :body frames})] + (llm-providers.bedrock/chat! + {:model "model-x" :api-url "http://localhost:1" :api-key "k" + :user-messages [{:role "user" :content "hi"}] :past-messages []} + (collecting-callbacks events*))) + (is (match? [[:msg {:type :text :text "partial"}] + [:error {:error/type :premature-stop}]] + @events*)))) + + (testing "a modeled stream error frame surfaces one error and no premature-stop" + (let [events* (atom []) + frames (frames-stream + (make-frame "messageStart" {:role "assistant"}) + (make-frame "throttlingException" {:message "slow down"}))] + (with-redefs [http/post (fn [_ _] {:status 200 :body frames})] + (llm-providers.bedrock/chat! + {:model "model-x" :api-url "http://localhost:1" :api-key "k" + :user-messages [{:role "user" :content "hi"}] :past-messages []} + (collecting-callbacks events*))) + (is (match? [[:error {:message #"throttlingException"}]] + (filterv #(= :error (first %)) @events*)) + "exactly one error event, no trailing premature-stop")))) + +;; --- handle-stream branches --- + +(deftest handle-stream-test + (let [events* (atom []) + ctx {:on-message-received #(swap! events* conj [:msg %]) + :on-error #(swap! events* conj [:error %]) + :on-reason #(swap! events* conj [:reason %]) + :on-prepare-tool-call #(swap! events* conj [:prepare %]) + :on-usage-updated #(swap! events* conj [:usage %]) + :on-tools-called (constantly nil)} + run! (fn [cb* event data] + (reset! events* []) + (#'llm-providers.bedrock/handle-stream event data cb* ctx) + @events*)] + (testing "text delta emits a message" + (is (= [[:msg {:type :text :text "hi"}]] + (run! (atom {}) "contentBlockDelta" {:contentBlockIndex 0 :delta {:text "hi"}})))) + + (testing "reasoning deltas start, stream and finish a reason block" + (let [cb* (atom {})] + (is (match? [[:reason {:status :started}] [:reason {:status :thinking :text "thinking"}]] + (run! cb* "contentBlockDelta" {:contentBlockIndex 0 + :delta {:reasoningContent {:text "thinking"}}})) + "second event matched after first started it") + (is (match? [[:reason {:status :thinking :text "thinking"}]] + (run! cb* "contentBlockDelta" {:contentBlockIndex 0 + :delta {:reasoningContent {:text "thinking"}}}))) + (is (match? [[:reason {:status :finished :external-id "sig"}]] + (run! cb* "contentBlockDelta" {:contentBlockIndex 0 + :delta {:reasoningContent {:signature "sig"}}}))))) + + (testing "max_tokens stop reason emits limit-reached" + (is (= [[:msg {:type :limit-reached}]] + (run! (atom {}) "messageStop" {:stopReason "max_tokens"})))) + + (testing "metadata emits usage" + (is (match? [[:usage {:input-tokens 3 :output-tokens 1}]] + (run! (atom {}) "metadata" {:usage {:inputTokens 3 :outputTokens 1}})))) + + (testing "an unknown/exception frame is surfaced as an error" + (is (match? [[:error {:message #"Bedrock stream error"}]] + (run! (atom {}) "throttlingException" {:message "slow down"})))) + + (testing "returns true for terminal events (messageStop, error frames), false otherwise" + (is (true? (#'llm-providers.bedrock/handle-stream "messageStop" {:stopReason "end_turn"} (atom {}) ctx))) + (is (true? (#'llm-providers.bedrock/handle-stream "throttlingException" {:message "x"} (atom {}) ctx))) + (is (false? (boolean (#'llm-providers.bedrock/handle-stream + "contentBlockDelta" {:contentBlockIndex 0 :delta {:text "x"}} (atom {}) ctx)))))))