Files
KiloStar/tests/core/database/database_exception_test.py
T
zhaoxi 209ba45477 refactor(core): decouple actors and remove workflow templates (#67)
Removes the deprecated `workflow_template` concept entirely across both backend API routers, internal logic handling within the `supervisory_node` and `consciousness_node`, and front-end components. Enables `consciousness_node` to work autonomously.

Also refactors core package structure to enforce the "one python package, one Ray Actor" architectural rule. `GlobalWorkflowManager`, `WorkflowRunningEngine`, `PostgresDatabase`, and `WorkerCluster` have been moved to their own top-level decoupled package directories with properly exported `__init__.py` modules. Test suites have been relocated and import paths updated across the system.

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: zhaoxi826 <198742034+zhaoxi826@users.noreply.github.com>
2026-05-06 15:05:47 +08:00

87 lines
2.6 KiB
Python

import pytest
from unittest.mock import patch
from sqlalchemy.exc import IntegrityError, OperationalError
from pydantic import ValidationError
from pretor.utils.error import UserNotExistError
from pretor.core.database.database_exception import database_exception
@database_exception
async def success_func():
    """Decorated coroutine stub that finishes normally and returns "success"."""
    outcome = "success"
    return outcome
@database_exception
async def validation_error_func():
    """Decorated coroutine stub that always raises a pydantic ValidationError."""
    # Built with no line errors: only the exception type matters to the tests.
    error = ValidationError.from_exception_data(title="Mock", line_errors=[])
    raise error
@database_exception
async def integrity_error_func():
    """Decorated coroutine stub that always raises a SQLAlchemy IntegrityError."""
    # Placeholder strings stand in for the statement, parameters and original error.
    error = IntegrityError("mock_statement", "mock_params", "mock_orig")
    raise error
@database_exception
async def operational_error_func():
    """Decorated coroutine stub that always raises a SQLAlchemy OperationalError."""
    # Placeholder strings stand in for the statement, parameters and original error.
    error = OperationalError("mock_statement", "mock_params", "mock_orig")
    raise error
@database_exception
async def user_not_exist_error_func():
    """Decorated coroutine stub that always raises UserNotExistError."""
    error = UserNotExistError("mock user")
    raise error
@database_exception
async def exception_func():
    """Decorated coroutine stub that always raises a plain Exception."""
    error = Exception("mock generic exception")
    raise error
@pytest.mark.asyncio
async def test_success_func():
    """A decorated coroutine that does not raise passes its return value through."""
    returned = await success_func()
    assert returned == "success"
@pytest.mark.asyncio
@patch("pretor.core.database.database_exception.logger")
async def test_validation_error(mock_logger):
    """ValidationError propagates to the caller and is logged once via logger.error.

    The first positional argument of the log call must contain the
    "object validation failed" text (checked against the Chinese literal below).
    """
    with pytest.raises(ValidationError):
        await validation_error_func()

    # Checked after the context manager exits, once the exception has been captured.
    mock_logger.error.assert_called_once()
    logged_message = mock_logger.error.call_args[0][0]
    assert "对象校验失败" in logged_message
@pytest.mark.asyncio
@patch("pretor.core.database.database_exception.logger")
async def test_integrity_error(mock_logger):
    """IntegrityError propagates to the caller and is logged once via logger.error.

    The first positional argument of the log call must contain the
    "database integrity error" text (checked against the Chinese literal below).
    """
    with pytest.raises(IntegrityError):
        await integrity_error_func()

    # Checked after the context manager exits, once the exception has been captured.
    mock_logger.error.assert_called_once()
    logged_message = mock_logger.error.call_args[0][0]
    assert "数据库完整性错误" in logged_message
@pytest.mark.asyncio
@patch("pretor.core.database.database_exception.logger")
async def test_operational_error(mock_logger):
    """OperationalError propagates to the caller and is logged once via logger.error.

    The first positional argument of the log call must contain the
    "database connection error" text (checked against the Chinese literal below).
    """
    with pytest.raises(OperationalError):
        await operational_error_func()

    # Checked after the context manager exits, once the exception has been captured.
    mock_logger.error.assert_called_once()
    logged_message = mock_logger.error.call_args[0][0]
    assert "数据库连接异常" in logged_message
@pytest.mark.asyncio
@patch("pretor.core.database.database_exception.logger")
async def test_user_not_exist_error(mock_logger):
    """UserNotExistError is not re-raised: the call yields None and logs one error.

    The first positional argument of the log call must contain the
    "password change failed, user does not exist" text (checked against the
    Chinese literal below).
    """
    outcome = await user_not_exist_error_func()
    assert outcome is None

    mock_logger.error.assert_called_once()
    logged_message = mock_logger.error.call_args[0][0]
    assert "更改密码失败,用户不存在" in logged_message
@pytest.mark.asyncio
@patch("pretor.core.database.database_exception.logger")
async def test_generic_exception(mock_logger):
    """A generic Exception propagates and is logged once via logger.exception.

    The first positional argument of the log call must contain the
    "unexpected database error" text (checked against the Chinese literal below).
    """
    with pytest.raises(Exception, match="mock generic exception"):
        await exception_func()

    # Checked after the context manager exits, once the exception has been captured.
    mock_logger.exception.assert_called_once()
    logged_message = mock_logger.exception.call_args[0][0]
    assert "未预期的数据库错误" in logged_message