Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions authentik/stages/email/tests/test_stage.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""email tests"""

from datetime import timedelta
from hashlib import sha256
from unittest.mock import MagicMock, PropertyMock, patch

Expand All @@ -9,6 +10,7 @@
from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend
from django.urls import reverse
from django.utils.http import urlencode
from django.utils.timezone import now

from authentik.brands.models import Brand
from authentik.core.tests.utils import RequestFactory, create_test_admin_user, create_test_flow
Expand Down Expand Up @@ -187,6 +189,43 @@ def test_token(self):
self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER], self.user)
self.assertTrue(plan.context[PLAN_CONTEXT_PENDING_USER].is_active)

def test_token_expired(self):
"""Test with expired token"""
# Make sure token exists
self.test_pending_user()
self.user.is_active = False
self.user.save()
plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
session = self.client.session
session[SESSION_KEY_PLAN] = plan
session.save()
token: FlowToken = FlowToken.objects.get(user=self.user)
token.expires = now() - timedelta(minutes=1)
token.save()

with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
# Call the executor shell to preseed the session
url = reverse(
"authentik_api:flow-executor",
kwargs={"flow_slug": self.flow.slug},
)
url_query = urlencode(
{
QS_KEY_TOKEN: token.key,
}
)
url += f"?query={url_query}"
self.client.get(url)

# Call the actual executor to get the JSON Response
response = self.client.get(
reverse(
"authentik_api:flow-executor",
kwargs={"flow_slug": self.flow.slug},
)
)
self.assertStageResponse(response, self.flow, component="ak-stage-access-denied")

def test_token_invalid_user(self):
"""Test with token with invalid user"""
# Make sure token exists
Expand Down
Loading