Commit fc18dcb0 by Performance System

fix:1

parent eb34b9e8
Pipeline #3214 failed in 2 minutes 9 seconds
# Frontend environment variable configuration
# API service URL
# API service URL - points directly at the backend service
VITE_API_BASE_URL=http://localhost:8000
# Application configuration
......
......@@ -26,7 +26,7 @@ deploy_to_production:
cd performance-score
git pull origin master
docker compose up -d --build
echo '🚀 部署流程执行完毕!!!'
echo ' 部署流程执行完毕!!!'
"
only:
......
#!/usr/bin/env python3
"""
Script for checking the database table structure
"""
import asyncio
from sqlalchemy import text
from database import database
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def check_table_structure():
"""检查 monthly_history 表结构"""
try:
# 连接数据库
await database.connect()
logger.info("已连接到数据库")
# 查询表结构
query = text("""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'monthly_history'
ORDER BY ordinal_position;
""")
result = await database.fetch_all(query)
logger.info("monthly_history 表结构:")
logger.info("-" * 50)
for row in result:
logger.info(f"列名: {row['column_name']}, 类型: {row['data_type']}, 可空: {row['is_nullable']}")
# 检查是否存在 institutions_data 列
institutions_data_exists = any(row['column_name'] == 'institutions_data' for row in result)
if institutions_data_exists:
logger.info("✅ institutions_data 列存在")
else:
logger.error("❌ institutions_data 列不存在")
# 尝试添加列
logger.info("正在尝试添加 institutions_data 列...")
alter_query = text("""
ALTER TABLE monthly_history
ADD COLUMN institutions_data JSONB;
""")
await database.execute(alter_query)
logger.info("✅ 成功添加 institutions_data 列")
await database.disconnect()
logger.info("数据库连接已关闭")
except Exception as e:
logger.error(f"❌ 检查表结构失败: {e}")
raise
if __name__ == "__main__":
asyncio.run(check_table_structure())
......@@ -38,15 +38,7 @@ class Settings(BaseSettings):
# CORS 配置
CORS_ORIGINS: List[str] = [
"http://localhost:5173", # Vite 开发服务器
"http://localhost:3000", # React 开发服务器
"http://localhost:4001", # Docker 前端
"http://localhost:4003", # 其他前端
"http://localhost:8080", # 生产环境前端
"http://127.0.0.1:3000",
"http://127.0.0.1:4001",
"http://127.0.0.1:4003",
"http://127.0.0.1:8080",
"*", # 开发环境允许所有来源
]
# WebSocket 配置
......
......@@ -80,6 +80,7 @@ monthly_history_table = Table(
Column("total_institutions", Integer, nullable=False),
Column("total_images", Integer, nullable=False),
Column("user_stats", JSONB, nullable=False),
Column("institutions_data", JSONB, nullable=True), # 新增:存储完整的机构和图片数据
Column("created_at", TIMESTAMP(timezone=True), server_default=func.now()),
)
......@@ -122,7 +123,7 @@ class DatabaseManager:
logger.error(f"批量数据库操作失败: {e}")
raise
async def transaction(self):
def transaction(self):
"""创建数据库事务"""
return self.database.transaction()
......@@ -168,7 +169,7 @@ async def insert_default_data():
admin_exists = await database.fetch_one(
users_table.select().where(users_table.c.id == "admin")
)
if not admin_exists:
# 插入默认管理员用户
await database.execute(
......@@ -182,6 +183,15 @@ async def insert_default_data():
)
)
logger.info("✅ 默认管理员用户创建成功")
else:
# 检查管理员用户密码是否为空,如果为空则设置默认密码
if not admin_exists["password"]:
await database.execute(
users_table.update().where(users_table.c.id == "admin").values(
password="admin123"
)
)
logger.info("✅ 管理员用户密码已修复")
# 检查并插入默认系统配置
config_items = [
......
......@@ -17,6 +17,7 @@ from loguru import logger
from database import database, engine, metadata
from config import settings
from routers import users, institutions, system_config, history, migration
from scheduler import monthly_scheduler
@asynccontextmanager
......@@ -32,11 +33,50 @@ async def lifespan(app: FastAPI):
# 创建表结构(如果不存在)
metadata.create_all(engine)
logger.info("✅ 数据库表结构检查完成")
# 执行数据库迁移
try:
from migrations.loader import load_migrations
from migrations.manager import migration_manager
# 加载迁移文件
migration_count = load_migrations()
logger.info(f"✅ 加载了 {migration_count} 个迁移文件")
# 执行迁移
migration_result = await migration_manager.migrate_to_latest()
if migration_result["success"]:
if migration_result["executed_migrations"] > 0:
logger.info(f"🎉 数据库迁移完成!执行了 {migration_result['executed_migrations']} 个迁移")
else:
logger.info("✅ 数据库已是最新版本,无需迁移")
else:
logger.error("❌ 数据库迁移失败")
logger.error(f"失败的迁移: {migration_result.get('failed_migrations', [])}")
# 在生产环境中,可以选择是否继续启动应用
# 这里我们记录错误但继续启动,让管理员可以通过API手动处理
except Exception as e:
logger.error(f"数据库迁移过程异常: {e}")
logger.error("应用将继续启动,但建议检查迁移状态")
# 插入默认数据
from database import insert_default_data
await insert_default_data()
logger.info("✅ 默认数据初始化完成")
# 启动定时任务调度器
await monthly_scheduler.start()
yield
# 关闭时执行
logger.info("🔄 正在关闭 API 服务")
# 停止定时任务调度器
await monthly_scheduler.stop()
await database.disconnect()
logger.info("✅ 数据库连接已关闭")
......
#!/usr/bin/env python3
"""
Standalone database migration tool script
Can be used outside the container or in CI/CD; it does not depend on the FastAPI application
Usage:
python migrate.py # run all pending migrations
python migrate.py --status # show migration status
python migrate.py --version # show the current version
python migrate.py --reload # reload migration files
python migrate.py --help # show help
Environment variables:
DATABASE_URL: database connection URL
MIGRATION_LOG_LEVEL: log level (DEBUG, INFO, WARNING, ERROR)
"""
import asyncio
import sys
import argparse
import os
from pathlib import Path
from loguru import logger
# 添加项目根目录到Python路径
sys.path.insert(0, str(Path(__file__).parent))
def setup_logging(level: str = "INFO"):
"""设置日志配置"""
logger.remove() # 移除默认处理器
# 添加控制台输出
logger.add(
sys.stdout,
level=level,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
colorize=True
)
# 添加文件输出
logger.add(
"logs/migration.log",
level=level,
format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
rotation="10 MB",
retention="30 days",
compression="zip"
)
async def show_migration_status():
"""显示迁移状态"""
try:
from migrations.manager import migration_manager
from migrations.loader import load_migrations
# 加载迁移
migration_count = load_migrations()
logger.info(f"加载了 {migration_count} 个迁移文件")
# 获取状态
status = await migration_manager.get_migration_status()
print("\n" + "="*60)
print("📊 数据库迁移状态")
print("="*60)
print(f"总迁移数量: {status['total_migrations']}")
print(f"已执行数量: {status['executed_count']}")
print(f"待执行数量: {status['pending_count']}")
print(f"是否最新版本: {'✅ 是' if status['is_up_to_date'] else '❌ 否'}")
if status['executed_migrations']:
print(f"\n已执行的迁移:")
for version in status['executed_migrations']:
print(f" ✅ {version}")
if status['pending_migrations']:
print(f"\n待执行的迁移:")
for migration in status['pending_migrations']:
print(f" ⏳ {migration['version']}: {migration['description']}")
print("="*60)
except Exception as e:
logger.error(f"获取迁移状态失败: {e}")
sys.exit(1)
async def show_current_version():
"""显示当前版本"""
try:
from migrations.manager import migration_manager
executed_migrations = await migration_manager.get_executed_migrations()
current_version = executed_migrations[-1] if executed_migrations else "0.0.0"
print(f"\n当前数据库schema版本: {current_version}")
print(f"已执行迁移数量: {len(executed_migrations)}")
except Exception as e:
logger.error(f"获取当前版本失败: {e}")
sys.exit(1)
async def execute_migrations():
"""执行迁移"""
try:
from migrations.loader import load_migrations
from migrations.manager import migration_manager
logger.info("🚀 开始独立迁移过程")
# 加载迁移
migration_count = load_migrations()
logger.info(f"✅ 加载了 {migration_count} 个迁移")
# 执行迁移
result = await migration_manager.migrate_to_latest()
if result["success"]:
if result["executed_migrations"] > 0:
logger.info("🎉 迁移完成!")
print(f"\n✅ 成功执行 {result['executed_migrations']} 个迁移")
print(f"📋 总待执行迁移: {result.get('total_pending', 0)}")
else:
logger.info("✅ 数据库已是最新版本")
print("\n✅ 数据库已是最新版本,无需迁移")
else:
logger.error("❌ 迁移失败!")
print(f"\n❌ 失败的迁移: {result.get('failed_migrations', [])}")
print(f"❌ 错误信息: {result.get('message', '未知错误')}")
sys.exit(1)
except Exception as e:
logger.error(f"迁移过程异常: {e}")
print(f"\n❌ 迁移异常: {e}")
sys.exit(1)
async def reload_migrations():
"""重新加载迁移文件"""
try:
from migrations.loader import reload_migrations
logger.info("🔄 重新加载迁移文件")
migration_count = reload_migrations()
print(f"\n✅ 成功重新加载 {migration_count} 个迁移文件")
except Exception as e:
logger.error(f"重新加载迁移文件失败: {e}")
sys.exit(1)
async def main():
"""主函数"""
parser = argparse.ArgumentParser(
description="数据库迁移工具",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python migrate.py # 执行所有待执行的迁移
python migrate.py --status # 查看迁移状态
python migrate.py --version # 查看当前版本
python migrate.py --reload # 重新加载迁移文件
环境变量:
DATABASE_URL: 数据库连接URL
MIGRATION_LOG_LEVEL: 日志级别 (DEBUG, INFO, WARNING, ERROR)
"""
)
parser.add_argument(
"--status",
action="store_true",
help="显示迁移状态"
)
parser.add_argument(
"--version",
action="store_true",
help="显示当前数据库schema版本"
)
parser.add_argument(
"--reload",
action="store_true",
help="重新加载迁移文件"
)
parser.add_argument(
"--log-level",
default=os.getenv("MIGRATION_LOG_LEVEL", "INFO"),
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="日志级别"
)
args = parser.parse_args()
# 设置日志
setup_logging(args.log_level)
try:
# 连接数据库
from database import database
await database.connect()
logger.info("✅ 数据库连接成功")
# 根据参数执行相应操作
if args.status:
await show_migration_status()
elif args.version:
await show_current_version()
elif args.reload:
await reload_migrations()
else:
# 默认执行迁移
await execute_migrations()
except Exception as e:
logger.error(f"程序执行异常: {e}")
print(f"\n❌ 程序异常: {e}")
sys.exit(1)
finally:
try:
from database import database
await database.disconnect()
logger.info("✅ 数据库连接已关闭")
except Exception:
pass
if __name__ == "__main__":
# 确保logs目录存在
Path("logs").mkdir(exist_ok=True)
# 运行主函数
asyncio.run(main())
#!/usr/bin/env python3
"""
Database migration script: add the institutions_data column to the monthly_history table
"""
import asyncio
from sqlalchemy import text
from database import database, engine
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def migrate_database():
"""执行数据库迁移"""
try:
# 连接数据库
await database.connect()
logger.info("已连接到数据库")
# 检查列是否已存在
check_column_query = text("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'monthly_history'
AND column_name = 'institutions_data';
""")
result = await database.fetch_all(check_column_query)
if result:
logger.info("institutions_data 列已存在,无需迁移")
else:
# 添加 institutions_data 列
alter_query = text("""
ALTER TABLE monthly_history
ADD COLUMN institutions_data JSONB;
""")
await database.execute(alter_query)
logger.info("✅ 成功添加 institutions_data 列")
# 验证列是否添加成功
verify_result = await database.fetch_all(check_column_query)
if verify_result:
logger.info("✅ 验证成功:institutions_data 列已添加")
else:
logger.error("❌ 验证失败:institutions_data 列未添加")
await database.disconnect()
logger.info("数据库连接已关闭")
except Exception as e:
logger.error(f"❌ 数据库迁移失败: {e}")
raise
if __name__ == "__main__":
asyncio.run(migrate_database())
"""
Database migration system
Provides a safe mechanism for updating the database schema
Features:
- Transaction protection: every migration runs inside a transaction
- Validation: checks run both before and after each migration
- Rollback support: every migration has a matching rollback method
- Backup points: a backup-point record is created automatically before each migration
- Atomicity: a migration either succeeds completely or fails completely
- Version tracking: the execution state of every migration is recorded precisely
"""
__version__ = "1.0.0"
__author__ = "Performance System Team"
from .base import Migration, MigrationError
from .manager import MigrationManager, migration_manager
__all__ = [
"Migration",
"MigrationError",
"MigrationManager",
"migration_manager"
]
"""
Migration base classes and utilities
Provides the core abstract classes and exception definitions for the migration system
"""
import asyncio
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from datetime import datetime
from sqlalchemy import text
from loguru import logger
import json
class MigrationError(Exception):
"""迁移异常类"""
def __init__(self, message: str, migration_version: Optional[str] = None, original_error: Optional[Exception] = None):
self.message = message
self.migration_version = migration_version
self.original_error = original_error
super().__init__(self.message)
def __str__(self):
if self.migration_version:
return f"Migration {self.migration_version}: {self.message}"
return self.message
class Migration(ABC):
"""迁移基础抽象类
所有具体的迁移都应该继承这个类并实现必要的方法
"""
def __init__(self, version: str, description: str, dependencies: Optional[List[str]] = None):
"""
初始化迁移
Args:
version: 迁移版本号,建议使用语义化版本号如 "1.0.1"
description: 迁移描述,简要说明这个迁移做了什么
dependencies: 依赖的迁移版本列表,确保迁移顺序正确
"""
self.version = version
self.description = description
self.dependencies = dependencies or []
self.executed_at: Optional[datetime] = None
self.execution_time: Optional[float] = None
# 验证版本号格式
if not version or not isinstance(version, str):
raise MigrationError("迁移版本号不能为空且必须是字符串")
# 验证描述
if not description or not isinstance(description, str):
raise MigrationError("迁移描述不能为空且必须是字符串")
@abstractmethod
async def up(self, db) -> bool:
"""
执行迁移 - 必须实现
Args:
db: 数据库管理器实例
Returns:
bool: 迁移是否成功执行
Raises:
MigrationError: 迁移执行失败时抛出
"""
pass
@abstractmethod
async def down(self, db) -> bool:
"""
回滚迁移 - 必须实现
Args:
db: 数据库管理器实例
Returns:
bool: 回滚是否成功执行
Raises:
MigrationError: 回滚执行失败时抛出
"""
pass
async def validate_before_up(self, db) -> bool:
"""
迁移前验证 - 可选重写
在执行迁移前进行必要的验证,如检查表是否存在、字段是否已存在等
Args:
db: 数据库管理器实例
Returns:
bool: 验证是否通过
"""
return True
async def validate_after_up(self, db) -> bool:
"""
迁移后验证 - 可选重写
在执行迁移后进行验证,确保迁移结果符合预期
Args:
db: 数据库管理器实例
Returns:
bool: 验证是否通过
"""
return True
async def get_rollback_sql(self, db) -> Optional[str]:
"""
获取回滚SQL - 可选重写
返回用于回滚的SQL语句,用于记录和紧急回滚
Args:
db: 数据库管理器实例
Returns:
Optional[str]: 回滚SQL语句,如果不需要则返回None
"""
return None
def get_checksum(self) -> str:
"""
获取迁移校验和
用于验证迁移文件是否被修改,确保迁移的一致性
Returns:
str: 迁移内容的校验和
"""
import hashlib
content = f"{self.version}:{self.description}:{str(self.dependencies)}"
return hashlib.md5(content.encode()).hexdigest()
def __str__(self):
return f"Migration {self.version}: {self.description}"
def __repr__(self):
return f"<Migration(version='{self.version}', description='{self.description}')>"
def __eq__(self, other):
if not isinstance(other, Migration):
return False
return self.version == other.version
def __hash__(self):
return hash(self.version)
def version_to_tuple(version: str) -> tuple:
"""
将版本号转换为可比较的元组
Args:
version: 版本号字符串,如 "1.0.1"
Returns:
tuple: 可比较的版本元组,如 (1, 0, 1)
"""
try:
# 尝试解析语义化版本号
parts = version.split('.')
return tuple(int(part) for part in parts)
except (ValueError, AttributeError):
# 如果不是标准版本号格式,按字符串排序
return (version,)
def compare_versions(version1: str, version2: str) -> int:
"""
比较两个版本号
Args:
version1: 第一个版本号
version2: 第二个版本号
Returns:
int: -1 如果 version1 < version2, 0 如果相等, 1 如果 version1 > version2
"""
tuple1 = version_to_tuple(version1)
tuple2 = version_to_tuple(version2)
if tuple1 < tuple2:
return -1
elif tuple1 > tuple2:
return 1
else:
return 0
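A quick worked example of the version comparison above (illustrative values, not part of the module):
# Numeric tuple comparison orders versions correctly where a plain string comparison would not.
assert version_to_tuple("1.0.10") == (1, 0, 10)
assert compare_versions("1.0.2", "1.0.10") == -1  # (1, 0, 2) < (1, 0, 10)
assert "1.0.2" > "1.0.10"  # naive string comparison puts 1.0.2 "after" 1.0.10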
"""
Migration loader - automatic discovery and registration of migrations
Scans the migrations/versions directory and automatically loads and registers all migration files
"""
import os
import sys
import importlib
import inspect
from pathlib import Path
from typing import List, Type
from loguru import logger
from .base import Migration
from .manager import migration_manager
class MigrationLoader:
"""迁移加载器"""
def __init__(self, versions_dir: str = None):
"""
初始化迁移加载器
Args:
versions_dir: 迁移版本目录路径,默认为当前目录下的versions
"""
if versions_dir is None:
self.versions_dir = Path(__file__).parent / "versions"
else:
self.versions_dir = Path(versions_dir)
self.loaded_migrations: List[Migration] = []
def discover_migration_files(self) -> List[str]:
"""发现所有迁移文件"""
if not self.versions_dir.exists():
logger.warning(f"迁移目录不存在,创建目录: {self.versions_dir}")
self.versions_dir.mkdir(parents=True, exist_ok=True)
return []
# 扫描迁移文件
migration_files = []
for file_path in self.versions_dir.glob("v*.py"):
if file_path.name != "__init__.py" and not file_path.name.startswith('.'):
migration_files.append(file_path.stem)
# 按文件名排序(版本号排序)
migration_files.sort()
logger.info(f"发现 {len(migration_files)} 个迁移文件: {migration_files}")
return migration_files
def load_migration_from_file(self, module_name: str) -> List[Migration]:
"""从文件加载迁移"""
migrations = []
try:
# 构建完整的模块路径
full_module_name = f"migrations.versions.{module_name}"
# 动态导入模块
if full_module_name in sys.modules:
# 如果模块已经导入,重新加载
module = importlib.reload(sys.modules[full_module_name])
else:
module = importlib.import_module(full_module_name)
# 查找Migration类的子类
for attr_name in dir(module):
attr = getattr(module, attr_name)
# 检查是否是Migration的子类(但不是Migration本身)
if (inspect.isclass(attr) and
issubclass(attr, Migration) and
attr is not Migration):
try:
# 实例化迁移
migration_instance = attr()
migrations.append(migration_instance)
logger.debug(f"从 {module_name} 加载迁移: {migration_instance.version}")
except Exception as e:
logger.error(f"实例化迁移 {attr_name} 失败: {e}")
continue
if not migrations:
logger.warning(f"在文件 {module_name} 中未找到有效的迁移类")
except Exception as e:
logger.error(f"加载迁移文件 {module_name} 失败: {e}")
logger.error(f"错误详情: {type(e).__name__}: {str(e)}")
return migrations
def load_all_migrations(self) -> List[Migration]:
"""加载所有迁移"""
logger.info("🔍 开始扫描和加载迁移文件")
migration_files = self.discover_migration_files()
all_migrations = []
for module_name in migration_files:
migrations = self.load_migration_from_file(module_name)
all_migrations.extend(migrations)
# 按版本号排序
all_migrations.sort(key=lambda m: self._version_to_tuple(m.version))
self.loaded_migrations = all_migrations
logger.info(f"✅ 成功加载 {len(all_migrations)} 个迁移")
return all_migrations
def _version_to_tuple(self, version: str) -> tuple:
"""将版本号转换为可比较的元组"""
try:
return tuple(map(int, version.split('.')))
except (ValueError, AttributeError):
# 如果不是标准版本号格式,按字符串排序
return (version,)
def register_migrations(self, manager=None) -> int:
"""注册迁移到管理器"""
if manager is None:
manager = migration_manager
registered_count = 0
for migration in self.loaded_migrations:
try:
manager.register(migration)
registered_count += 1
logger.debug(f"注册迁移: {migration.version}")
except Exception as e:
logger.error(f"注册迁移 {migration.version} 失败: {e}")
continue
logger.info(f"✅ 成功注册 {registered_count} 个迁移到管理器")
return registered_count
def validate_migrations(self) -> bool:
"""验证迁移的完整性"""
logger.info("🔍 验证迁移完整性")
if not self.loaded_migrations:
logger.warning("没有加载任何迁移")
return True
# 检查版本号重复
versions = [m.version for m in self.loaded_migrations]
if len(versions) != len(set(versions)):
duplicates = [v for v in versions if versions.count(v) > 1]
logger.error(f"发现重复的迁移版本: {duplicates}")
return False
# 检查依赖关系
version_set = set(versions)
for migration in self.loaded_migrations:
for dep in migration.dependencies:
if dep not in version_set:
logger.error(f"迁移 {migration.version} 依赖的版本 {dep} 不存在")
return False
logger.info("✅ 迁移完整性验证通过")
return True
def get_migration_info(self) -> List[dict]:
"""获取迁移信息"""
return [
{
"version": m.version,
"description": m.description,
"dependencies": m.dependencies,
"checksum": m.get_checksum(),
"class_name": m.__class__.__name__
}
for m in self.loaded_migrations
]
# 全局加载器实例
migration_loader = MigrationLoader()
def load_migrations(manager=None) -> int:
"""
便捷函数:加载并注册所有迁移
Args:
manager: 迁移管理器实例,默认使用全局实例
Returns:
int: 成功注册的迁移数量
"""
try:
# 加载所有迁移
migrations = migration_loader.load_all_migrations()
# 验证迁移完整性
if not migration_loader.validate_migrations():
logger.error("迁移验证失败,停止注册")
return 0
# 注册到管理器
registered_count = migration_loader.register_migrations(manager)
logger.info(f"🎉 迁移加载完成!成功加载并注册 {registered_count} 个迁移")
return registered_count
except Exception as e:
logger.error(f"加载迁移过程中发生异常: {e}")
return 0
def reload_migrations(manager=None) -> int:
"""
重新加载所有迁移(开发环境使用)
Args:
manager: 迁移管理器实例,默认使用全局实例
Returns:
int: 成功注册的迁移数量
"""
if manager is None:
manager = migration_manager
# 清空现有迁移
manager.migrations.clear()
migration_loader.loaded_migrations.clear()
logger.info("🔄 重新加载迁移")
return load_migrations(manager)
"""
Database table definitions for the migration system
Defines the schema_migrations table used to track migration history
"""
from sqlalchemy import Column, Integer, String, Text, DateTime, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
Base = declarative_base()
class SchemaMigration(Base):
"""迁移记录表模型"""
__tablename__ = 'schema_migrations'
id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
version = Column(String(50), unique=True, nullable=False, comment='迁移版本号')
description = Column(Text, nullable=False, comment='迁移描述')
executed_at = Column(DateTime, nullable=False, default=func.current_timestamp(), comment='执行时间')
execution_time_ms = Column(Integer, comment='执行耗时(毫秒)')
checksum = Column(String(64), comment='迁移文件校验和')
rollback_sql = Column(Text, comment='回滚SQL语句')
created_by = Column(String(100), default='system', comment='创建者')
# 创建索引
__table_args__ = (
Index('idx_schema_migrations_version', 'version'),
Index('idx_schema_migrations_executed_at', 'executed_at'),
{'comment': '数据库schema迁移记录表'}
)
def __repr__(self):
return f"<SchemaMigration(version='{self.version}', description='{self.description}')>"
# 迁移表的创建SQL(用于手动创建或验证)
CREATE_MIGRATION_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS schema_migrations (
id SERIAL PRIMARY KEY,
version VARCHAR(50) UNIQUE NOT NULL,
description TEXT NOT NULL,
executed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
execution_time_ms INTEGER,
checksum VARCHAR(64),
rollback_sql TEXT,
created_by VARCHAR(100) DEFAULT 'system',
CONSTRAINT unique_version UNIQUE (version)
);
CREATE INDEX IF NOT EXISTS idx_schema_migrations_version
ON schema_migrations(version);
CREATE INDEX IF NOT EXISTS idx_schema_migrations_executed_at
ON schema_migrations(executed_at);
COMMENT ON TABLE schema_migrations IS '数据库schema迁移记录表';
COMMENT ON COLUMN schema_migrations.id IS '主键ID';
COMMENT ON COLUMN schema_migrations.version IS '迁移版本号';
COMMENT ON COLUMN schema_migrations.description IS '迁移描述';
COMMENT ON COLUMN schema_migrations.executed_at IS '执行时间';
COMMENT ON COLUMN schema_migrations.execution_time_ms IS '执行耗时(毫秒)';
COMMENT ON COLUMN schema_migrations.checksum IS '迁移文件校验和';
COMMENT ON COLUMN schema_migrations.rollback_sql IS '回滚SQL语句';
COMMENT ON COLUMN schema_migrations.created_by IS '创建者';
"""
# 查询已执行迁移的SQL
GET_EXECUTED_MIGRATIONS_SQL = """
SELECT version, description, executed_at, execution_time_ms
FROM schema_migrations
ORDER BY executed_at ASC;
"""
# 检查迁移是否已执行的SQL
CHECK_MIGRATION_EXECUTED_SQL = """
SELECT EXISTS (
SELECT 1 FROM schema_migrations
WHERE version = %s
) as executed;
"""
# 插入迁移记录的SQL
INSERT_MIGRATION_RECORD_SQL = """
INSERT INTO schema_migrations
(version, description, execution_time_ms, checksum, rollback_sql, created_by)
VALUES (%s, %s, %s, %s, %s, %s);
"""
# 删除迁移记录的SQL(用于回滚)
DELETE_MIGRATION_RECORD_SQL = """
DELETE FROM schema_migrations
WHERE version = %s;
"""
"""
Migration versions directory
Holds all concrete migration files
Naming convention:
- File name format: v{version}_{description}.py
- Versions use semantic version numbers: 1.0.1, 1.1.0, 2.0.0
- Descriptions use English words separated by underscores: add_user_avatar, update_score_table
Examples:
- v1_0_1_add_user_avatar.py
- v1_0_2_update_score_index.py
- v1_1_0_add_notification_table.py
"""
"""
Version 1.0.1: add an avatar field to the users table
A safe migration example showing how to add a new column without affecting existing data
Migration contents:
- Add the avatar_url column (VARCHAR(500)) to the users table
- Set a default avatar for existing users
- Add a column comment
Safety measures:
- IF NOT EXISTS guarantees idempotency
- Validation runs both before and after the migration
- A complete rollback method is provided
"""
from migrations.base import Migration, MigrationError
from sqlalchemy import text
from loguru import logger
class AddUserAvatarMigration(Migration):
"""为用户表添加头像字段的迁移"""
def __init__(self):
super().__init__(
version="1.0.1",
description="为用户表添加头像字段",
dependencies=[] # 无依赖,这是第一个迁移
)
async def validate_before_up(self, db) -> bool:
"""迁移前验证:确保users表存在且没有avatar_url字段"""
try:
# 检查users表是否存在
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = 'users'
);
"""))
if not result['exists']:
logger.error("users表不存在,无法执行迁移")
return False
# 检查avatar_url字段是否已存在
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'avatar_url'
);
"""))
if result['exists']:
logger.warning("avatar_url字段已存在,跳过迁移")
return False # 字段已存在,不需要迁移
# 检查是否有用户数据
user_count = await db.fetch_one(text("SELECT COUNT(*) as count FROM users"))
logger.info(f"当前用户数量: {user_count['count']}")
logger.info("✅ 迁移前验证通过")
return True
except Exception as e:
logger.error(f"迁移前验证失败: {e}")
return False
async def up(self, db) -> bool:
"""执行迁移:添加avatar_url字段"""
try:
logger.info("开始添加avatar_url字段到users表")
# 1. 添加字段(使用IF NOT EXISTS确保安全)
await db.execute(text("""
ALTER TABLE users
ADD COLUMN IF NOT EXISTS avatar_url VARCHAR(500);
"""))
logger.info("✅ avatar_url字段添加成功")
# 2. 添加字段注释
await db.execute(text("""
COMMENT ON COLUMN users.avatar_url IS '用户头像URL地址,最大长度500字符';
"""))
logger.info("✅ 字段注释添加成功")
# 3. 为现有用户设置默认头像(可选)
result = await db.execute(text("""
UPDATE users
SET avatar_url = '/assets/default-avatar.png'
WHERE avatar_url IS NULL;
"""))
logger.info(f"✅ 为现有用户设置默认头像,影响行数: {result}")
# 4. 创建索引(如果需要按头像查询)
await db.execute(text("""
CREATE INDEX IF NOT EXISTS idx_users_avatar_url
ON users(avatar_url)
WHERE avatar_url IS NOT NULL;
"""))
logger.info("✅ 头像字段索引创建成功")
logger.info("🎉 avatar_url字段迁移完成")
return True
except Exception as e:
logger.error(f"添加avatar_url字段失败: {e}")
raise MigrationError(f"迁移执行失败: {e}", self.version, e)
async def down(self, db) -> bool:
"""回滚迁移:移除avatar_url字段"""
try:
logger.info("开始回滚:从users表移除avatar_url字段")
# 1. 删除索引
await db.execute(text("""
DROP INDEX IF EXISTS idx_users_avatar_url;
"""))
logger.info("✅ 头像字段索引删除成功")
# 2. 删除字段
await db.execute(text("""
ALTER TABLE users DROP COLUMN IF EXISTS avatar_url;
"""))
logger.info("✅ avatar_url字段删除成功")
logger.info("🔄 avatar_url字段回滚完成")
return True
except Exception as e:
logger.error(f"回滚avatar_url字段失败: {e}")
raise MigrationError(f"迁移回滚失败: {e}", self.version, e)
async def validate_after_up(self, db) -> bool:
"""迁移后验证:确保字段已正确添加"""
try:
# 1. 验证字段是否存在
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'avatar_url'
);
"""))
if not result['exists']:
logger.error("avatar_url字段未成功添加")
return False
# 2. 验证字段类型和长度
result = await db.fetch_one(text("""
SELECT data_type, character_maximum_length, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'avatar_url';
"""))
if (result['data_type'] != 'character varying' or
result['character_maximum_length'] != 500 or
result['is_nullable'] != 'YES'):
logger.error(f"avatar_url字段属性不正确: {result}")
return False
# 3. 验证索引是否创建
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM pg_indexes
WHERE tablename = 'users'
AND indexname = 'idx_users_avatar_url'
);
"""))
if not result['exists']:
logger.warning("avatar_url字段索引未创建,但不影响迁移")
# 4. 验证现有用户是否有默认头像
result = await db.fetch_one(text("""
SELECT COUNT(*) as count
FROM users
WHERE avatar_url = '/assets/default-avatar.png';
"""))
logger.info(f"设置默认头像的用户数量: {result['count']}")
logger.info("✅ 迁移后验证通过")
return True
except Exception as e:
logger.error(f"迁移后验证失败: {e}")
return False
async def get_rollback_sql(self, db) -> str:
"""获取回滚SQL语句"""
return """
-- 回滚迁移 v1.0.1: 删除用户头像字段
DROP INDEX IF EXISTS idx_users_avatar_url;
ALTER TABLE users DROP COLUMN IF EXISTS avatar_url;
"""
"""
Version 1.0.2: add a last-login timestamp field to the users table
A second migration example showing the migration system's dependency management and version control
Migration contents:
- Add the last_login_at column (TIMESTAMP) to the users table
- Add a column comment
- Create an index to optimize query performance
Safety measures:
- Depends on migration v1.0.1
- IF NOT EXISTS guarantees idempotency
- Validation runs both before and after the migration
- A complete rollback method is provided
"""
from migrations.base import Migration, MigrationError
from sqlalchemy import text
from loguru import logger
class AddUserLastLoginMigration(Migration):
"""为用户表添加最后登录时间字段的迁移"""
def __init__(self):
super().__init__(
version="1.0.2",
description="为用户表添加最后登录时间字段",
dependencies=["1.0.1"] # 依赖于头像字段迁移
)
async def validate_before_up(self, db) -> bool:
"""迁移前验证:确保users表存在且没有last_login_at字段"""
try:
# 检查users表是否存在
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = 'users'
);
"""))
if not result['exists']:
logger.error("users表不存在,无法执行迁移")
return False
# 检查依赖的avatar_url字段是否存在
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'avatar_url'
);
"""))
if not result['exists']:
logger.error("依赖的avatar_url字段不存在,无法执行迁移")
return False
# 检查last_login_at字段是否已存在
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'last_login_at'
);
"""))
if result['exists']:
logger.warning("last_login_at字段已存在,跳过迁移")
return False # 字段已存在,不需要迁移
logger.info("✅ 迁移前验证通过")
return True
except Exception as e:
logger.error(f"迁移前验证失败: {e}")
return False
async def up(self, db) -> bool:
"""执行迁移:添加last_login_at字段"""
try:
logger.info("开始添加last_login_at字段到users表")
# 1. 添加字段(使用IF NOT EXISTS确保安全)
await db.execute(text("""
ALTER TABLE users
ADD COLUMN IF NOT EXISTS last_login_at TIMESTAMP;
"""))
logger.info("✅ last_login_at字段添加成功")
# 2. 添加字段注释
await db.execute(text("""
COMMENT ON COLUMN users.last_login_at IS '用户最后登录时间';
"""))
logger.info("✅ 字段注释添加成功")
# 3. 创建索引以优化查询性能
await db.execute(text("""
CREATE INDEX IF NOT EXISTS idx_users_last_login_at
ON users(last_login_at)
WHERE last_login_at IS NOT NULL;
"""))
logger.info("✅ 最后登录时间索引创建成功")
logger.info("🎉 last_login_at字段迁移完成")
return True
except Exception as e:
logger.error(f"添加last_login_at字段失败: {e}")
raise MigrationError(f"迁移执行失败: {e}", self.version, e)
async def down(self, db) -> bool:
"""回滚迁移:移除last_login_at字段"""
try:
logger.info("开始回滚:从users表移除last_login_at字段")
# 1. 删除索引
await db.execute(text("""
DROP INDEX IF EXISTS idx_users_last_login_at;
"""))
logger.info("✅ 最后登录时间索引删除成功")
# 2. 删除字段
await db.execute(text("""
ALTER TABLE users DROP COLUMN IF EXISTS last_login_at;
"""))
logger.info("✅ last_login_at字段删除成功")
logger.info("🔄 last_login_at字段回滚完成")
return True
except Exception as e:
logger.error(f"回滚last_login_at字段失败: {e}")
raise MigrationError(f"迁移回滚失败: {e}", self.version, e)
async def validate_after_up(self, db) -> bool:
"""迁移后验证:确保字段已正确添加"""
try:
# 1. 验证字段是否存在
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'last_login_at'
);
"""))
if not result['exists']:
logger.error("last_login_at字段未成功添加")
return False
# 2. 验证字段类型
result = await db.fetch_one(text("""
SELECT data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
AND table_name = 'users'
AND column_name = 'last_login_at';
"""))
if (result['data_type'] != 'timestamp without time zone' or
result['is_nullable'] != 'YES'):
logger.error(f"last_login_at字段属性不正确: {result}")
return False
# 3. 验证索引是否创建
result = await db.fetch_one(text("""
SELECT EXISTS (
SELECT FROM pg_indexes
WHERE tablename = 'users'
AND indexname = 'idx_users_last_login_at'
);
"""))
if not result['exists']:
logger.warning("last_login_at字段索引未创建,但不影响迁移")
logger.info("✅ 迁移后验证通过")
return True
except Exception as e:
logger.error(f"迁移后验证失败: {e}")
return False
async def get_rollback_sql(self, db) -> str:
"""获取回滚SQL语句"""
return """
-- 回滚迁移 v1.0.2: 删除用户最后登录时间字段
DROP INDEX IF EXISTS idx_users_last_login_at;
ALTER TABLE users DROP COLUMN IF EXISTS last_login_at;
"""
......@@ -70,7 +70,6 @@ class InstitutionImageCreate(BaseModel):
"""创建机构图片模型"""
id: str
url: str
upload_time: datetime
# 机构相关模型
......@@ -171,6 +170,10 @@ class UserStatsItem(BaseModel):
performanceScore: float
institutions: List[Dict[str, Any]]
class Config:
# 允许任意类型,提高兼容性
arbitrary_types_allowed = True
class MonthlyHistoryCreate(BaseModel):
"""创建月度历史记录模型"""
......@@ -180,6 +183,7 @@ class MonthlyHistoryCreate(BaseModel):
total_institutions: int = Field(..., ge=0, description="总机构数")
total_images: int = Field(..., ge=0, description="总图片数")
user_stats: List[UserStatsItem] = Field(..., description="用户统计数据")
institutions_data: Optional[List[Dict[str, Any]]] = Field(None, description="机构图片数据")
class MonthlyHistoryResponse(BaseModel):
......@@ -191,8 +195,9 @@ class MonthlyHistoryResponse(BaseModel):
total_institutions: int
total_images: int
user_stats: List[UserStatsItem]
institutions_data: Optional[List[Dict[str, Any]]] = Field(None, description="机构图片数据")
created_at: datetime
class Config:
from_attributes = True
......
......@@ -31,3 +31,6 @@ passlib[bcrypt]>=1.7.4
# 开发和调试工具
python-dotenv>=1.0.0
# 定时任务调度
apscheduler>=3.10.4
......@@ -59,25 +59,34 @@ async def get_institution_with_images(institution_id: str, db: DatabaseManager):
)
@router.get("/", response_model=List[InstitutionResponse], summary="获取所有机构")
@router.get("/", response_model=List[InstitutionResponse], summary="获取机构列表")
async def get_all_institutions(
db: DatabaseManager = Depends(get_database),
current_user: UserResponse = Depends(get_current_active_user)
):
"""获取所有机构列表(包含图片信息)"""
"""获取机构列表(管理员获取所有机构,普通用户只获取自己负责的机构)"""
try:
# 获取所有机构
query = institutions_table.select().order_by(institutions_table.c.created_at)
# 根据用户角色决定查询范围
if current_user.role == 'admin':
# 管理员获取所有机构
query = institutions_table.select().order_by(institutions_table.c.created_at)
else:
# 普通用户只获取自己负责的机构
query = institutions_table.select().where(
institutions_table.c.owner_id == current_user.id
).order_by(institutions_table.c.created_at)
institutions = await db.fetch_all(query)
result = []
for institution in institutions:
inst_with_images = await get_institution_with_images(institution["id"], db)
if inst_with_images:
result.append(inst_with_images)
logger.info(f"用户 {current_user.name}({current_user.role}) 获取到 {len(result)} 个机构")
return result
except Exception as e:
logger.error(f"获取机构列表失败: {e}")
raise HTTPException(status_code=500, detail="获取机构列表失败")
......@@ -268,37 +277,52 @@ async def add_institution_image(
):
"""为机构添加图片"""
try:
logger.info(f"开始添加图片到机构 {institution_id}")
logger.info(f"图片数据: id={image_data.id}, url长度={len(image_data.url)}")
# 检查机构是否存在
existing_inst = await db.fetch_one(
institutions_table.select().where(institutions_table.c.id == institution_id)
)
if not existing_inst:
logger.error(f"机构不存在: {institution_id}")
raise HTTPException(status_code=404, detail="机构不存在")
logger.info(f"找到机构: {existing_inst['name']}")
# 检查图片ID是否已存在
existing_image = await db.fetch_one(
institution_images_table.select().where(institution_images_table.c.id == image_data.id)
)
if existing_image:
logger.error(f"图片ID已存在: {image_data.id}")
raise HTTPException(status_code=400, detail="图片ID已存在")
# 使用当前时间作为上传时间
upload_time = datetime.now()
logger.info(f"使用当前时间作为上传时间: {upload_time}")
# 插入图片记录
query = institution_images_table.insert().values(
id=image_data.id,
institution_id=institution_id,
url=image_data.url,
upload_time=image_data.upload_time
upload_time=upload_time
)
logger.info("准备执行数据库插入操作")
await db.execute(query)
logger.info("图片记录插入成功")
return BaseResponse(message="图片添加成功")
except HTTPException:
except HTTPException as e:
logger.error(f"HTTP异常: {e.detail}")
raise
except Exception as e:
logger.error(f"添加机构图片失败: {e}")
raise HTTPException(status_code=500, detail="添加图片失败")
logger.error(f"添加机构图片失败: {type(e).__name__}: {str(e)}")
logger.error(f"详细错误信息: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"添加图片失败: {str(e)}")
@router.delete("/{institution_id}/images/{image_id}", response_model=BaseResponse, summary="删除机构图片")
......
......@@ -421,3 +421,144 @@ async def clear_database(
except Exception as e:
logger.error(f"清空数据库失败: {e}")
raise HTTPException(status_code=500, detail="清空数据库失败")
# ==================== Schema 迁移管理 API ====================
@router.get("/schema/status", summary="获取数据库schema状态")
async def get_schema_status(
db: DatabaseManager = Depends(get_database)
):
"""获取数据库schema迁移状态"""
try:
from migrations.manager import migration_manager
status = await migration_manager.get_migration_status()
return {
"success": True,
"data": status,
"message": "获取schema状态成功"
}
except Exception as e:
logger.error(f"获取schema状态失败: {e}")
raise HTTPException(status_code=500, detail=f"获取schema状态失败: {str(e)}")
@router.post("/schema/migrate", summary="手动执行数据库schema迁移")
async def manual_migrate_schema(
db: DatabaseManager = Depends(get_database)
):
"""手动执行数据库schema迁移到最新版本"""
try:
from migrations.manager import migration_manager
logger.info("开始手动执行数据库schema迁移")
result = await migration_manager.migrate_to_latest()
if result["success"]:
return {
"success": True,
"data": {
"executed_migrations": result["executed_migrations"],
"total_pending": result.get("total_pending", 0),
"message": result["message"]
},
"message": "数据库迁移成功"
}
else:
raise HTTPException(
status_code=500,
detail={
"message": result["message"],
"failed_migrations": result.get("failed_migrations", [])
}
)
except HTTPException:
raise
except Exception as e:
logger.error(f"手动执行迁移失败: {e}")
raise HTTPException(status_code=500, detail=f"迁移执行失败: {str(e)}")
@router.get("/schema/migrations", summary="获取所有迁移信息")
async def get_all_migrations(
db: DatabaseManager = Depends(get_database)
):
"""获取所有迁移的详细信息"""
try:
from migrations.manager import migration_manager
from migrations.loader import migration_loader
# 获取迁移状态
status = await migration_manager.get_migration_status()
# 获取迁移详细信息
migration_info = migration_loader.get_migration_info()
return {
"success": True,
"data": {
"status": status,
"migrations": migration_info
},
"message": "获取迁移信息成功"
}
except Exception as e:
logger.error(f"获取迁移信息失败: {e}")
raise HTTPException(status_code=500, detail=f"获取迁移信息失败: {str(e)}")
@router.post("/schema/reload", summary="重新加载迁移文件")
async def reload_migrations(
db: DatabaseManager = Depends(get_database)
):
"""重新加载迁移文件(开发环境使用)"""
try:
from migrations.loader import reload_migrations
logger.info("开始重新加载迁移文件")
migration_count = reload_migrations()
return {
"success": True,
"data": {
"loaded_migrations": migration_count
},
"message": f"成功重新加载 {migration_count} 个迁移文件"
}
except Exception as e:
logger.error(f"重新加载迁移文件失败: {e}")
raise HTTPException(status_code=500, detail=f"重新加载失败: {str(e)}")
@router.get("/schema/version", summary="获取当前数据库schema版本")
async def get_current_schema_version(
db: DatabaseManager = Depends(get_database)
):
"""获取当前数据库schema版本"""
try:
from migrations.manager import migration_manager
executed_migrations = await migration_manager.get_executed_migrations()
# 获取最新执行的迁移版本
latest_version = executed_migrations[-1] if executed_migrations else "0.0.0"
return {
"success": True,
"data": {
"current_version": latest_version,
"executed_migrations": executed_migrations,
"total_executed": len(executed_migrations)
},
"message": "获取当前版本成功"
}
except Exception as e:
logger.error(f"获取当前版本失败: {e}")
raise HTTPException(status_code=500, detail=f"获取版本失败: {str(e)}")
"""
Scheduled task runner
Responsible for scheduled jobs such as the monthly auto-save
"""
import asyncio
from datetime import datetime, timedelta
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from loguru import logger
from database import database, monthly_history_table, institution_images_table
from routers.users import users_table
from routers.institutions import institutions_table
class MonthlyScheduler:
"""月度定时任务调度器"""
def __init__(self):
self.scheduler = AsyncIOScheduler()
self.is_running = False
async def start(self):
"""启动调度器"""
if not self.is_running:
# 添加月度自动保存任务 - 每月1号0点执行
self.scheduler.add_job(
self.auto_save_monthly_stats,
CronTrigger(day=1, hour=0, minute=0),
id='monthly_auto_save',
name='月度自动保存统计数据',
replace_existing=True
)
# 添加测试任务 - 每分钟执行一次(仅用于测试)
self.scheduler.add_job(
self.test_scheduler,
CronTrigger(minute='*'),
id='test_scheduler',
name='测试调度器',
replace_existing=True
)
self.scheduler.start()
self.is_running = True
logger.info("🕐 月度定时任务调度器已启动")
logger.info("📅 月度自动保存任务已设置:每月1号0点执行")
async def stop(self):
"""停止调度器"""
if self.is_running:
self.scheduler.shutdown()
self.is_running = False
logger.info("🛑 月度定时任务调度器已停止")
async def test_scheduler(self):
"""测试调度器是否正常工作"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"⏰ 定时任务测试 - 当前时间: {current_time}")
async def auto_save_monthly_stats(self):
"""自动保存上月统计数据"""
try:
logger.info("🚀 开始执行月度自动保存任务...")
# 计算上个月的月份标识
current_date = datetime.now()
if current_date.month == 1:
# 如果当前是1月,上个月是去年12月
last_month_date = current_date.replace(year=current_date.year - 1, month=12)
else:
# 其他情况,上个月就是当前月份-1
last_month_date = current_date.replace(month=current_date.month - 1)
month_key = f"{last_month_date.year}-{str(last_month_date.month).zfill(2)}"
logger.info(f"📊 准备保存 {month_key} 月份的统计数据")
# 检查该月份是否已有记录
existing_history = await database.fetch_one(
monthly_history_table.select().where(
monthly_history_table.c.month == month_key
)
)
if existing_history:
logger.info(f"📋 {month_key} 月份数据已存在,跳过自动保存")
return
# 获取所有普通用户
users_query = users_table.select().where(users_table.c.role == 'user')
users = await database.fetch_all(users_query)
# 获取所有机构
institutions_query = institutions_table.select()
institutions = await database.fetch_all(institutions_query)
# 获取所有机构图片
images_query = institution_images_table.select()
all_images = await database.fetch_all(images_query)
# 为每个机构添加图片数据
institutions_with_images = []
for inst in institutions:
inst_images = [img for img in all_images if img['institution_id'] == inst['id']]
inst_dict = dict(inst)
inst_dict['images'] = [
{
'id': img['id'],
'url': img['url'],
'upload_time': img['upload_time'].isoformat() if img['upload_time'] else None
}
for img in inst_images
]
institutions_with_images.append(inst_dict)
# 构建用户统计数据
user_stats = []
for user in users:
# 获取用户负责的机构
user_institutions = [inst for inst in institutions_with_images if inst['owner_id'] == user['id']]
# 计算统计数据
institution_count = len(user_institutions)
total_images = sum(len(inst['images']) for inst in user_institutions)
# 简化的分数计算(实际应用中可能需要更复杂的逻辑)
interaction_score = min(total_images * 0.5, 10.0) # 每张图片0.5分,最高10分
performance_score = min(total_images * 2.5, 50.0) # 每张图片2.5分,最高50分
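# Worked example: a user whose institutions hold 12 images gets an interaction
# score of 6.0 (12 * 0.5, under the 10-point cap) and a performance score of
# 30.0 (12 * 2.5, under the 50-point cap).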
user_stat = {
'userId': user['id'],
'userName': user['name'],
'institutionCount': institution_count,
'interactionScore': interaction_score,
'performanceScore': performance_score,
'institutions': [
{
'id': inst['id'],
'name': inst['name'],
'imageCount': len(inst['images'])
}
for inst in user_institutions
]
}
user_stats.append(user_stat)
# 构建机构数据
institutions_data = [
{
'id': inst['id'],
'institutionId': inst['institution_id'],
'name': inst['name'],
'ownerId': inst['owner_id'],
'images': inst['images']
}
for inst in institutions_with_images
]
# 计算总计数据
total_users = len(users)
total_institutions = len(institutions_with_images)
total_images = sum(len(inst['images']) for inst in institutions_with_images)
# 保存到数据库
insert_query = monthly_history_table.insert().values(
month=month_key,
save_time=datetime.now(),
total_users=total_users,
total_institutions=total_institutions,
total_images=total_images,
user_stats=user_stats,
institutions_data=institutions_data
)
result = await database.execute(insert_query)
logger.info(f"✅ {month_key} 月份统计数据自动保存成功")
logger.info(f"📈 保存数据概览: 用户 {total_users} 个, 机构 {total_institutions} 个, 图片 {total_images} 张")
return True
except Exception as e:
logger.error(f"❌ 月度自动保存失败: {e}")
logger.error(f"错误详情: {str(e)}")
import traceback
logger.error(f"错误堆栈: {traceback.format_exc()}")
return False
async def trigger_manual_save(self, target_month: str = None):
"""手动触发保存指定月份的数据(用于测试)"""
try:
if target_month:
logger.info(f"🔧 手动触发保存 {target_month} 月份数据")
# 这里可以添加保存指定月份数据的逻辑
# 暂时使用当前的自动保存逻辑
result = await self.auto_save_monthly_stats()
return result
except Exception as e:
logger.error(f"❌ 手动触发保存失败: {e}")
return False
# 全局调度器实例
monthly_scheduler = MonthlyScheduler()
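A minimal sketch of exercising the scheduler by hand, e.g. from a one-off script, assuming a reachable database; the month string is illustrative:
import asyncio
from database import database
from scheduler import monthly_scheduler

async def run_once():
    await database.connect()
    try:
        # Manually trigger the same logic the cron job runs on the 1st of each month.
        saved = await monthly_scheduler.trigger_manual_save("2025-01")
        print("saved:", saved)
    finally:
        await database.disconnect()

asyncio.run(run_once())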
#!/usr/bin/env python3
"""
Database table structure update script
Adds the institutions_data column to the monthly_history table
"""
import asyncio
import asyncpg
from loguru import logger
async def update_database():
"""更新数据库表结构"""
try:
# 连接数据库
conn = await asyncpg.connect(
host="localhost",
port=5432,
user="performance_user",
password="performance_pass",
database="performance_db"
)
logger.info("✅ 数据库连接成功")
# 检查字段是否已存在
check_query = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'monthly_history'
AND column_name = 'institutions_data'
"""
result = await conn.fetch(check_query)
if result:
logger.info("✅ institutions_data 字段已存在,无需更新")
else:
# 添加字段
alter_query = """
ALTER TABLE monthly_history
ADD COLUMN institutions_data JSONB
"""
await conn.execute(alter_query)
logger.info("✅ 成功添加 institutions_data 字段")
# 添加注释
comment_query = """
COMMENT ON COLUMN monthly_history.institutions_data
IS '机构图片数据,包含完整的机构和图片信息'
"""
await conn.execute(comment_query)
logger.info("✅ 成功添加字段注释")
# 验证更新结果
verify_query = """
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'monthly_history'
AND column_name = 'institutions_data'
"""
result = await conn.fetch(verify_query)
if result:
row = result[0]
logger.info(f"✅ 字段验证成功: {row['column_name']} ({row['data_type']}, nullable: {row['is_nullable']})")
else:
logger.error("❌ 字段验证失败")
await conn.close()
logger.info("✅ 数据库连接已关闭")
except Exception as e:
logger.error(f"❌ 数据库更新失败: {e}")
raise
if __name__ == "__main__":
asyncio.run(update_database())
-- Add the institution image data column to the monthly history table
-- Execution date: 2025-01-29
-- Add the institutions_data column to the monthly_history table
ALTER TABLE monthly_history
ADD COLUMN IF NOT EXISTS institutions_data JSONB;
-- Add a column comment
COMMENT ON COLUMN monthly_history.institutions_data IS '机构图片数据,包含完整的机构和图片信息';
-- Verify that the column was added successfully
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'monthly_history'
AND column_name = 'institutions_data';
......@@ -31,7 +31,7 @@ services:
restart: unless-stopped
environment:
- DATABASE_URL=postgresql://performance_user:performance_pass@postgres:5432/performance_db
- CORS_ORIGINS=http://localhost:5173,http://localhost:8080,http://localhost:4001
- CORS_ORIGINS=*
- API_HOST=0.0.0.0
- API_PORT=8000
ports:
......
......@@ -4,8 +4,8 @@
* Replaces the original localStorage-based storage mechanism
*/
// API base configuration
const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:8000'
// API base configuration - use a relative path and reach the backend through the Vite proxy
const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || ''
/**
* HTTP 请求封装类
......@@ -280,9 +280,29 @@ export const userApi = {
return apiClient.delete(`/api/users/${userId}`)
},
// 用户登录
// 用户登录 - 特殊处理,不进行token刷新
async login(loginData) {
return apiClient.post('/api/users/login', loginData)
const url = `${apiClient.baseURL}/api/users/login`
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(loginData)
})
if (!response.ok) {
const errorData = await response.json().catch(() => ({}))
throw new Error(errorData.detail || `HTTP ${response.status}: ${response.statusText}`)
}
return await response.json()
} catch (error) {
console.error('登录请求失败:', error)
throw error
}
},
// 刷新token
......@@ -429,6 +449,11 @@ export const historyApi = {
return apiClient.delete(`/api/history/${month}`)
},
// 清空所有历史记录
async clearAll() {
return apiClient.delete('/api/history')
},
// 获取历史统计摘要
async getSummary() {
return apiClient.get('/api/history/stats/summary')
......@@ -437,7 +462,9 @@ export const historyApi = {
// 清理旧历史数据
async cleanup(keepMonths = 12) {
return apiClient.post('/api/history/cleanup', { keep_months: keepMonths })
}
},
}
/**
......
......@@ -25,19 +25,10 @@ export const useAuthStore = defineStore('auth', () => {
* 登录后加载数据
*/
const loadDataAfterLogin = async () => {
try {
const dataStore = useDataStore()
console.log('📊 登录成功,开始加载数据...')
const dataLoaded = await dataStore.loadData()
if (dataLoaded) {
console.log('✅ 数据加载成功')
} else {
console.warn('⚠️ 数据加载失败,使用离线模式')
}
} catch (error) {
console.error('❌ 登录后数据加载失败:', error)
}
const dataStore = useDataStore()
console.log('📊 登录成功,开始加载数据...')
await dataStore.loadData()
console.log('✅ 数据加载成功')
}
/**
......@@ -59,6 +50,13 @@ export const useAuthStore = defineStore('auth', () => {
// 保存tokens到API客户端
apiClient.saveTokens(response.access_token, response.refresh_token)
// 验证token是否正确保存
const savedToken = apiClient.getAccessToken()
if (!savedToken) {
throw new Error('Token保存失败')
}
console.log('🔑 Token已保存,准备加载数据...')
// 登录成功后加载数据
await loadDataAfterLogin()
......@@ -70,7 +68,7 @@ export const useAuthStore = defineStore('auth', () => {
return false
} catch (error) {
console.error('登录请求失败:', error)
return false
throw error
}
}
......
......@@ -82,12 +82,11 @@ const loginForm = reactive({
// 表单验证规则
const loginRules = {
phone: [
{ required: true, message: '请输入手机号', trigger: 'blur' },
{ min: 3, message: '手机号不能少于3位', trigger: 'blur' }
{ required: true, message: '请输入手机号', trigger: 'blur' }
],
password: [
{ required: true, message: '请输入密码', trigger: 'blur' },
{ min: 6, message: '密码不能少于6位', trigger: 'blur' }
{ min: 1, message: '请输入密码', trigger: 'blur' }
]
}
......