# Example: be/tests/api/v1/test_users.py import pytest from httpx import AsyncClient from app.schemas.user import UserPublic # For response validation from app.core.security import create_access_token pytestmark = pytest.mark.asyncio # Helper function to get a valid token async def get_auth_headers(client: AsyncClient, email: str, password: str) -> dict: """Logs in a user and returns authorization headers.""" login_payload = {"username": email, "password": password} response = await client.post("/api/v1/auth/login", data=login_payload) response.raise_for_status() # Raise exception for non-2xx status token_data = response.json() return {"Authorization": f"Bearer {token_data['access_token']}"} async def test_read_users_me_success(client: AsyncClient): # 1. Create user email = "testme@example.com" password = "password123" signup_res = await client.post( "/api/v1/auth/signup", json={"email": email, "password": password, "name": "Test Me"} ) assert signup_res.status_code == 201 user_data = UserPublic(**signup_res.json()) # Validate signup response # 2. Get token headers = await get_auth_headers(client, email, password) # 3. Request /users/me response = await client.get("/api/v1/users/me", headers=headers) assert response.status_code == 200 me_data = response.json() assert me_data["email"] == email assert me_data["name"] == "Test Me" assert me_data["id"] == user_data.id # Check ID matches signup assert "password" not in me_data assert "hashed_password" not in me_data async def test_read_users_me_no_token(client: AsyncClient): response = await client.get("/api/v1/users/me") # No headers assert response.status_code == 401 # Handled by OAuth2PasswordBearer assert response.json()["detail"] == "Not authenticated" # Default detail from OAuth2PasswordBearer async def test_read_users_me_invalid_token(client: AsyncClient): headers = {"Authorization": "Bearer invalid-token-string"} response = await client.get("/api/v1/users/me", headers=headers) assert response.status_code == 401 assert response.json()["detail"] == "Could not validate credentials" # Detail from our dependency async def test_read_users_me_expired_token(client: AsyncClient): # Create a short-lived token manually (or adjust settings temporarily) email = "testexpired@example.com" # Assume create_access_token allows timedelta override expired_token = create_access_token(subject=email, expires_delta=timedelta(seconds=-10)) headers = {"Authorization": f"Bearer {expired_token}"} response = await client.get("/api/v1/users/me", headers=headers) assert response.status_code == 401 assert response.json()["detail"] == "Could not validate credentials" # Add test case for valid token but user deleted from DB if needed