diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9f959c6b..96a55cfc 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -78,11 +78,13 @@ jobs: - name: Check out source code uses: actions/checkout@v4 - - name: Install ruff - run: pip install ruff + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.10' - - name: Ruff lint - run: ruff check . + - name: Install pre-commit + run: pip install pre-commit - - name: Ruff format - run: ruff format --check --diff . + - name: Run pre-commit hooks + run: pre-commit run --all-files --show-diff-on-failure diff --git a/api/auth.py b/api/auth.py index 7a324ed0..31f75e1d 100644 --- a/api/auth.py +++ b/api/auth.py @@ -6,16 +6,93 @@ """User authentication utilities""" +from typing import Optional + +import jwt as pyjwt +from fastapi_users import exceptions, models from fastapi_users.authentication import ( AuthenticationBackend, BearerTransport, JWTStrategy, ) +from fastapi_users.jwt import SecretType, decode_jwt +from fastapi_users.manager import BaseUserManager from passlib.context import CryptContext from .config import AuthSettings +class DualSecretJWTStrategy(JWTStrategy): + """JWTStrategy that accepts tokens signed with either of two secrets. + + Tokens are always *written* with the primary secret. On *read*, the + primary secret is tried first; if verification fails **and** a unified + secret is configured, the token is retried with the unified secret. + """ + + def __init__( + self, + secret: SecretType, + lifetime_seconds: Optional[int], + algorithm: str = "HS256", + unified_secret: str = "", + ): + super().__init__( + secret=secret, + lifetime_seconds=lifetime_seconds, + algorithm=algorithm, + ) + self.unified_secret = unified_secret + + async def read_token( + self, + token: Optional[str], + user_manager: BaseUserManager[models.UP, models.ID], + ) -> Optional[models.UP]: + if token is None: + return None + + # Try primary secret first + user = await self._decode_and_lookup( + token, self.decode_key, user_manager + ) + if user is not None: + return user + + # Fallback to unified secret + if self.unified_secret: + return await self._decode_and_lookup( + token, self.unified_secret, user_manager + ) + + return None + + async def _decode_and_lookup( + self, + token: str, + secret: SecretType, + user_manager: BaseUserManager[models.UP, models.ID], + ) -> Optional[models.UP]: + try: + data = decode_jwt( + token, + secret, + self.token_audience, + algorithms=[self.algorithm], + ) + user_id = data.get("sub") + if user_id is None: + return None + except pyjwt.PyJWTError: + return None + + try: + parsed_id = user_manager.parse_id(user_id) + return await user_manager.get(parsed_id) + except (exceptions.UserNotExists, exceptions.InvalidID): + return None + + class Authentication: """Authentication utility class""" @@ -30,12 +107,13 @@ def get_password_hash(cls, password): """Get a password hash for a given clear text password string""" return cls.CRYPT_CTX.hash(password) - def get_jwt_strategy(self) -> JWTStrategy: + def get_jwt_strategy(self) -> DualSecretJWTStrategy: """Get JWT strategy for authentication backend""" - return JWTStrategy( + return DualSecretJWTStrategy( secret=self._settings.secret_key, algorithm=self._settings.algorithm, lifetime_seconds=self._settings.access_token_expire_seconds, + unified_secret=self._settings.unified_secret, ) def get_user_authentication_backend(self): diff --git a/api/config.py b/api/config.py index 111357a0..f02262b1 100644 --- a/api/config.py +++ b/api/config.py @@ -9,11 +9,11 @@ from pydantic_settings import BaseSettings -# pylint: disable=too-few-public-methods class AuthSettings(BaseSettings): """Authentication settings""" secret_key: str + unified_secret: str = "" algorithm: str = "HS256" # Set to None so tokens don't expire access_token_expire_seconds: float = 315360000 @@ -21,7 +21,6 @@ class AuthSettings(BaseSettings): public_base_url: str | None = None -# pylint: disable=too-few-public-methods class PubSubSettings(BaseSettings): """Pub/Sub settings loaded from the environment""" @@ -35,7 +34,6 @@ class PubSubSettings(BaseSettings): subscriber_state_ttl_days: int = 30 # Cleanup unused subscriber states -# pylint: disable=too-few-public-methods class EmailSettings(BaseSettings): """Email settings""" diff --git a/api/email_sender.py b/api/email_sender.py index 1afb851d..5318c159 100644 --- a/api/email_sender.py +++ b/api/email_sender.py @@ -17,7 +17,7 @@ from .config import EmailSettings -class EmailSender: # pylint: disable=too-few-public-methods +class EmailSender: """Class to send email report using SMTP""" def __init__(self): diff --git a/api/main.py b/api/main.py index efd39c75..27a39548 100644 --- a/api/main.py +++ b/api/main.py @@ -5,8 +5,6 @@ # Author: Jeny Sadadia # Author: Denys Fedoryshchenko -# pylint: disable=unused-argument,global-statement,too-many-lines - """KernelCI API main module""" import asyncio @@ -120,7 +118,7 @@ def _validate_startup_environment(): @asynccontextmanager -async def lifespan(app: FastAPI): # pylint: disable=redefined-outer-name +async def lifespan(app: FastAPI): """Lifespan functions for startup and shutdown events""" await pubsub_startup() await create_indexes() @@ -139,7 +137,7 @@ async def lifespan(app: FastAPI): # pylint: disable=redefined-outer-name app = FastAPI(lifespan=lifespan, debug=True, docs_url=None, redoc_url=None) db = Database(service=os.getenv("MONGO_SERVICE", DEFAULT_MONGO_SERVICE)) auth = Authentication(token_url="user/login") -pubsub = None # pylint: disable=invalid-name +pubsub = None auth_backend = auth.get_user_authentication_backend() fastapi_users_instance = FastAPIUsers[User, PydanticObjectId]( @@ -151,7 +149,7 @@ async def lifespan(app: FastAPI): # pylint: disable=redefined-outer-name async def pubsub_startup(): """Startup event handler to create Pub/Sub object""" - global pubsub # pylint: disable=invalid-name + global pubsub pubsub = await PubSub.create() @@ -557,7 +555,7 @@ async def invite_user( invite_url, ) email_sent = True - except Exception as exc: # pylint: disable=broad-exception-caught + except Exception as exc: print(f"Failed to send invite email: {exc}") return UserInviteResponse( @@ -640,7 +638,7 @@ async def accept_invite(accept: InviteAcceptRequest): try: await user_manager.send_invite_accepted_email(updated_user) - except Exception as exc: # pylint: disable=broad-exception-caught + except Exception as exc: print(f"Failed to send invite accepted email: {exc}") return updated_user diff --git a/api/models.py b/api/models.py index 37390837..65ab0afb 100644 --- a/api/models.py +++ b/api/models.py @@ -3,12 +3,6 @@ # Copyright (C) 2023 Collabora Limited # Author: Jeny Sadadia -# Disable flag as user models don't require any public methods -# at the moment -# pylint: disable=too-few-public-methods - -# pylint: disable=no-name-in-module - """Server-side model definitions""" from datetime import datetime @@ -120,7 +114,7 @@ class UserGroupCreateRequest(BaseModel): class User( BeanieBaseUser, - Document, # pylint: disable=too-many-ancestors + Document, DatabaseModel, ): """API User model""" @@ -131,7 +125,7 @@ class User( ) @field_validator("groups") - def validate_groups(cls, groups): # pylint: disable=no-self-argument + def validate_groups(cls, groups): """Unique group constraint""" unique_names = {group.name for group in groups} if len(unique_names) != len(groups): @@ -159,7 +153,7 @@ class UserRead(schemas.BaseUser[PydanticObjectId], ModelId): groups: List[UserGroup] = Field(default=[]) @field_validator("groups") - def validate_groups(cls, groups): # pylint: disable=no-self-argument + def validate_groups(cls, groups): """Unique group constraint""" unique_names = {group.name for group in groups} if len(unique_names) != len(groups): @@ -174,7 +168,7 @@ class UserCreateRequest(schemas.BaseUserCreate): groups: List[str] = Field(default=[]) @field_validator("groups") - def validate_groups(cls, groups): # pylint: disable=no-self-argument + def validate_groups(cls, groups): """Unique group constraint""" unique_names = set(groups) if len(unique_names) != len(groups): @@ -189,7 +183,7 @@ class UserCreate(schemas.BaseUserCreate): groups: List[UserGroup] = Field(default=[]) @field_validator("groups") - def validate_groups(cls, groups): # pylint: disable=no-self-argument + def validate_groups(cls, groups): """Unique group constraint""" unique_names = {group.name for group in groups} if len(unique_names) != len(groups): @@ -206,7 +200,7 @@ class UserUpdateRequest(schemas.BaseUserUpdate): groups: List[str] = Field(default=[]) @field_validator("groups") - def validate_groups(cls, groups): # pylint: disable=no-self-argument + def validate_groups(cls, groups): """Unique group constraint""" unique_names = set(groups) if len(unique_names) != len(groups): @@ -223,7 +217,7 @@ class UserUpdate(schemas.BaseUserUpdate): groups: List[UserGroup] = Field(default=[]) @field_validator("groups") - def validate_groups(cls, groups): # pylint: disable=no-self-argument + def validate_groups(cls, groups): """Unique group constraint""" unique_names = {group.name for group in groups} if len(unique_names) != len(groups): @@ -246,7 +240,7 @@ class UserInviteRequest(BaseModel): resend_if_exists: bool = False @field_validator("groups") - def validate_groups(cls, groups): # pylint: disable=no-self-argument + def validate_groups(cls, groups): """Unique group constraint""" unique_names = set(groups) if len(unique_names) != len(groups): diff --git a/api/pubsub_mongo.py b/api/pubsub_mongo.py index 308a086b..dafc0954 100644 --- a/api/pubsub_mongo.py +++ b/api/pubsub_mongo.py @@ -3,7 +3,6 @@ # Copyright (C) 2025 Collabora Limited # Author: Denys Fedoryshchenko -# pylint: disable=duplicate-code # Note: This module intentionally shares interface code with pubsub.py # as both implement the same PubSub API contract @@ -35,7 +34,7 @@ logger = logging.getLogger(__name__) -class PubSub: # pylint: disable=too-many-instance-attributes +class PubSub: """Hybrid Pub/Sub implementation with MongoDB durability Supports two modes: @@ -328,7 +327,6 @@ def _eventhistory_to_cloudevent(self, event: Dict) -> str: ce = CloudEvent(attributes=attributes, data=event.get("data", {})) return to_json(ce).decode("utf-8") - # pylint: disable=too-many-arguments async def _get_missed_events( self, channel: str, @@ -410,7 +408,6 @@ async def subscribe( return sub - # pylint: disable=too-many-arguments async def _setup_durable_subscription( self, sub_id: int, diff --git a/tests/e2e_tests/listen_handler.py b/tests/e2e_tests/listen_handler.py index f32e410d..e65d35a4 100644 --- a/tests/e2e_tests/listen_handler.py +++ b/tests/e2e_tests/listen_handler.py @@ -23,7 +23,7 @@ def create_listen_task(test_async_client, subscription_id): listen_path, headers={ "Accept": "application/json", - "Authorization": f"Bearer {pytest.BEARER_TOKEN}", # pylint: disable=no-member + "Authorization": f"Bearer {pytest.BEARER_TOKEN}", }, ) ) diff --git a/tests/e2e_tests/test_node_handler.py b/tests/e2e_tests/test_node_handler.py index f331a343..c192151e 100644 --- a/tests/e2e_tests/test_node_handler.py +++ b/tests/e2e_tests/test_node_handler.py @@ -24,7 +24,7 @@ async def create_node(test_async_client, node): "node", headers={ "Accept": "application/json", - "Authorization": f"Bearer {pytest.BEARER_TOKEN}", # pylint: disable=no-member + "Authorization": f"Bearer {pytest.BEARER_TOKEN}", }, data=json.dumps(node), ) @@ -44,7 +44,7 @@ async def get_node_by_id(test_async_client, node_id): f"node/{node_id}", headers={ "Accept": "application/json", - "Authorization": f"Bearer {pytest.BEARER_TOKEN}", # pylint: disable=no-member + "Authorization": f"Bearer {pytest.BEARER_TOKEN}", }, ) assert response.status_code == 200 @@ -65,7 +65,7 @@ async def get_node_by_attribute(test_async_client, params): params=params, headers={ "Accept": "application/json", - "Authorization": f"Bearer {pytest.BEARER_TOKEN}", # pylint: disable=no-member + "Authorization": f"Bearer {pytest.BEARER_TOKEN}", }, ) assert response.status_code == 200 @@ -85,7 +85,7 @@ async def update_node(test_async_client, node): f"node/{node['id']}", headers={ "Accept": "application/json", - "Authorization": f"Bearer {pytest.BEARER_TOKEN}", # pylint: disable=no-member + "Authorization": f"Bearer {pytest.BEARER_TOKEN}", }, data=json.dumps(node), ) @@ -104,7 +104,7 @@ async def patch_node(test_async_client, node_id, patch_data): f"node/{node_id}", headers={ "Accept": "application/json", - "Authorization": f"Bearer {pytest.BEARER_TOKEN}", # pylint: disable=no-member + "Authorization": f"Bearer {pytest.BEARER_TOKEN}", }, data=json.dumps(patch_data), ) diff --git a/tests/e2e_tests/test_password_handler.py b/tests/e2e_tests/test_password_handler.py index 67bb8bda..eec5d608 100644 --- a/tests/e2e_tests/test_password_handler.py +++ b/tests/e2e_tests/test_password_handler.py @@ -28,7 +28,7 @@ async def test_password_endpoint(test_async_client): "user/me", headers={ "Accept": "application/json", - "Authorization": f"Bearer {pytest.BEARER_TOKEN}", # pylint: disable=no-member + "Authorization": f"Bearer {pytest.BEARER_TOKEN}", }, data=json.dumps({"password": "foo"}), ) diff --git a/tests/e2e_tests/test_pipeline.py b/tests/e2e_tests/test_pipeline.py index 567d5098..55262539 100644 --- a/tests/e2e_tests/test_pipeline.py +++ b/tests/e2e_tests/test_pipeline.py @@ -45,7 +45,7 @@ async def test_node_pipeline(test_async_client): # Create Task to listen pubsub event on 'node' channel task_listen = create_listen_task( test_async_client, pytest.node_channel_subscription_id - ) # pylint: disable=no-member + ) # Create a node node = { @@ -94,7 +94,7 @@ async def test_node_pipeline(test_async_client): # Create Task to listen 'updated' event on 'node' channel task_listen = create_listen_task( test_async_client, pytest.node_channel_subscription_id - ) # pylint: disable=no-member + ) # Update node.state node.update({"state": "done"}) diff --git a/tests/e2e_tests/test_pubsub_handler.py b/tests/e2e_tests/test_pubsub_handler.py index 36ed772c..3015b7ac 100644 --- a/tests/e2e_tests/test_pubsub_handler.py +++ b/tests/e2e_tests/test_pubsub_handler.py @@ -28,7 +28,7 @@ async def test_pubsub_handler(test_async_client): # Create Task to listen pubsub event on 'test_channel' channel task_listen = create_listen_task( test_async_client, pytest.test_channel_subscription_id - ) # pylint: disable=no-member + ) # Created and publish CloudEvent attributes = { @@ -38,7 +38,7 @@ async def test_pubsub_handler(test_async_client): data = {"message": "Test message"} event = CloudEvent(attributes, data) headers, body = to_structured(event) - headers["Authorization"] = f"Bearer {pytest.BEARER_TOKEN}" # pylint: disable=no-member + headers["Authorization"] = f"Bearer {pytest.BEARER_TOKEN}" response = await test_async_client.post( "publish/test_channel", headers=headers, data=body ) diff --git a/tests/e2e_tests/test_subscribe_handler.py b/tests/e2e_tests/test_subscribe_handler.py index 50012068..446896ea 100644 --- a/tests/e2e_tests/test_subscribe_handler.py +++ b/tests/e2e_tests/test_subscribe_handler.py @@ -24,9 +24,7 @@ async def test_subscribe_node_channel(test_async_client): """ response = await test_async_client.post( "subscribe/node", - headers={ - "Authorization": f"Bearer {pytest.BEARER_TOKEN}" # pylint: disable=no-member - }, + headers={"Authorization": f"Bearer {pytest.BEARER_TOKEN}"}, ) pytest.node_channel_subscription_id = response.json()["id"] assert response.status_code == 200 @@ -52,9 +50,7 @@ async def test_subscribe_test_channel(test_async_client): """ response = await test_async_client.post( "subscribe/test_channel", - headers={ - "Authorization": f"Bearer {pytest.BEARER_TOKEN}" # pylint: disable=no-member - }, + headers={"Authorization": f"Bearer {pytest.BEARER_TOKEN}"}, ) pytest.test_channel_subscription_id = response.json()["id"] assert response.status_code == 200 @@ -81,9 +77,7 @@ async def test_subscribe_user_group_channel(test_async_client): """ response = await test_async_client.post( "subscribe/user_group", - headers={ - "Authorization": f"Bearer {pytest.BEARER_TOKEN}" # pylint: disable=no-member - }, + headers={"Authorization": f"Bearer {pytest.BEARER_TOKEN}"}, ) pytest.user_group_channel_subscription_id = response.json()["id"] assert response.status_code == 200 diff --git a/tests/e2e_tests/test_unsubscribe_handler.py b/tests/e2e_tests/test_unsubscribe_handler.py index dff85f65..16f613f2 100644 --- a/tests/e2e_tests/test_unsubscribe_handler.py +++ b/tests/e2e_tests/test_unsubscribe_handler.py @@ -24,10 +24,8 @@ async def test_unsubscribe_node_channel(test_async_client): HTTP Response Code 200 OK """ response = await test_async_client.post( - f"unsubscribe/{pytest.node_channel_subscription_id}", # pylint: disable=no-member - headers={ - "Authorization": f"Bearer {pytest.BEARER_TOKEN}" # pylint: disable=no-member - }, + f"unsubscribe/{pytest.node_channel_subscription_id}", + headers={"Authorization": f"Bearer {pytest.BEARER_TOKEN}"}, ) assert response.status_code == 200 @@ -47,9 +45,7 @@ async def test_unsubscribe_test_channel(test_async_client): HTTP Response Code 200 OK """ response = await test_async_client.post( - f"unsubscribe/{pytest.test_channel_subscription_id}", # pylint: disable=no-member - headers={ - "Authorization": f"Bearer {pytest.BEARER_TOKEN}" # pylint: disable=no-member - }, + f"unsubscribe/{pytest.test_channel_subscription_id}", + headers={"Authorization": f"Bearer {pytest.BEARER_TOKEN}"}, ) assert response.status_code == 200 diff --git a/tests/e2e_tests/test_user_invite.py b/tests/e2e_tests/test_user_invite.py index 4932f611..e9c388d4 100644 --- a/tests/e2e_tests/test_user_invite.py +++ b/tests/e2e_tests/test_user_invite.py @@ -2,8 +2,6 @@ # # Copyright (C) 2025 Collabora Limited # -# pylint: disable=unused-argument - """End-to-end test functions for KernelCI API invite flow""" import json diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index ad214462..9b168128 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -6,8 +6,6 @@ # Copyright (C) 2022, 2023 Collabora Limited # Author: Jeny Sadadia -# pylint: disable=protected-access - """pytest fixtures for KernelCI API""" from unittest.mock import AsyncMock diff --git a/tests/unit_tests/test_authz_handler.py b/tests/unit_tests/test_authz_handler.py index 0475f1e4..a6b92b2e 100644 --- a/tests/unit_tests/test_authz_handler.py +++ b/tests/unit_tests/test_authz_handler.py @@ -4,8 +4,6 @@ """Unit tests for authorization helpers and user self-update.""" -# pylint: disable=duplicate-code - import json from kernelci.api.models import Node, Revision diff --git a/tests/unit_tests/test_count_handler.py b/tests/unit_tests/test_count_handler.py index 5a70c339..a3cfd2c8 100644 --- a/tests/unit_tests/test_count_handler.py +++ b/tests/unit_tests/test_count_handler.py @@ -3,8 +3,6 @@ # Copyright (C) 2022 Collabora Limited # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test functions for KernelCI API count handler""" diff --git a/tests/unit_tests/test_dual_secret_jwt.py b/tests/unit_tests/test_dual_secret_jwt.py new file mode 100644 index 00000000..42080fe4 --- /dev/null +++ b/tests/unit_tests/test_dual_secret_jwt.py @@ -0,0 +1,284 @@ +# SPDX-License-Identifier: LGPL-2.1-or-later +# +# Copyright (C) 2025 Collabora Limited +# Author: Denys Fedoryshchenko + +"""Unit tests for DualSecretJWTStrategy""" + +import time +from unittest.mock import AsyncMock, MagicMock + +import jwt as pyjwt +import pytest +from fastapi_users.jwt import generate_jwt + +from api.auth import DualSecretJWTStrategy + +PRIMARY_SECRET = "primary-secret-key" +UNIFIED_SECRET = "unified-secret-key" +WRONG_SECRET = "wrong-secret-key" +USER_ID = "65265305c74695807499037f" +AUDIENCE = ["fastapi-users:auth"] + + +def _make_user_manager(user=None): + """Create a mock user manager that returns the given user.""" + manager = AsyncMock() + manager.parse_id = MagicMock(return_value=USER_ID) + manager.get = AsyncMock(return_value=user) + return manager + + +def _make_user(): + """Create a minimal mock user.""" + user = MagicMock() + user.id = USER_ID + return user + + +def _generate_token(secret, user_id=USER_ID): + """Generate a JWT token with the given secret.""" + data = { + "sub": user_id, + "aud": AUDIENCE, + "email": "test@kernelci.org", + "origin": "kernelci-pipeline", + } + return generate_jwt(data, secret, lifetime_seconds=3600) + + +@pytest.mark.asyncio +async def test_read_token_primary_secret(): + """Token signed with primary secret should authenticate.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + user = _make_user() + manager = _make_user_manager(user) + token = _generate_token(PRIMARY_SECRET) + + result = await strategy.read_token(token, manager) + assert result is user + + +@pytest.mark.asyncio +async def test_read_token_unified_secret(): + """Token signed with unified secret should authenticate via fallback.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + user = _make_user() + manager = _make_user_manager(user) + token = _generate_token(UNIFIED_SECRET) + + result = await strategy.read_token(token, manager) + assert result is user + + +@pytest.mark.asyncio +async def test_read_token_wrong_secret(): + """Token signed with unknown secret should fail.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + manager = _make_user_manager(_make_user()) + token = _generate_token(WRONG_SECRET) + + result = await strategy.read_token(token, manager) + assert result is None + + +@pytest.mark.asyncio +async def test_read_token_no_unified_secret(): + """Without unified secret, only primary secret works.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret="", + ) + user = _make_user() + manager = _make_user_manager(user) + + # Primary should work + token_ok = _generate_token(PRIMARY_SECRET) + result = await strategy.read_token(token_ok, manager) + assert result is user + + # Unified-signed token should fail + token_fail = _generate_token(UNIFIED_SECRET) + result = await strategy.read_token(token_fail, manager) + assert result is None + + +@pytest.mark.asyncio +async def test_read_token_none(): + """None token should return None.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + manager = _make_user_manager() + + result = await strategy.read_token(None, manager) + assert result is None + + +@pytest.mark.asyncio +async def test_read_token_user_not_found(): + """Valid token but user not in DB should return None.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + manager = _make_user_manager(user=None) + token = _generate_token(PRIMARY_SECRET) + + result = await strategy.read_token(token, manager) + assert result is None + + +@pytest.mark.asyncio +async def test_unified_token_primary_secret(): + """Unified token (all fields) signed with primary secret should work.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + user = _make_user() + manager = _make_user_manager(user) + data = { + "sub": USER_ID, + "email": "test@kernelci.org", + "origin": "kernelci-pipeline", + "permissions": ["checkout", "testretry", "patchset"], + "aud": AUDIENCE, + } + token = generate_jwt(data, PRIMARY_SECRET, lifetime_seconds=3600) + + result = await strategy.read_token(token, manager) + assert result is user + + +@pytest.mark.asyncio +async def test_unified_token_unified_secret(): + """Unified token (all fields) signed with unified secret should work.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + user = _make_user() + manager = _make_user_manager(user) + data = { + "sub": USER_ID, + "email": "test@kernelci.org", + "origin": "kernelci-pipeline", + "permissions": ["checkout", "testretry", "patchset"], + "aud": AUDIENCE, + } + token = generate_jwt(data, UNIFIED_SECRET, lifetime_seconds=3600) + + result = await strategy.read_token(token, manager) + assert result is user + + +@pytest.mark.asyncio +async def test_unified_token_expired(): + """Expired unified token should be rejected.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + manager = _make_user_manager(_make_user()) + data = { + "sub": USER_ID, + "email": "test@kernelci.org", + "origin": "kernelci-pipeline", + "permissions": ["checkout", "testretry", "patchset"], + "aud": AUDIENCE, + "exp": int(time.time()) - 3600, # expired 1 hour ago + } + token = pyjwt.encode(data, UNIFIED_SECRET, algorithm="HS256") + + result = await strategy.read_token(token, manager) + assert result is None + + +@pytest.mark.asyncio +async def test_unified_token_wrong_audience(): + """Unified token with wrong audience should be rejected.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + manager = _make_user_manager(_make_user()) + data = { + "sub": USER_ID, + "email": "test@kernelci.org", + "origin": "kernelci-pipeline", + "permissions": ["checkout", "testretry", "patchset"], + "aud": ["wrong-audience"], + } + token = generate_jwt(data, UNIFIED_SECRET, lifetime_seconds=3600) + + result = await strategy.read_token(token, manager) + assert result is None + + +@pytest.mark.asyncio +async def test_unified_token_missing_sub(): + """Unified token without sub claim should be rejected.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + manager = _make_user_manager(_make_user()) + data = { + "email": "test@kernelci.org", + "origin": "kernelci-pipeline", + "permissions": ["checkout", "testretry", "patchset"], + "aud": AUDIENCE, + } + token = generate_jwt(data, UNIFIED_SECRET, lifetime_seconds=3600) + + result = await strategy.read_token(token, manager) + assert result is None + + +@pytest.mark.asyncio +async def test_write_token_uses_primary_secret(): + """write_token should always use the primary secret.""" + strategy = DualSecretJWTStrategy( + secret=PRIMARY_SECRET, + lifetime_seconds=3600, + unified_secret=UNIFIED_SECRET, + ) + user = _make_user() + token = await strategy.write_token(user) + + # Should be verifiable with primary secret + manager = _make_user_manager(user) + result = await strategy.read_token(token, manager) + assert result is user + + # Verify it was NOT signed with unified secret by creating + # a strategy that only knows the unified secret + strategy_unified_only = DualSecretJWTStrategy( + secret=UNIFIED_SECRET, + lifetime_seconds=3600, + unified_secret="", + ) + result = await strategy_unified_only.read_token(token, manager) + assert result is None diff --git a/tests/unit_tests/test_listen_handler.py b/tests/unit_tests/test_listen_handler.py index 43f727ce..30be9c9c 100644 --- a/tests/unit_tests/test_listen_handler.py +++ b/tests/unit_tests/test_listen_handler.py @@ -6,8 +6,6 @@ # Copyright (C) 2022 Jeny Sadadia # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test functions for KernelCI API listen handler""" from tests.unit_tests.conftest import BEARER_TOKEN diff --git a/tests/unit_tests/test_node_handler.py b/tests/unit_tests/test_node_handler.py index fcd37a51..88cd1de1 100644 --- a/tests/unit_tests/test_node_handler.py +++ b/tests/unit_tests/test_node_handler.py @@ -6,8 +6,6 @@ # Copyright (C) 2022 Collabora Limited # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test functions for KernelCI API node handler""" import json diff --git a/tests/unit_tests/test_pubsub.py b/tests/unit_tests/test_pubsub.py index 385ae5a1..bc28d3a3 100644 --- a/tests/unit_tests/test_pubsub.py +++ b/tests/unit_tests/test_pubsub.py @@ -4,8 +4,6 @@ # Author: Michal Galka # Author: Alexandra Pereira -# pylint: disable=protected-access - """Unit test functions for KernelCI API Pub/Sub""" import json diff --git a/tests/unit_tests/test_root_handler.py b/tests/unit_tests/test_root_handler.py index 9d0efa5b..f74955c5 100644 --- a/tests/unit_tests/test_root_handler.py +++ b/tests/unit_tests/test_root_handler.py @@ -3,8 +3,6 @@ # Copyright (C) 2022 Jeny Sadadia # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test function for KernelCI API root handler""" diff --git a/tests/unit_tests/test_subscribe_handler.py b/tests/unit_tests/test_subscribe_handler.py index 808d396a..9d97bc85 100644 --- a/tests/unit_tests/test_subscribe_handler.py +++ b/tests/unit_tests/test_subscribe_handler.py @@ -3,8 +3,6 @@ # Copyright (C) 2022 Jeny Sadadia # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test function for KernelCI API subscribe handler""" from api.pubsub import Subscription diff --git a/tests/unit_tests/test_token_handler.py b/tests/unit_tests/test_token_handler.py index 34405820..0b57c863 100644 --- a/tests/unit_tests/test_token_handler.py +++ b/tests/unit_tests/test_token_handler.py @@ -6,8 +6,6 @@ # Copyright (C) 2022, 2023 Collabora Limited # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test function for KernelCI API token handler""" import pytest diff --git a/tests/unit_tests/test_unsubscribe_handler.py b/tests/unit_tests/test_unsubscribe_handler.py index afa1a982..8ac10f2b 100644 --- a/tests/unit_tests/test_unsubscribe_handler.py +++ b/tests/unit_tests/test_unsubscribe_handler.py @@ -3,8 +3,6 @@ # Copyright (C) 2022 Jeny Sadadia # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test functions for KernelCI API unsubscribe handler""" from tests.unit_tests.conftest import BEARER_TOKEN diff --git a/tests/unit_tests/test_user_handler.py b/tests/unit_tests/test_user_handler.py index c85a899a..7c13770f 100644 --- a/tests/unit_tests/test_user_handler.py +++ b/tests/unit_tests/test_user_handler.py @@ -3,8 +3,6 @@ # Copyright (C) 2022, 2023 Collabora Limited # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test function for KernelCI API user handler""" import json diff --git a/tests/unit_tests/test_whoami_handler.py b/tests/unit_tests/test_whoami_handler.py index 0b579008..0c901252 100644 --- a/tests/unit_tests/test_whoami_handler.py +++ b/tests/unit_tests/test_whoami_handler.py @@ -6,8 +6,6 @@ # Copyright (C) 2023 Collabora Limited # Author: Jeny Sadadia -# pylint: disable=unused-argument - """Unit test function for KernelCI API whoami handler""" import pytest