Commit 162d4220 by luoqi

feat:测试数据库迁移

parent 0741776b
No preview for this file type
......@@ -28,7 +28,7 @@ def create_app(config_name=None):
# 初始化扩展
db.init_app(app)
migrate = Migrate(app, db)
migrate = Migrate(app, db, directory='database/migrations')
# 注册蓝图
from routes import register_blueprints
......
This source diff could not be displayed because it is too large. You can view the blob instead.
# A generic, single database configuration.
[alembic]
# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic,flask_migrate
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[logger_flask_migrate]
level = INFO
handlers =
qualname = flask_migrate
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
import logging
from logging.config import fileConfig
from flask import current_app
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)
logger = logging.getLogger('alembic.env')
def get_engine():
try:
# this works with Flask-SQLAlchemy<3 and Alchemical
return current_app.extensions['migrate'].db.get_engine()
except (TypeError, AttributeError):
# this works with Flask-SQLAlchemy>=3
return current_app.extensions['migrate'].db.engine
def get_engine_url():
try:
return get_engine().url.render_as_string(hide_password=False).replace(
'%', '%%')
except AttributeError:
return str(get_engine().url).replace('%', '%%')
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option('sqlalchemy.url', get_engine_url())
target_db = current_app.extensions['migrate'].db
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def get_metadata():
if hasattr(target_db, 'metadatas'):
return target_db.metadatas[None]
return target_db.metadata
def run_migrations_offline():
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=get_metadata(), literal_binds=True
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
# this callback is used to prevent an auto-migration from being generated
# when there are no changes to the schema
# reference: http://alembic.zzzcomputing.com/en/latest/cookbook.html
def process_revision_directives(context, revision, directives):
if getattr(config.cmd_opts, 'autogenerate', False):
script = directives[0]
if script.upgrade_ops.is_empty():
directives[:] = []
logger.info('No changes in schema detected.')
conf_args = current_app.extensions['migrate'].configure_args
if conf_args.get("process_revision_directives") is None:
conf_args["process_revision_directives"] = process_revision_directives
connectable = get_engine()
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=get_metadata(),
**conf_args
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
-- 患者画像回访话术系统数据库初始化脚本
-- 更新日期: 2025-09-06
-- 说明: 此文件与 database/models.py 保持同步,包含完整的表结构和初始数据
-- 设置字符集
SET NAMES utf8mb4;
......@@ -15,21 +17,27 @@ USE `callback_system`;
CREATE TABLE IF NOT EXISTS `callback_records` (
`record_id` int NOT NULL AUTO_INCREMENT COMMENT '记录ID',
`case_number` varchar(50) NOT NULL COMMENT '病历号',
`callback_methods` json DEFAULT NULL COMMENT '回访方式',
`callback_record` text COMMENT '回访记录',
`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`status` varchar(20) DEFAULT 'pending' COMMENT '回访状态',
`callback_date` datetime DEFAULT NULL COMMENT '回访时间',
`callback_staff` varchar(100) DEFAULT NULL COMMENT '回访人员',
`patient_name` varchar(100) DEFAULT NULL COMMENT '患者姓名',
`patient_phone` varchar(20) DEFAULT NULL COMMENT '患者电话',
`clinic_name` varchar(100) DEFAULT NULL COMMENT '诊所名称',
`callback_methods` json NOT NULL COMMENT '回访方式(JSON格式)',
`callback_record` text NOT NULL COMMENT '回访记录内容',
`callback_result` varchar(50) NOT NULL COMMENT '回访结果(成功/不成功/放弃回访)',
`next_appointment_time` varchar(50) DEFAULT NULL COMMENT '下次预约时间',
`failure_reason` varchar(200) DEFAULT NULL COMMENT '不成功原因',
`failure_note` text COMMENT '不成功备注',
`abandon_reason` varchar(200) DEFAULT NULL COMMENT '放弃回访原因',
`abandon_note` text COMMENT '放弃回访备注',
`ai_feedback_type` varchar(200) DEFAULT NULL COMMENT 'AI反馈类型',
`ai_feedback_note` text COMMENT 'AI反馈备注',
`operator` varchar(100) NOT NULL COMMENT '操作员',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`callback_status` varchar(50) NOT NULL DEFAULT '已回访' COMMENT '回访状态',
`failure_reason_note` text COMMENT '不成功备注',
`abandon_reason_note` text COMMENT '放弃回访备注',
PRIMARY KEY (`record_id`),
UNIQUE KEY `case_number` (`case_number`),
KEY `idx_status` (`status`),
KEY `idx_callback_date` (`callback_date`),
KEY `idx_clinic_name` (`clinic_name`)
KEY `idx_case_number` (`case_number`),
KEY `idx_create_time` (`create_time`),
KEY `idx_operator` (`operator`),
KEY `idx_callback_result` (`callback_result`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='回访记录表';
-- 创建用户管理表
......@@ -37,6 +45,7 @@ CREATE TABLE IF NOT EXISTS `users` (
`user_id` int NOT NULL AUTO_INCREMENT COMMENT '用户ID',
`username` varchar(50) NOT NULL COMMENT '用户名',
`password_hash` varchar(255) NOT NULL COMMENT '密码哈希',
`salt` varchar(32) DEFAULT NULL,
`user_type` varchar(20) DEFAULT 'user' COMMENT '用户类型',
`clinic_access` json DEFAULT NULL COMMENT '诊所访问权限',
`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
......@@ -90,6 +99,7 @@ CREATE TABLE IF NOT EXISTS `patients` (
`patient_phone` varchar(20) DEFAULT NULL COMMENT '患者电话',
`gender` varchar(10) DEFAULT NULL COMMENT '性别',
`age` int DEFAULT NULL COMMENT '年龄',
`clinic_id` int DEFAULT NULL COMMENT '诊所ID',
`clinic_name` varchar(100) DEFAULT NULL COMMENT '诊所名称',
`diagnosis` json DEFAULT NULL COMMENT '诊断信息',
`treatment_date` date DEFAULT NULL COMMENT '治疗日期',
......@@ -99,7 +109,9 @@ CREATE TABLE IF NOT EXISTS `patients` (
UNIQUE KEY `case_number` (`case_number`),
KEY `idx_patient_name` (`patient_name`),
KEY `idx_clinic_name` (`clinic_name`),
KEY `idx_treatment_date` (`treatment_date`)
KEY `idx_treatment_date` (`treatment_date`),
KEY `fk_patients_clinic_id` (`clinic_id`),
CONSTRAINT `fk_patients_clinic_id` FOREIGN KEY (`clinic_id`) REFERENCES `clinics` (`clinic_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='患者信息表';
-- 创建操作日志表
......@@ -120,5 +132,26 @@ CREATE TABLE IF NOT EXISTS `operation_logs` (
KEY `idx_created_at` (`created_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='操作日志表';
-- 添加外键约束(如果不存在)
-- 注意:由于使用了 IF NOT EXISTS,如果表已存在,外键约束可能需要单独添加
-- 重置外键检查
SET FOREIGN_KEY_CHECKS = 1;
-- ==========================================
-- 数据完整性说明
-- ==========================================
-- 1. 所有表都使用 utf8mb4 字符集,支持完整的 Unicode 字符
-- 2. 时间戳字段自动更新,确保数据追踪
-- 3. 外键约束确保数据一致性
-- 4. 索引优化查询性能
-- 5. 初始数据包含系统运行必需的基础信息
-- ==========================================
-- 维护说明
-- ==========================================
-- 此文件应与 database/models.py 保持同步
-- 如需修改表结构,请同时更新:
-- 1. database/models.py 中的模型定义
-- 2. 生成新的 Flask-Migrate 迁移文件
-- 3. 更新此 init.sql 文件
\ No newline at end of file
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}
"""Production baseline migration
Revision ID: 001_production_baseline
Revises:
Create Date: 2025-09-06 12:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '001_production_baseline'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
# This is a baseline migration for existing production database
# All tables already exist, so this migration does nothing
pass
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
# This is a baseline migration, cannot downgrade
pass
# ### end Alembic commands ###
"""Add new features: callback fields and clinic_id
Revision ID: 9bd0518d29b3
Revises: 001_production_baseline
Create Date: 2025-09-06 14:26:24.719045
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '9bd0518d29b3'
down_revision = '001_production_baseline'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
# Add new fields to callback_records table
with op.batch_alter_table('callback_records', schema=None) as batch_op:
batch_op.add_column(sa.Column('next_appointment_time', sa.Text(), nullable=True, comment='下次预约时间'))
batch_op.add_column(sa.Column('failure_reason', sa.Text(), nullable=True, comment='不成功的原因'))
batch_op.add_column(sa.Column('abandon_reason', sa.Text(), nullable=True, comment='放弃回访的原因'))
batch_op.add_column(sa.Column('ai_feedback_type', sa.String(length=100), nullable=True, comment='AI错误反馈类型'))
batch_op.add_column(sa.Column('failure_reason_note', sa.Text(), nullable=True, comment='不成功备注'))
batch_op.add_column(sa.Column('abandon_reason_note', sa.Text(), nullable=True, comment='放弃回访备注'))
batch_op.add_column(sa.Column('ai_feedback_note', sa.Text(), nullable=True, comment='AI反馈备注'))
batch_op.add_column(sa.Column('callback_status', sa.String(length=50), nullable=False, server_default='已回访', comment='回访状态'))
# Add clinic_id to patients table
with op.batch_alter_table('patients', schema=None) as batch_op:
batch_op.add_column(sa.Column('clinic_id', sa.Integer(), nullable=True, comment='诊所ID'))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
# Remove clinic_id from patients table
with op.batch_alter_table('patients', schema=None) as batch_op:
batch_op.drop_column('clinic_id')
# Remove new fields from callback_records table
with op.batch_alter_table('callback_records', schema=None) as batch_op:
batch_op.drop_column('callback_status')
batch_op.drop_column('ai_feedback_note')
batch_op.drop_column('abandon_reason_note')
batch_op.drop_column('failure_reason_note')
batch_op.drop_column('ai_feedback_type')
batch_op.drop_column('abandon_reason')
batch_op.drop_column('failure_reason')
batch_op.drop_column('next_appointment_time')
# ### end Alembic commands ###
......@@ -90,7 +90,7 @@ class Patient(db.Model):
def to_dict(self):
"""转换为字典格式,兼容原有的JSON数据格式"""
return {
base_dict = {
'姓名': self.patient_name,
'年龄': self.age,
'性别': self.gender,
......@@ -103,6 +103,13 @@ class Patient(db.Model):
'更新时间': self.updated_at.isoformat() if self.updated_at else None
}
# 展开diagnosis JSON字段中的内容,使其与原JSON文件格式兼容
if self.diagnosis and isinstance(self.diagnosis, dict):
# 直接将diagnosis中的所有字段添加到返回字典中
base_dict.update(self.diagnosis)
return base_dict
def __repr__(self):
return f'<Patient {self.patient_name} ({self.case_number})>'
......@@ -167,6 +174,32 @@ class CallbackRecord(db.Model):
def __repr__(self):
return f'<CallbackRecord {self.case_number} - {self.callback_result}>'
class OperationLog(db.Model):
"""操作日志表"""
__tablename__ = 'operation_logs'
log_id = db.Column(db.Integer, primary_key=True, comment='日志ID')
user_id = db.Column(db.Integer, comment='用户ID')
username = db.Column(db.String(50), comment='用户名')
operation = db.Column(db.String(100), nullable=False, comment='操作类型')
target_type = db.Column(db.String(50), comment='目标类型')
target_id = db.Column(db.String(100), comment='目标ID')
operation_details = db.Column(db.JSON, comment='操作详情')
ip_address = db.Column(db.String(45), comment='IP地址')
user_agent = db.Column(db.String(255), comment='用户代理')
created_at = db.Column(db.DateTime, default=datetime.utcnow, comment='创建时间')
# 索引
__table_args__ = (
db.Index('idx_user_id', 'user_id'),
db.Index('idx_operation', 'operation'),
db.Index('idx_created_at', 'created_at'),
{'comment': '操作日志表'}
)
def __repr__(self):
return f'<OperationLog {self.username} - {self.operation}>'
# 数据库初始化函数
def init_db(app):
"""初始化数据库"""
......
......@@ -50,51 +50,21 @@ log_info "📋 第一步:等待数据库就绪..."
python database/scripts/wait-for-db.py
log_success "数据库连接就绪"
# 第二步:初始化Flask-Migrate(如果需要)
log_info "📋 第二步:检查Flask-Migrate状态..."
# 检查migrations目录是否存在
if [ ! -d "migrations" ]; then
log_info "初始化Flask-Migrate..."
export FLASK_APP=app.py
flask db init
log_success "Flask-Migrate初始化完成"
else
log_info "Flask-Migrate已初始化"
fi
# 第三步:检查是否需要生成初始迁移
log_info "📋 第三步:检查迁移文件..."
# 第二步:执行数据库迁移
log_info "📋 第二步:执行数据库迁移..."
# 检查是否有迁移文件
MIGRATION_FILES=$(find migrations/versions -name "*.py" 2>/dev/null | wc -l)
# 使用专门的迁移管理器
python database/scripts/migration_manager.py
if [ "$MIGRATION_FILES" -eq 0 ]; then
log_info "生成初始迁移文件..."
export FLASK_APP=app.py
flask db migrate -m "Initial migration"
log_success "初始迁移文件生成完成"
if [ $? -eq 0 ]; then
log_success "数据库迁移完成"
else
log_info "发现 $MIGRATION_FILES 个迁移文件"
log_error "数据库迁移失败"
exit 1
fi
# 第四步:执行数据库迁移
log_info "📋 第四步:执行数据库迁移..."
export FLASK_APP=app.py
# 检查数据库当前状态
log_info "检查数据库迁移状态..."
flask db current || log_warning "无法获取当前迁移状态(可能是首次运行)"
# 执行迁移
log_info "执行数据库迁移..."
flask db upgrade
log_success "数据库迁移完成"
# 第五步:数据导入(如果需要)
log_info "📋 第五步:检查数据导入需求..."
# 第三步:数据导入(如果需要)
log_info "📋 第三步:检查数据导入需求..."
# 检查是否有患者数据需要导入
if [ -f "database/scripts/safe_import_patients.py" ] && [ -d "data/patients/clinics" ]; then
......@@ -105,8 +75,8 @@ else
log_info "无需数据导入"
fi
# 第步:启动Flask应用
log_info "📋 第步:启动Flask应用..."
# 第步:启动Flask应用
log_info "📋 第步:启动Flask应用..."
log_info "🌐 应用将在端口 5000 启动"
log_info "🔗 健康检查端点: http://localhost:5000/api/health"
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
数据库迁移管理器
负责处理Flask-Migrate的初始化、迁移生成和执行
"""
import os
import sys
import subprocess
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))
def run_command(command, description):
"""执行命令并处理错误"""
print(f"🔄 {description}...")
try:
result = subprocess.run(
command,
shell=True,
check=True,
capture_output=True,
text=True,
cwd=project_root
)
print(f"✅ {description} 成功")
if result.stdout:
print(f"输出: {result.stdout}")
return True
except subprocess.CalledProcessError as e:
print(f"❌ {description} 失败")
print(f"错误: {e.stderr}")
return False
def check_migrations_directory():
"""检查migrations目录是否存在"""
migrations_dir = project_root / "database" / "migrations"
return migrations_dir.exists()
def check_migration_files():
"""检查是否有迁移文件"""
versions_dir = project_root / "database" / "migrations" / "versions"
if not versions_dir.exists():
return 0
py_files = list(versions_dir.glob("*.py"))
return len(py_files)
def initialize_migrations():
"""初始化Flask-Migrate"""
if not check_migrations_directory():
print("📋 初始化Flask-Migrate...")
os.environ['FLASK_APP'] = 'app.py'
return run_command('flask db init', '初始化Flask-Migrate')
else:
print("✅ Flask-Migrate已初始化")
return True
def stamp_current_state():
"""标记当前数据库状态(用于已有数据库)"""
print("📋 标记当前数据库状态为最新版本...")
os.environ['FLASK_APP'] = 'app.py'
# 先检查是否有迁移文件
migration_count = check_migration_files()
if migration_count == 0:
print("⚠️ 没有迁移文件,需要先生成初始迁移")
return False
# 标记数据库为最新版本(head),这样就不会重复执行已有的迁移
return run_command('flask db stamp head', '标记数据库状态为最新版本')
def generate_migration(message="Auto migration"):
"""生成迁移文件"""
print(f"📋 生成迁移文件: {message}")
os.environ['FLASK_APP'] = 'app.py'
return run_command(f'flask db migrate -m "{message}"', '生成迁移文件')
def upgrade_database():
"""执行数据库迁移"""
print("📋 执行数据库迁移...")
os.environ['FLASK_APP'] = 'app.py'
return run_command('flask db upgrade', '执行数据库迁移')
def get_current_revision():
"""获取当前迁移版本"""
print("📋 检查当前迁移状态...")
os.environ['FLASK_APP'] = 'app.py'
return run_command('flask db current', '获取当前迁移状态')
def check_alembic_version_table():
"""检查是否存在alembic_version表"""
try:
from app import create_app
from database.models import db
from sqlalchemy import text
app = create_app()
with app.app_context():
# 尝试查询alembic_version表
result = db.session.execute(text("SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = 'alembic_version'"))
count = result.scalar()
return count > 0
except Exception as e:
print(f"⚠️ 检查alembic_version表失败: {e}")
return False
def main():
"""主函数 - 完整的迁移流程"""
print("🚀 数据库迁移管理器启动")
print(f"📍 项目根目录: {project_root}")
# 步骤1: 初始化Flask-Migrate
if not initialize_migrations():
print("❌ Flask-Migrate初始化失败")
return False
# 步骤2: 检查迁移文件
migration_count = check_migration_files()
print(f"📊 发现 {migration_count} 个迁移文件")
# 步骤3: 检查是否为首次部署(线上已有数据库但无alembic_version表)
has_alembic_table = check_alembic_version_table()
if not has_alembic_table and migration_count > 0:
print("🔍 检测到首次部署到已有数据库,执行stamp操作...")
# 标记数据库为最新版本,避免重复创建已存在的表
if not stamp_current_state():
print("❌ 数据库状态标记失败")
return False
print("✅ 数据库状态标记完成")
else:
# 步骤4: 获取当前状态
get_current_revision()
# 步骤5: 执行迁移
if not upgrade_database():
print("❌ 数据库迁移失败")
return False
print("🎉 数据库迁移完成")
return True
if __name__ == '__main__':
success = main()
sys.exit(0 if success else 1)
......@@ -36,6 +36,7 @@ def test_connection(config, max_retries=30, retry_interval=2):
"""
print(f"🔍 等待数据库连接就绪...")
print(f"📍 数据库地址: {config['host']}:{config['port']}")
if 'database' in config:
print(f"📍 数据库名称: {config['database']}")
print(f"📍 最大重试次数: {max_retries}")
......
......@@ -18,7 +18,7 @@ services:
MYSQL_COLLATION_SERVER: utf8mb4_unicode_ci
volumes:
- mysql_data:/var/lib/mysql
- ./database/migrations/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
# - ./database/migrations/init.sql:/docker-entrypoint-initdb.d/init.sql:ro # 已使用Flask-Migrate,不需要init.sql
ports:
- "3306:3306" # Host port 3306 maps to container port 3306
networks:
......@@ -41,7 +41,7 @@ services:
build: .
container_name: patient_callback_app
ports:
- "4002:5000" # Host port 4002 maps to container port 5000
- "4002:5001" # Host port 4002 maps to container port 5001
environment:
- DB_HOST=mysql
- DB_PORT=3306
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment