65 lines
2.8 KiB
Python
65 lines
2.8 KiB
Python
# Example: be/tests/api/v1/test_users.py
|
|
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
from app.schemas.user import UserPublic # For response validation
|
|
from app.core.security import create_access_token
|
|
|
|
# Module-level pytest mark: applies the `asyncio` marker to every test in this
# file so the `async def` tests below are run as coroutines.
# NOTE(review): presumably provided by the pytest-asyncio plugin — confirm it is installed.
pytestmark = pytest.mark.asyncio
|
|
|
|
# Helper function to get a valid token
|
|
async def get_auth_headers(client: AsyncClient, email: str, password: str) -> dict:
    """Log a user in and return headers carrying their bearer token.

    The credentials are posted to the login endpoint via ``data=`` (the email
    goes in the ``username`` field). A non-2xx response raises through
    ``raise_for_status()``; otherwise the ``access_token`` from the JSON body
    is wrapped in an ``Authorization`` header.
    """
    login_res = await client.post(
        "/api/v1/auth/login",
        data={"username": email, "password": password},
    )
    # Fail loudly here rather than with a confusing KeyError on the token lookup.
    login_res.raise_for_status()
    access_token = login_res.json()["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
|
|
|
|
async def test_read_users_me_success(client: AsyncClient):
    """A signed-up, logged-in user can read their own profile from /users/me."""
    # Arrange: register a fresh account and validate the signup response schema.
    email = "testme@example.com"
    password = "password123"
    signup_payload = {"email": email, "password": password, "name": "Test Me"}
    signup_res = await client.post("/api/v1/auth/signup", json=signup_payload)
    assert signup_res.status_code == 201
    created_user = UserPublic(**signup_res.json())

    # Authenticate through the real login endpoint to obtain bearer headers.
    auth_headers = await get_auth_headers(client, email, password)

    # Act: fetch the current user's profile.
    me_res = await client.get("/api/v1/users/me", headers=auth_headers)
    assert me_res.status_code == 200

    # Assert: the profile matches what was registered, including the ID
    # returned at signup, and exposes no credential material.
    profile = me_res.json()
    assert profile["email"] == email
    assert profile["name"] == "Test Me"
    assert profile["id"] == created_user.id
    for secret_field in ("password", "hashed_password"):
        assert secret_field not in profile
|
|
|
|
|
|
async def test_read_users_me_no_token(client: AsyncClient):
    """A request to /users/me without an Authorization header gets a 401."""
    anonymous_res = await client.get("/api/v1/users/me")  # deliberately no headers

    # Both the status and the detail text are expected to be the defaults
    # produced by OAuth2PasswordBearer when no credentials are supplied.
    assert anonymous_res.status_code == 401
    error_body = anonymous_res.json()
    assert error_body["detail"] == "Not authenticated"
|
|
|
|
async def test_read_users_me_invalid_token(client: AsyncClient):
    """A syntactically bogus bearer token is rejected by /users/me with a 401."""
    bogus_headers = {"Authorization": "Bearer invalid-token-string"}
    rejected_res = await client.get("/api/v1/users/me", headers=bogus_headers)

    assert rejected_res.status_code == 401
    # This detail message is expected to come from the project's own auth
    # dependency rather than from the framework default.
    error_body = rejected_res.json()
    assert error_body["detail"] == "Could not validate credentials"
|
|
|
|
async def test_read_users_me_expired_token(client: AsyncClient):
    """An access token whose expiry is already in the past is rejected with a 401."""
    # Imported locally because the module's visible imports never bring
    # `timedelta` into scope; without this the test dies with NameError before
    # it makes any request.
    from datetime import timedelta

    email = "testexpired@example.com"

    # Mint the token directly instead of logging in so the expiry can be forced.
    # Assumes create_access_token accepts an `expires_delta` override — confirm
    # against app.core.security. The negative delta puts the expiry 10 seconds
    # in the past.
    expired_token = create_access_token(
        subject=email, expires_delta=timedelta(seconds=-10)
    )
    headers = {"Authorization": f"Bearer {expired_token}"}

    response = await client.get("/api/v1/users/me", headers=headers)

    assert response.status_code == 401
    assert response.json()["detail"] == "Could not validate credentials"
    # NOTE(review): no user is created for `email` here, so the same 401 could
    # also be produced by a "user not found" path rather than by expiry —
    # consider signing the user up first so expiry is the only possible cause.
|
|
|
|
# TODO: add a test case for a valid token whose user has been deleted from the DB, if needed.