Commit d869c44f by yiling.shen

修复马山门诊数据导出问题并完善部署脚本

主要修复:
1. 修复export_data.py中硬编码的MySQL连接配置
2. 移除基于病历号前缀的诊所分配逻辑,改为基于数据库表关系
3. 实现患者去重统计,基于最新回访记录计算状态
4. 添加cryptography依赖解决MySQL 8.0认证问题

新增功能:
1. 完整的GitLab CI/CD部署脚本
2. 数据库迁移和完整性检查脚本
3. 自动化部署验证脚本
4. 环境变量配置说明文档

技术改进:
- 患者-诊所关系基于patients.clinic_name字段
- Excel导出统计基于唯一患者而非记录数
- 支持环境变量配置数据库连接
- 完整的Docker部署支持
parent 4ec2561d
Pipeline #3166 failed in 0 seconds
# .gitlab-ci.yml
# 患者画像回访话术系统 - GitLab CI/CD 配置
# 用于自动化测试、构建和部署
deploy_to_production:
# 定义阶段
stages:
- test # 测试阶段
- build # 构建阶段
- deploy # 部署阶段
# 全局变量
variables:
# 应用配置
APP_NAME: "patient-callback-system"
DOCKER_IMAGE: "patient-callback-app"
# 数据库配置(测试环境)
TEST_DB_HOST: "localhost"
TEST_DB_PORT: "3307"
TEST_DB_USER: "callback_user"
TEST_DB_PASSWORD: "dev_password_123"
TEST_DB_NAME: "callback_system"
# 部署环境配置
PROD_DB_HOST: "$PROD_DB_HOST"
PROD_DB_PORT: "$PROD_DB_PORT"
PROD_DB_USER: "$PROD_DB_USER"
PROD_DB_PASSWORD: "$PROD_DB_PASSWORD"
PROD_DB_NAME: "$PROD_DB_NAME"
PROD_APP_URL: "$PROD_APP_URL"
# 缓存配置
cache:
paths:
- .pip-cache/
- node_modules/
# 测试阶段
test:
stage: test
image: python:3.9-slim
services:
- mysql:8.0
variables:
MYSQL_ROOT_PASSWORD: "test_password"
MYSQL_DATABASE: "callback_system"
MYSQL_USER: "callback_user"
MYSQL_PASSWORD: "dev_password_123"
DB_HOST: "mysql"
DB_PORT: "3306"
DB_USER: "callback_user"
DB_PASSWORD: "dev_password_123"
DB_NAME: "callback_system"
before_script:
- pip install --cache-dir .pip-cache -r requirements.txt
- pip install pytest pytest-cov
- echo "等待MySQL启动..."
- sleep 30
- mysql -h mysql -u callback_user -pdev_password_123 -e "SELECT 1;"
script:
- echo "开始运行测试..."
- python -m pytest tests/ -v --cov=. --cov-report=xml
- echo "测试完成"
coverage: '/TOTAL.*\s+(\d+%)$/'
artifacts:
reports:
coverage_report:
coverage_format: cobertura
path: coverage.xml
paths:
- coverage.xml
expire_in: 1 week
only:
- merge_requests
- develop
- main
# 代码质量检查
code_quality:
stage: test
image: python:3.9-slim
before_script:
- pip install --cache-dir .pip-cache flake8 black isort
script:
- echo "检查代码格式..."
- black --check --diff .
- isort --check-only --diff .
- echo "检查代码质量..."
- flake8 . --max-line-length=120 --extend-ignore=E203,W503
artifacts:
paths:
- .flake8-report.txt
expire_in: 1 week
only:
- merge_requests
- develop
- main
# 构建阶段
build:
stage: build
image: docker:20.10.16
services:
- docker:20.10.16-dind
variables:
DOCKER_TLS_CERTDIR: "/certs"
before_script:
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
script:
- echo "构建Docker镜像..."
- docker build -t $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA .
- docker build -t $CI_REGISTRY_IMAGE:latest .
- echo "推送镜像到GitLab Registry..."
- docker push $CI_REGISTRY_IMAGE:$CI_COMMIT_SHA
- docker push $CI_REGISTRY_IMAGE:latest
artifacts:
paths:
- .dockerignore
expire_in: 1 week
only:
- develop
- main
# 部署到测试环境
deploy_test:
stage: deploy
image: alpine:latest
before_script:
- apk add --no-cache openssh-client
- eval $(ssh-agent -s)
- echo "$TEST_SSH_PRIVATE_KEY" | tr -d '\r' | ssh-add -
- mkdir -p ~/.ssh
- chmod 700 ~/.ssh
- echo "$TEST_SSH_KNOWN_HOSTS" >> ~/.ssh/known_hosts
- chmod 644 ~/.ssh/known_hosts
script:
- echo "部署到测试环境..."
- ssh $TEST_SSH_USER@$TEST_SSH_HOST "
cd $TEST_APP_DIR &&
git pull origin develop &&
docker-compose down &&
docker-compose pull &&
docker-compose up -d &&
echo '测试环境部署完成'
"
environment:
name: test
url: $TEST_APP_URL
only:
- develop
# --- 在这里添加 tags 部分 ---
# 部署到生产环境
deploy_production:
stage: deploy
image: alpine:latest
before_script:
- apk add --no-cache openssh-client mysql-client
- eval $(ssh-agent -s)
- echo "$PROD_SSH_PRIVATE_KEY" | tr -d '\r' | ssh-add -
- mkdir -p ~/.ssh
- chmod 700 ~/.ssh
- echo "$PROD_SSH_KNOWN_HOSTS" >> ~/.ssh/known_hosts
- chmod 644 ~/.ssh/known_hosts
script:
- echo "开始生产环境部署..."
# 1. 数据库备份
- echo "执行数据库备份..."
- mysqldump -h $PROD_DB_HOST -P $PROD_DB_PORT -u $PROD_DB_USER -p$PROD_DB_PASSWORD \
--single-transaction --routines --triggers \
$PROD_DB_NAME > backup_$(date +%Y%m%d_%H%M%S).sql
# 2. 上传部署脚本
- echo "上传部署脚本..."
- scp deploy_scripts/deploy.sh $PROD_SSH_USER@$PROD_SSH_HOST:/tmp/
- scp deploy_scripts/migrate_database.sql $PROD_SSH_USER@$PROD_SSH_HOST:/tmp/
# 3. 执行部署
- echo "执行自动化部署..."
- ssh $PROD_SSH_USER@$PROD_SSH_HOST "
chmod +x /tmp/deploy.sh &&
export DB_HOST=$PROD_DB_HOST &&
export DB_PORT=$PROD_DB_PORT &&
export DB_USER=$PROD_DB_USER &&
export DB_PASSWORD=$PROD_DB_PASSWORD &&
export DB_NAME=$PROD_DB_NAME &&
export APP_URL=$PROD_APP_URL &&
export DEPLOY_ENV=production &&
export AUTO_CLEANUP=true &&
/tmp/deploy.sh
"
# 4. 验证部署
- echo "验证部署结果..."
- sleep 30
- curl -f $PROD_APP_URL/api/health || exit 1
- echo "生产环境部署完成!"
environment:
name: production
url: $PROD_APP_URL
when: manual # 手动触发生产环境部署
only:
- main
tags:
- jarvis # 指定使用带有 "jarvis" 标签的 Runner
# ---------------------------
- production
# before_script, script, only 这些部分保持不变
# 回滚生产环境
rollback_production:
stage: deploy
image: alpine:latest
before_script:
- 'which ssh-agent || ( apk add --update --no-cache openssh-client )'
- apk add --no-cache openssh-client
- eval $(ssh-agent -s)
- echo "$SSH_PRIVATE_KEY" | tr -d '\r' | ssh-add -
- echo "$PROD_SSH_PRIVATE_KEY" | tr -d '\r' | ssh-add -
- mkdir -p ~/.ssh
- chmod 700 ~/.ssh
- echo "$SSH_KNOWN_HOSTS" > ~/.ssh/known_hosts
- echo "$PROD_SSH_KNOWN_HOSTS" >> ~/.ssh/known_hosts
- chmod 644 ~/.ssh/known_hosts
script:
- |
ssh -p $SSH_PORT $SSH_USER@$SSH_HOST "
echo '✅ 连接服务器成功,开始执行部署脚本...'
cd customer-recall
git pull origin master
docker compose up -d --build
echo '🚀 部署流程执行完毕!'
- echo "开始生产环境回滚..."
- ssh $PROD_SSH_USER@$PROD_SSH_HOST "
cd $PROD_APP_DIR &&
git checkout HEAD~1 &&
docker-compose down &&
docker-compose up -d &&
echo '生产环境回滚完成'
"
environment:
name: production
url: $PROD_APP_URL
when: manual # 手动触发回滚
only:
- master
\ No newline at end of file
- main
tags:
- production
# 部署后验证
verify_deployment:
stage: deploy
image: alpine:latest
script:
- echo "验证部署结果..."
- apk add --no-cache curl
# 健康检查
- echo "检查应用健康状态..."
- curl -f $PROD_APP_URL/api/health
# 功能测试
- echo "测试导出功能..."
- curl -f $PROD_APP_URL/api/export-data || echo "导出功能需要登录,这是正常的"
- echo "部署验证完成!"
environment:
name: production
url: $PROD_APP_URL
only:
- main
tags:
- production
allow_failure: true # 验证失败不影响部署
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
添加马山门诊测试回访记录的脚本
"""
import pymysql
import json
from datetime import datetime, timedelta
def add_mashan_callbacks():
"""添加马山门诊的测试回访记录"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 添加马山门诊测试回访记录 ===")
# 1. 获取马山门诊的TS0I患者列表
cursor.execute("""
SELECT case_number, patient_name
FROM patients
WHERE case_number LIKE 'TS0I%' AND clinic_name = '马山门诊'
ORDER BY case_number
LIMIT 10
""")
patients = cursor.fetchall()
print(f"找到 {len(patients)} 个马山门诊患者")
# 2. 添加测试回访记录
test_records = [
{
'case_number': patients[0][0] if patients else 'TS0I000001',
'callback_result': '成功',
'operator': '马山门诊-张医生',
'callback_methods': ['电话回访'],
'callback_record': '患者表示恢复良好,对治疗效果满意。'
},
{
'case_number': patients[1][0] if len(patients) > 1 else 'TS0I000002',
'callback_result': '成功',
'operator': '马山门诊-李护士',
'callback_methods': ['微信回访'],
'callback_record': '患者反馈无不适,建议定期复查。'
},
{
'case_number': patients[2][0] if len(patients) > 2 else 'TS0I000003',
'callback_result': '不成功',
'operator': '马山门诊-王医生',
'callback_methods': ['电话回访'],
'callback_record': '患者电话无人接听,需后续跟进。'
},
{
'case_number': patients[3][0] if len(patients) > 3 else 'TS0I000004',
'callback_result': '放弃回访',
'operator': '马山门诊-赵护士',
'callback_methods': ['短信回访'],
'callback_record': '患者明确表示不需要回访服务。'
}
]
# 3. 插入回访记录
insert_sql = """
INSERT INTO callback_records
(case_number, callback_methods, callback_record, callback_result, operator, create_time)
VALUES (%s, %s, %s, %s, %s, %s)
"""
base_time = datetime.now() - timedelta(days=1)
inserted_count = 0
for i, record in enumerate(test_records):
try:
cursor.execute(insert_sql, (
record['case_number'],
json.dumps(record['callback_methods'], ensure_ascii=False),
record['callback_record'],
record['callback_result'],
record['operator'],
base_time + timedelta(hours=i)
))
inserted_count += 1
print(f" 已添加: {record['case_number']} - {record['callback_result']}")
except Exception as e:
print(f" 添加失败 {record['case_number']}: {e}")
# 提交更改
connection.commit()
print(f"\n成功添加 {inserted_count} 条回访记录")
# 4. 验证结果
cursor.execute("""
SELECT COUNT(*)
FROM callback_records
WHERE case_number LIKE 'TS0I%'
""")
total_mashan_callbacks = cursor.fetchone()[0]
print(f"马山门诊回访记录总数: {total_mashan_callbacks}")
# 5. 检查各状态分布
cursor.execute("""
SELECT
callback_result,
COUNT(*) as count
FROM callback_records
WHERE case_number LIKE 'TS0I%'
GROUP BY callback_result
""")
print(f"\n马山门诊回访状态分布:")
for result, count in cursor.fetchall():
print(f" {result}: {count} 条")
cursor.close()
connection.close()
print("\n添加完成!")
except Exception as e:
print(f"添加失败: {e}")
if __name__ == "__main__":
add_mashan_callbacks()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
添加多个诊所的测试回访记录
"""
import pymysql
from datetime import datetime
import random
import json
def add_test_callbacks():
"""添加多个诊所的测试回访记录"""
# 数据库配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 添加多个诊所的测试回访记录 ===")
# 获取各诊所的患者
clinics = [
'河埒门诊',
'东亭门诊',
'红豆门诊',
'通善口腔医院',
'新吴门诊',
'大丰门诊',
'惠山门诊',
'学前街门诊'
]
total_added = 0
for clinic_name in clinics:
print(f"\n处理 {clinic_name}...")
# 获取该诊所的患者
cursor.execute("""
SELECT case_number, patient_name
FROM patients
WHERE clinic_name = %s
LIMIT 3
""", (clinic_name,))
patients = cursor.fetchall()
if not patients:
print(f" {clinic_name}: 没有找到患者")
continue
clinic_added = 0
for patient in patients:
case_number = patient[0]
patient_name = patient[1]
# 随机生成回访结果
results = ['成功', '不成功', '放弃回访']
result = random.choice(results)
# 随机生成回访方式
methods = ['电话', '短信', '微信', '上门']
method_count = random.randint(1, 2)
selected_methods = random.sample(methods, method_count)
# 插入回访记录
insert_sql = """
INSERT INTO callback_records (
case_number, callback_methods, callback_result,
callback_record, operator, create_time, update_time
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
callback_record = f"测试回访记录 - {result}"
operator = "测试操作员"
now = datetime.now()
cursor.execute(insert_sql, (
case_number,
json.dumps(selected_methods), # 使用json.dumps转换为JSON字符串
result,
callback_record,
operator,
now,
now
))
clinic_added += 1
total_added += 1
print(f" 已添加: {case_number} - {result}")
print(f" {clinic_name}: 添加了 {clinic_added} 条记录")
# 提交事务
connection.commit()
print(f"\n=== 添加完成! ===")
print(f"总共添加了 {total_added} 条回访记录")
# 统计各诊所的回访记录
print(f"\n=== 各诊所回访记录统计 ===")
cursor.execute("""
SELECT
p.clinic_name,
COUNT(cr.case_number) as record_count,
SUM(CASE WHEN cr.callback_result = '成功' THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN cr.callback_result = '不成功' THEN 1 ELSE 0 END) as failed_count,
SUM(CASE WHEN cr.callback_result = '放弃回访' THEN 1 ELSE 0 END) as abandon_count
FROM patients p
LEFT JOIN callback_records cr ON p.case_number = cr.case_number
GROUP BY p.clinic_name
ORDER BY record_count DESC
""")
clinic_stats = cursor.fetchall()
for clinic, total, success, failed, abandon in clinic_stats:
if total > 0:
print(f" {clinic}: {total}条 (成功:{success}, 不成功:{failed}, 放弃:{abandon})")
connection.close()
except Exception as e:
print(f"添加回访记录失败: {e}")
if __name__ == "__main__":
add_test_callbacks()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
分析JSON文件中各诊所患者的病历号前缀分布
"""
import json
import os
from collections import defaultdict
def analyze_json_distribution():
"""分析JSON文件中各诊所患者的病历号前缀分布"""
json_dir = "诊所患者json"
print("=== JSON文件中各诊所患者病历号前缀分布分析 ===")
for filename in os.listdir(json_dir):
if filename.endswith('.json') and filename != 'conversion_summary.json':
clinic_name = filename.replace('.json', '')
file_path = os.path.join(json_dir, filename)
print(f"\n--- {clinic_name} ---")
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
patients = data
elif isinstance(data, dict) and 'patients' in data:
patients = data['patients']
else:
patients = []
# 统计病历号前缀分布
prefix_count = defaultdict(int)
total_patients = 0
for patient in patients:
if isinstance(patient, dict):
case_number = patient.get('病历号', '')
if case_number:
prefix = case_number[:4] if len(case_number) >= 4 else case_number
prefix_count[prefix] += 1
total_patients += 1
print(f"总患者数: {total_patients}")
print("病历号前缀分布:")
# 按数量排序显示
sorted_prefixes = sorted(prefix_count.items(), key=lambda x: x[1], reverse=True)
for prefix, count in sorted_prefixes:
percentage = (count / total_patients) * 100 if total_patients > 0 else 0
print(f" {prefix}: {count} 人 ({percentage:.1f}%)")
except Exception as e:
print(f"读取 {filename} 失败: {e}")
if __name__ == "__main__":
analyze_json_distribution()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查所有回访记录和患者数据的脚本
"""
import pymysql
import json
def check_all_data():
"""检查所有数据"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 全面数据检查 ===")
# 1. 检查回访记录表结构
cursor.execute("DESCRIBE callback_records")
print("\n回访记录表结构:")
for field in cursor.fetchall():
print(f" {field[0]}: {field[1]} {field[2]} {field[3]}")
# 2. 检查患者表结构
cursor.execute("DESCRIBE patients")
print("\n患者表结构:")
for field in cursor.fetchall():
print(f" {field[0]}: {field[1]} {field[2]} {field[3]}")
# 3. 检查回访记录总数
cursor.execute("SELECT COUNT(*) FROM callback_records")
total_records = cursor.fetchone()[0]
print(f"\n回访记录总数: {total_records}")
# 4. 检查患者总数
cursor.execute("SELECT COUNT(*) FROM patients")
total_patients = cursor.fetchone()[0]
print(f"患者总数: {total_patients}")
# 5. 检查所有回访记录
cursor.execute("""
SELECT
case_number,
callback_result,
operator,
create_time
FROM callback_records
ORDER BY create_time DESC
LIMIT 20
""")
print(f"\n最近20条回访记录:")
for record in cursor.fetchall():
case_number, result, operator, create_time = record
print(f" {case_number}: {result} - {operator} - {create_time}")
# 6. 检查患者表中的诊所分布
cursor.execute("""
SELECT
clinic_name,
COUNT(*) as count
FROM patients
GROUP BY clinic_name
ORDER BY count DESC
""")
print(f"\n患者表中的诊所分布:")
for clinic_name, count in cursor.fetchall():
print(f" {clinic_name}: {count} 人")
# 7. 检查是否有TS0I开头的患者
cursor.execute("""
SELECT
case_number,
patient_name,
clinic_name
FROM patients
WHERE case_number LIKE 'TS0I%'
LIMIT 10
""")
ts0i_patients = cursor.fetchall()
print(f"\nTS0I开头的患者 (共{len(ts0i_patients)}人):")
for patient in ts0i_patients:
case_number, name, clinic = patient
print(f" {case_number}: {name} - {clinic}")
# 8. 检查回访记录表中的所有病例号前缀
cursor.execute("""
SELECT
SUBSTRING(case_number, 1, 4) as prefix,
COUNT(*) as count
FROM callback_records
GROUP BY SUBSTRING(case_number, 1, 4)
ORDER BY count DESC
""")
print(f"\n回访记录表中的病例号前缀分布:")
for prefix, count in cursor.fetchall():
print(f" {prefix}: {count} 条")
cursor.close()
connection.close()
except Exception as e:
print(f"查询失败: {e}")
if __name__ == "__main__":
check_all_data()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查17:09的正确导出文件
"""
import pandas as pd
import os
def check_correct_excel():
"""检查17:09的正确导出文件"""
# 查找17:09的导出文件
target_file = "回访记录导出_20250813_170940.xlsx"
if not os.path.exists(target_file):
print(f"未找到目标文件: {target_file}")
return
print(f"检查文件: {target_file}")
try:
# 读取总览表
summary_df = pd.read_excel(target_file, sheet_name='总览')
print("\n=== 总览表数据 ===")
print(summary_df.to_string(index=False))
# 检查各诊所详情表
print(f"\n=== 检查各诊所详情表 ===")
excel_file = pd.ExcelFile(target_file)
print(f"工作表列表: {excel_file.sheet_names}")
# 检查河埒门诊详情表
if '河埒门诊' in excel_file.sheet_names:
try:
helai_df = pd.read_excel(target_file, sheet_name='河埒门诊')
print(f"\n河埒门诊详情表:")
print(f"记录总数: {len(helai_df)}")
print(f"列名: {list(helai_df.columns)}")
# 查找TS0C005909
if '病历号' in helai_df.columns:
ts0c005909_in_helai = helai_df[helai_df['病历号'] == 'TS0C005909']
if not ts0c005909_in_helai.empty:
print(f"✅ TS0C005909 在河埒门诊详情表中找到")
print(ts0c005909_in_helai.to_string(index=False))
else:
print(f"❌ TS0C005909 在河埒门诊详情表中未找到")
else:
print(f"河埒门诊详情表没有'病历号'列")
except Exception as e:
print(f"读取河埒门诊详情表失败: {e}")
else:
print(f"未找到河埒门诊详情表")
# 检查东亭门诊详情表
if '东亭门诊' in excel_file.sheet_names:
try:
dongting_df = pd.read_excel(target_file, sheet_name='东亭门诊')
print(f"\n东亭门诊详情表:")
print(f"记录总数: {len(dongting_df)}")
print(f"列名: {list(dongting_df.columns)}")
# 查找TS0C005909
if '病历号' in dongting_df.columns:
ts0c005909_in_dongting = dongting_df[dongting_df['病历号'] == 'TS0C005909']
if not ts0c005909_in_dongting.empty:
print(f"❌ TS0C005909 错误地在东亭门诊详情表中找到!")
print(ts0c005909_in_dongting.to_string(index=False))
else:
print(f"✅ TS0C005909 在东亭门诊详情表中未找到(正确)")
else:
print(f"东亭门诊详情表没有'病历号'列")
except Exception as e:
print(f"读取东亭门诊详情表失败: {e}")
else:
print(f"未找到东亭门诊详情表")
# 检查所有诊所详情表,查找TS0C005909
print(f"\n=== 在所有诊所详情表中查找TS0C005909 ===")
for sheet_name in excel_file.sheet_names:
if sheet_name not in ['总览', 'Sheet']:
try:
df = pd.read_excel(target_file, sheet_name=sheet_name)
if '病历号' in df.columns:
ts0c005909_found = df[df['病历号'] == 'TS0C005909']
if not ts0c005909_found.empty:
print(f"❌ TS0C005909 在 {sheet_name} 详情表中找到!")
print(f" 记录数: {len(ts0c005909_found)}")
print(f" 数据: {ts0c005909_found.to_string(index=False)}")
else:
print(f"✅ TS0C005909 在 {sheet_name} 详情表中未找到")
else:
print(f"⚠️ {sheet_name} 详情表没有'病历号'列")
except Exception as e:
print(f"读取 {sheet_name} 详情表失败: {e}")
except Exception as e:
print(f"检查失败: {e}")
if __name__ == "__main__":
check_correct_excel()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查Docker中的数据库状态
"""
import pymysql
def check_docker_db():
"""检查Docker中的数据库状态"""
# Docker中的数据库配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("✅ 数据库连接成功!")
# 检查表
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print(f"\n=== 数据库表 ===")
if tables:
for table in tables:
print(f" {table[0]}")
else:
print(" 没有表")
# 检查patients表
if ('patients',) in tables:
cursor.execute("SELECT COUNT(*) FROM patients")
patient_count = cursor.fetchone()[0]
print(f"\n=== patients表 ===")
print(f" 患者总数: {patient_count}")
if patient_count > 0:
cursor.execute("SELECT clinic_name, COUNT(*) FROM patients GROUP BY clinic_name")
clinic_distribution = cursor.fetchall()
print(f" 诊所分布:")
for clinic, count in clinic_distribution:
print(f" {clinic}: {count}人")
else:
print("\n❌ patients表不存在")
# 检查callback_records表
if ('callback_records',) in tables:
cursor.execute("SELECT COUNT(*) FROM callback_records")
callback_count = cursor.fetchone()[0]
print(f"\n=== callback_records表 ===")
print(f" 回访记录总数: {callback_count}")
if callback_count > 0:
cursor.execute("SELECT callback_result, COUNT(*) FROM callback_records GROUP BY callback_result")
result_distribution = cursor.fetchall()
print(f" 回访结果分布:")
for result, count in result_distribution:
print(f" {result}: {count}条")
else:
print("\n❌ callback_records表不存在")
# 检查clinics表
if ('clinics',) in tables:
cursor.execute("SELECT COUNT(*) FROM clinics")
clinic_count = cursor.fetchone()[0]
print(f"\n=== clinics表 ===")
print(f" 诊所总数: {clinic_count}")
if clinic_count > 0:
cursor.execute("SELECT clinic_name FROM clinics")
clinic_names = cursor.fetchall()
print(f" 诊所名称:")
for clinic in clinic_names:
print(f" {clinic[0]}")
else:
print("\n❌ clinics表不存在")
connection.close()
except Exception as e:
print(f"❌ 数据库连接失败: {e}")
if __name__ == "__main__":
check_docker_db()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查Excel导出文件中的数据的脚本
"""
import pandas as pd
import os
def check_excel_data():
"""检查Excel导出文件中的数据"""
# 查找最新的导出文件
export_files = [f for f in os.listdir('.') if f.startswith('回访记录导出_') and f.endswith('.xlsx')]
if not export_files:
print("未找到导出文件")
return
# 按修改时间排序,获取最新的文件
latest_file = max(export_files, key=lambda x: os.path.getmtime(x))
print(f"检查文件: {latest_file}")
try:
# 读取总览表
summary_df = pd.read_excel(latest_file, sheet_name='总览')
print("\n=== 总览表数据 ===")
print(summary_df.to_string(index=False))
# 读取马山门诊详细表
mashan_df = pd.read_excel(latest_file, sheet_name='马山门诊')
print(f"\n=== 马山门诊详细数据 ===")
print(f"记录总数: {len(mashan_df)}")
print(mashan_df.to_string(index=False))
# 统计马山门诊各状态数量
if '回访结果' in mashan_df.columns:
status_counts = mashan_df['回访结果'].value_counts()
print(f"\n马山门诊回访状态统计:")
for status, count in status_counts.items():
print(f" {status}: {count} 条")
# 检查成功回访的记录
if '回访结果' in mashan_df.columns:
success_records = mashan_df[mashan_df['回访结果'] == '成功']
print(f"\n马山门诊成功回访记录 ({len(success_records)} 条):")
for _, record in success_records.iterrows():
case_number = record.get('病历号', 'N/A')
operator = record.get('操作员', 'N/A')
create_time = record.get('创建时间', 'N/A')
print(f" {case_number}: {operator} - {create_time}")
except Exception as e:
print(f"检查失败: {e}")
if __name__ == "__main__":
check_excel_data()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查Excel导出文件,确认患者是否被正确分配到对应的诊所详情表中
"""
import pandas as pd
import os
def check_excel_export():
"""检查Excel导出文件"""
# 查找最新的导出文件
export_files = [f for f in os.listdir('.') if f.startswith('回访记录导出_') and f.endswith('.xlsx')]
if not export_files:
print("未找到导出文件")
return
# 按修改时间排序,获取最新的文件
latest_file = max(export_files, key=lambda x: os.path.getmtime(x))
print(f"检查文件: {latest_file}")
try:
# 读取总览表
summary_df = pd.read_excel(latest_file, sheet_name='总览')
print("\n=== 总览表数据 ===")
print(summary_df.to_string(index=False))
# 检查各诊所详情表
print(f"\n=== 检查各诊所详情表 ===")
# 检查河埒门诊详情表
try:
helai_df = pd.read_excel(latest_file, sheet_name='河埒门诊')
print(f"\n河埒门诊详情表:")
print(f"记录总数: {len(helai_df)}")
# 查找TS0C005909
ts0c005909_in_helai = helai_df[helai_df['病历号'] == 'TS0C005909']
if not ts0c005909_in_helai.empty:
print(f"✅ TS0C005909 在河埒门诊详情表中找到")
print(ts0c005909_in_helai.to_string(index=False))
else:
print(f"❌ TS0C005909 在河埒门诊详情表中未找到")
except Exception as e:
print(f"读取河埒门诊详情表失败: {e}")
# 检查东亭门诊详情表
try:
dongting_df = pd.read_excel(latest_file, sheet_name='东亭门诊')
print(f"\n东亭门诊详情表:")
print(f"记录总数: {len(dongting_df)}")
# 查找TS0C005909
ts0c005909_in_dongting = dongting_df[dongting_df['病历号'] == 'TS0C005909']
if not ts0c005909_in_dongting.empty:
print(f"❌ TS0C005909 错误地在东亭门诊详情表中找到!")
print(ts0c005909_in_dongting.to_string(index=False))
else:
print(f"✅ TS0C005909 在东亭门诊详情表中未找到(正确)")
except Exception as e:
print(f"读取东亭门诊详情表失败: {e}")
# 检查所有诊所详情表,查找TS0C005909
print(f"\n=== 在所有诊所详情表中查找TS0C005909 ===")
excel_file = pd.ExcelFile(latest_file)
for sheet_name in excel_file.sheet_names:
if sheet_name not in ['总览', 'Sheet']:
try:
df = pd.read_excel(latest_file, sheet_name=sheet_name)
ts0c005909_found = df[df['病历号'] == 'TS0C005909']
if not ts0c005909_found.empty:
print(f"❌ TS0C005909 在 {sheet_name} 详情表中找到!")
print(f" 记录数: {len(ts0c005909_found)}")
print(f" 数据: {ts0c005909_found.to_string(index=False)}")
else:
print(f"✅ TS0C005909 在 {sheet_name} 详情表中未找到")
except Exception as e:
print(f"读取 {sheet_name} 详情表失败: {e}")
except Exception as e:
print(f"检查失败: {e}")
if __name__ == "__main__":
check_excel_export()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
最终检查脚本,验证Excel列标题和数据是否正确显示
"""
import pandas as pd
import os
def check_final_excel():
"""最终检查Excel文件"""
# 查找最新的导出文件
export_files = [f for f in os.listdir('.') if f.startswith('回访记录导出_') and f.endswith('.xlsx')]
if not export_files:
print("未找到导出文件")
return
# 按修改时间排序,获取最新的文件
latest_file = max(export_files, key=lambda x: os.path.getmtime(x))
print(f"检查文件: {latest_file}")
try:
# 读取总览表
summary_df = pd.read_excel(latest_file, sheet_name='总览', header=1)
print("\n=== 总览表数据 ===")
print(summary_df.to_string(index=False))
# 检查列标题
print(f"\n=== 列标题检查 ===")
print(f"列标题: {list(summary_df.columns)}")
# 检查是否有正确的列标题
expected_columns = ['诊所名称', '已回访患者数', '成功', '不成功', '放弃回访', '成功率']
for expected_col in expected_columns:
if expected_col in summary_df.columns:
print(f"✅ 找到列: {expected_col}")
else:
print(f"❌ 未找到列: {expected_col}")
# 检查数据统计
print(f"\n=== 数据统计验证 ===")
# 验证河埒门诊的数据
helai_row = summary_df[summary_df['诊所名称'] == '河埒门诊']
if not helai_row.empty:
helai_patients = helai_row.iloc[0]['已回访患者数']
helai_success = helai_row.iloc[0]['成功']
helai_rate = helai_row.iloc[0]['成功率']
print(f"河埒门诊:")
print(f" 已回访患者数: {helai_patients}")
print(f" 成功数: {helai_success}")
print(f" 成功率: {helai_rate}")
# 验证成功率计算
if helai_patients > 0:
calculated_rate = (helai_success / helai_patients) * 100
print(f" 计算成功率: {calculated_rate:.1f}%")
if abs(calculated_rate - float(helai_rate.replace('%', ''))) < 0.1:
print(" ✅ 成功率计算正确")
else:
print(" ❌ 成功率计算不正确")
# 验证总计行
total_row = summary_df[summary_df['诊所名称'] == '总计']
if not total_row.empty:
total_patients = total_row.iloc[0]['已回访患者数']
total_success = total_row.iloc[0]['成功']
total_rate = total_row.iloc[0]['成功率']
print(f"\n总计:")
print(f" 总已回访患者数: {total_patients}")
print(f" 总成功数: {total_success}")
print(f" 总成功率: {total_rate}")
# 验证总成功率计算
if total_patients > 0:
calculated_total_rate = (total_success / total_patients) * 100
print(f" 计算总成功率: {calculated_total_rate:.1f}%")
if abs(calculated_total_rate - float(total_rate.replace('%', ''))) < 0.1:
print(" ✅ 总成功率计算正确")
else:
print(" ❌ 总成功率计算不正确")
# 检查详情表
print(f"\n=== 详情表检查 ===")
excel_file = pd.ExcelFile(latest_file)
# 检查河埒门诊详情表
if '河埒门诊' in excel_file.sheet_names:
try:
helai_df = pd.read_excel(latest_file, sheet_name='河埒门诊')
print(f"河埒门诊详情表:")
print(f" 记录数: {len(helai_df)}")
print(f" 列名: {list(helai_df.columns)}")
# 检查是否有病历号列
if '病历号' in helai_df.columns:
unique_patients = helai_df['病历号'].nunique()
print(f" 唯一患者数: {unique_patients}")
# 验证与总览表的一致性
if unique_patients == helai_patients:
print(" ✅ 详情表患者数与总览表一致")
else:
print(" ❌ 详情表患者数与总览表不一致")
else:
print(" ⚠️ 没有'病历号'列")
except Exception as e:
print(f" 读取失败: {e}")
except Exception as e:
print(f"检查失败: {e}")
if __name__ == "__main__":
check_final_excel()
\ No newline at end of file
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查最新的Excel导出文件
"""
import pandas as pd
import os
def check_latest_excel():
"""检查最新的Excel导出文件"""
# 查找最新的导出文件
export_files = [f for f in os.listdir('.') if f.startswith('回访记录导出_') and f.endswith('.xlsx')]
if not export_files:
print("未找到导出文件")
return
# 按修改时间排序,获取最新的文件
latest_file = max(export_files, key=lambda x: os.path.getmtime(x))
print(f"检查文件: {latest_file}")
try:
# 读取总览表
summary_df = pd.read_excel(latest_file, sheet_name='总览')
print("\n=== 总览表数据 ===")
print(summary_df.to_string(index=False))
# 检查各诊所详情表
print(f"\n=== 检查各诊所详情表 ===")
excel_file = pd.ExcelFile(latest_file)
print(f"工作表列表: {excel_file.sheet_names}")
# 检查河埒门诊详情表
if '河埒门诊' in excel_file.sheet_names:
try:
helai_df = pd.read_excel(latest_file, sheet_name='河埒门诊')
print(f"\n河埒门诊详情表:")
print(f"记录总数: {len(helai_df)}")
print(f"列名: {list(helai_df.columns)}")
# 查找TS0C005909
if '病历号' in helai_df.columns:
ts0c005909_in_helai = helai_df[helai_df['病历号'] == 'TS0C005909']
if not ts0c005909_in_helai.empty:
print(f"✅ TS0C005909 在河埒门诊详情表中找到")
print(ts0c005909_in_helai.to_string(index=False))
else:
print(f"❌ TS0C005909 在河埒门诊详情表中未找到")
else:
print(f"河埒门诊详情表没有'病历号'列")
except Exception as e:
print(f"读取河埒门诊详情表失败: {e}")
else:
print(f"未找到河埒门诊详情表")
# 检查东亭门诊详情表
if '东亭门诊' in excel_file.sheet_names:
try:
dongting_df = pd.read_excel(latest_file, sheet_name='东亭门诊')
print(f"\n东亭门诊详情表:")
print(f"记录总数: {len(dongting_df)}")
print(f"列名: {list(dongting_df.columns)}")
# 查找TS0C005909
if '病历号' in dongting_df.columns:
ts0c005909_in_dongting = dongting_df[dongting_df['病历号'] == 'TS0C005909']
if not ts0c005909_in_dongting.empty:
print(f"❌ TS0C005909 错误地在东亭门诊详情表中找到!")
print(ts0c005909_in_dongting.to_string(index=False))
else:
print(f"✅ TS0C005909 在东亭门诊详情表中未找到(正确)")
else:
print(f"东亭门诊详情表没有'病历号'列")
except Exception as e:
print(f"读取东亭门诊详情表失败: {e}")
else:
print(f"未找到东亭门诊详情表")
# 检查所有诊所详情表,查找TS0C005909
print(f"\n=== 在所有诊所详情表中查找TS0C005909 ===")
for sheet_name in excel_file.sheet_names:
if sheet_name not in ['总览', 'Sheet']:
try:
df = pd.read_excel(latest_file, sheet_name=sheet_name)
if '病历号' in df.columns:
ts0c005909_found = df[df['病历号'] == 'TS0C005909']
if not ts0c005909_found.empty:
print(f"❌ TS0C005909 在 {sheet_name} 详情表中找到!")
print(f" 记录数: {len(ts0c005909_found)}")
print(f" 数据: {ts0c005909_found.to_string(index=False)}")
else:
print(f"✅ TS0C005909 在 {sheet_name} 详情表中未找到")
else:
print(f"⚠️ {sheet_name} 详情表没有'病历号'列")
except Exception as e:
print(f"读取 {sheet_name} 详情表失败: {e}")
except Exception as e:
print(f"检查失败: {e}")
if __name__ == "__main__":
check_latest_excel()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查马山门诊回访数据的脚本
"""
import pymysql
import json
def check_mashan_data():
"""检查马山门诊的回访数据"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 马山门诊回访数据检查 ===")
# 1. 检查马山门诊的回访记录总数
cursor.execute("""
SELECT COUNT(*) as total_count
FROM callback_records
WHERE case_number LIKE 'TS0I%'
""")
total_count = cursor.fetchone()[0]
print(f"马山门诊回访记录总数: {total_count}")
# 2. 检查各状态的记录数
cursor.execute("""
SELECT
callback_result,
COUNT(*) as count
FROM callback_records
WHERE case_number LIKE 'TS0I%'
GROUP BY callback_result
""")
print("\n各状态记录数:")
for result, count in cursor.fetchall():
print(f" {result}: {count}")
# 3. 检查具体的回访记录
cursor.execute("""
SELECT
case_number,
callback_result,
operator,
create_time,
callback_methods
FROM callback_records
WHERE case_number LIKE 'TS0I%'
ORDER BY create_time DESC
""")
print(f"\n具体回访记录 (共{total_count}条):")
for record in cursor.fetchall():
case_number, result, operator, create_time, methods = record
methods_str = json.loads(methods) if methods else []
print(f" {case_number}: {result} - {operator} - {create_time} - {methods_str}")
# 4. 检查患者表中的马山门诊患者
cursor.execute("""
SELECT COUNT(*) as patient_count
FROM patients
WHERE clinic_name = '马山门诊'
""")
patient_count = cursor.fetchone()[0]
print(f"\n马山门诊患者总数: {patient_count}")
# 5. 检查回访记录表中的所有诊所分布
cursor.execute("""
SELECT
SUBSTRING(case_number, 1, 4) as prefix,
COUNT(*) as count
FROM callback_records
GROUP BY SUBSTRING(case_number, 1, 4)
ORDER BY count DESC
""")
print(f"\n各诊所病例号前缀分布:")
for prefix, count in cursor.fetchall():
clinic_name = get_clinic_name_by_prefix(prefix)
print(f" {prefix}: {count} 条 ({clinic_name})")
cursor.close()
connection.close()
except Exception as e:
print(f"查询失败: {e}")
def get_clinic_name_by_prefix(prefix):
"""根据病例号前缀获取诊所名称"""
prefix_map = {
'TS0G': '大丰门诊',
'TS0C': '东亭门诊',
'JY0A': '河埒门诊',
'TS0B': '红豆门诊',
'TS0E': '惠山门诊',
'TS0I': '马山门诊',
'TS0F': '新吴门诊',
'TS0M': '学前街门诊'
}
return prefix_map.get(prefix, '未知诊所')
if __name__ == "__main__":
check_mashan_data()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查新的Excel总览表,确认统计逻辑是否正确
"""
import pandas as pd
import os
def check_new_summary():
"""检查新的Excel总览表"""
# 查找最新的导出文件
export_files = [f for f in os.listdir('.') if f.startswith('回访记录导出_') and f.endswith('.xlsx')]
if not export_files:
print("未找到导出文件")
return
# 按修改时间排序,获取最新的文件
latest_file = max(export_files, key=lambda x: os.path.getmtime(x))
print(f"检查文件: {latest_file}")
try:
# 读取总览表
summary_df = pd.read_excel(latest_file, sheet_name='总览')
print("\n=== 总览表数据 ===")
print(summary_df.to_string(index=False))
# 检查列标题
print(f"\n=== 列标题检查 ===")
print(f"列标题: {list(summary_df.columns)}")
# 检查是否有"已回访患者数"列
if '已回访患者数' in summary_df.columns:
print("✅ 列标题已更新为'已回访患者数'")
else:
print("❌ 列标题未更新")
# 检查数据统计
print(f"\n=== 数据统计检查 ===")
# 读取河埒门诊详情表,验证患者数统计
if '河埒门诊' in pd.ExcelFile(latest_file).sheet_names:
try:
helai_df = pd.read_excel(latest_file, sheet_name='河埒门诊')
print(f"河埒门诊详情表记录数: {len(helai_df)}")
# 检查是否有病历号列
if '病历号' in helai_df.columns:
unique_patients = helai_df['病历号'].nunique()
print(f"河埒门诊详情表中唯一患者数: {unique_patients}")
# 从总览表获取河埒门诊的已回访患者数
helai_summary = summary_df[summary_df.iloc[:, 0] == '河埒门诊']
if not helai_summary.empty:
reported_patients = helai_summary.iloc[0, 1] # 第2列是已回访患者数
print(f"总览表中河埒门诊已回访患者数: {reported_patients}")
if unique_patients == reported_patients:
print("✅ 河埒门诊患者数统计正确")
else:
print("❌ 河埒门诊患者数统计不正确")
else:
print("⚠️ 河埒门诊详情表没有'病历号'列")
except Exception as e:
print(f"读取河埒门诊详情表失败: {e}")
# 检查总计行
total_row = summary_df[summary_df.iloc[:, 0] == '总计']
if not total_row.empty:
total_patients = total_row.iloc[0, 1] # 总已回访患者数
total_success = total_row.iloc[0, 2] # 总成功数
print(f"\n总计行:")
print(f" 总已回访患者数: {total_patients}")
print(f" 总成功数: {total_success}")
if total_patients > 0:
success_rate = (total_success / total_patients) * 100
print(f" 计算成功率: {success_rate:.1f}%")
except Exception as e:
print(f"检查失败: {e}")
if __name__ == "__main__":
check_new_summary()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查未知门诊问题
"""
import pymysql
import os
def check_unknown_clinic():
"""检查未知门诊问题"""
# 数据库配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 检查未知门诊问题 ===")
# 1. 检查总患者数
cursor.execute("SELECT COUNT(*) FROM patients")
total_patients = cursor.fetchone()[0]
print(f"总患者数: {total_patients}")
# 2. 检查总回访记录数
cursor.execute("SELECT COUNT(*) FROM callback_records")
total_callbacks = cursor.fetchone()[0]
print(f"总回访记录数: {total_callbacks}")
# 3. 检查哪些回访记录的患者在patients表中找不到
cursor.execute("""
SELECT cr.case_number, cr.callback_result, cr.operator, cr.create_time
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL
""")
unknown_patients = cursor.fetchall()
print(f"\n找不到诊所信息的患者数量: {len(unknown_patients)}")
if unknown_patients:
print("\n这些患者的回访记录:")
for record in unknown_patients:
case_number, result, operator, create_time = record
print(f" 病历号: {case_number}, 回访结果: {result}, 操作员: {operator}, 时间: {create_time}")
# 4. 检查patients表中的诊所分布
cursor.execute("""
SELECT clinic_name, COUNT(*) as count
FROM patients
GROUP BY clinic_name
ORDER BY count DESC
""")
clinic_distribution = cursor.fetchall()
print(f"\n患者诊所分布:")
for clinic_name, count in clinic_distribution:
print(f" {clinic_name}: {count} 人")
# 5. 检查callback_records表中的诊所分布(通过JOIN)
cursor.execute("""
SELECT
COALESCE(p.clinic_name, '未知诊所') as clinic_name,
COUNT(*) as count
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
GROUP BY COALESCE(p.clinic_name, '未知诊所')
ORDER BY count DESC
""")
callback_clinic_distribution = cursor.fetchall()
print(f"\n回访记录诊所分布:")
for clinic_name, count in callback_clinic_distribution:
print(f" {clinic_name}: {count} 条记录")
cursor.close()
connection.close()
except Exception as e:
print(f"检查失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
check_unknown_clinic()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
清理测试数据
"""
import pymysql
def cleanup_test_data():
"""清理测试数据"""
# 数据库配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 清理测试数据 ===")
# 1. 查找不在patients表中的回访记录
cursor.execute("""
SELECT cr.case_number, cr.callback_result, cr.operator, cr.create_time
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL
""")
test_records = cursor.fetchall()
print(f"找到 {len(test_records)} 条测试数据:")
for record in test_records:
case_number, result, operator, create_time = record
print(f" 病历号: {case_number}, 回访结果: {result}, 操作员: {operator}, 时间: {create_time}")
if test_records:
# 2. 删除这些测试数据
test_case_numbers = [record[0] for record in test_records]
placeholders = ','.join(['%s'] * len(test_case_numbers))
delete_sql = f"DELETE FROM callback_records WHERE case_number IN ({placeholders})"
cursor.execute(delete_sql, test_case_numbers)
deleted_count = cursor.rowcount
print(f"\n✅ 成功删除 {deleted_count} 条测试数据")
# 3. 提交事务
connection.commit()
# 4. 验证删除结果
cursor.execute("SELECT COUNT(*) FROM callback_records")
remaining_count = cursor.fetchone()[0]
print(f"剩余回访记录数: {remaining_count}")
else:
print("✅ 没有找到测试数据")
cursor.close()
connection.close()
except Exception as e:
print(f"清理失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
cleanup_test_data()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
对比数据库和JSON文件中患者数据的脚本
"""
import pymysql
import json
import os
def compare_data():
"""对比数据库和JSON文件中的数据"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 数据库与JSON文件数据对比 ===")
# 1. 获取数据库中各诊所的患者数量
cursor.execute("""
SELECT
clinic_name,
COUNT(*) as patient_count
FROM patients
GROUP BY clinic_name
ORDER BY patient_count DESC
""")
db_clinics = cursor.fetchall()
print("\n数据库中各诊所患者数量:")
for clinic_name, count in db_clinics:
print(f" {clinic_name}: {count} 人")
# 2. 读取JSON文件并统计患者数量
json_dir = "诊所患者json"
json_clinics = {}
for filename in os.listdir(json_dir):
if filename.endswith('.json') and filename != 'conversion_summary.json':
clinic_name = filename.replace('.json', '')
file_path = os.path.join(json_dir, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
patient_count = len(data)
elif isinstance(data, dict) and 'patients' in data:
patient_count = len(data['patients'])
else:
patient_count = 0
json_clinics[clinic_name] = patient_count
print(f" {clinic_name}: {patient_count} 人 (JSON)")
except Exception as e:
print(f" 读取 {filename} 失败: {e}")
# 3. 对比数据
print(f"\n=== 数据对比结果 ===")
# 创建对比表
print(f"{'诊所名称':<15} {'数据库':<10} {'JSON文件':<10} {'差异':<10} {'状态':<10}")
print("-" * 60)
total_db = 0
total_json = 0
for clinic_name, db_count in db_clinics:
json_count = json_clinics.get(clinic_name, 0)
diff = db_count - json_count
status = "✅ 一致" if diff == 0 else f"❌ 差异{diff:+d}"
print(f"{clinic_name:<15} {db_count:<10} {json_count:<10} {diff:+<10} {status:<10}")
total_db += db_count
total_json += json_count
# 4. 检查JSON中有但数据库中没有的诊所
print(f"\n=== JSON中有但数据库中没有的诊所 ===")
for clinic_name in json_clinics:
if not any(db_clinic[0] == clinic_name for db_clinic in db_clinics):
print(f" {clinic_name}: {json_clinics[clinic_name]} 人 (仅在JSON中)")
# 5. 检查数据库中有但JSON中没有的诊所
print(f"\n=== 数据库中有但JSON中没有的诊所 ===")
for clinic_name, db_count in db_clinics:
if clinic_name not in json_clinics:
print(f" {clinic_name}: {db_count} 人 (仅在数据库中)")
# 6. 总体统计
print(f"\n=== 总体统计 ===")
print(f"数据库总患者数: {total_db}")
print(f"JSON文件总患者数: {total_json}")
print(f"总差异: {total_db - total_json:+}")
# 7. 详细检查马山门诊的数据
print(f"\n=== 马山门诊详细检查 ===")
# 检查数据库中的马山门诊患者
cursor.execute("""
SELECT
case_number,
patient_name,
clinic_name
FROM patients
WHERE clinic_name = '马山门诊'
ORDER BY case_number
LIMIT 10
""")
db_mashan = cursor.fetchall()
print(f"数据库马山门诊患者 (前10个):")
for case_number, name, clinic in db_mashan:
print(f" {case_number}: {name} - {clinic}")
# 检查JSON中的马山门诊患者
mashan_json_path = os.path.join(json_dir, "马山门诊.json")
try:
with open(mashan_json_path, 'r', encoding='utf-8') as f:
mashan_data = json.load(f)
if isinstance(mashan_data, list):
json_patients = mashan_data
elif isinstance(mashan_data, dict) and 'patients' in mashan_data:
json_patients = mashan_data['patients']
else:
json_patients = []
print(f"\nJSON马山门诊患者 (前10个):")
for i, patient in enumerate(json_patients[:10]):
if isinstance(patient, dict):
case_number = patient.get('病历号', 'N/A')
name = patient.get('姓名', 'N/A')
clinic = patient.get('诊所', 'N/A')
print(f" {case_number}: {name} - {clinic}")
else:
print(f" {i}: {patient}")
# 检查JSON中TS0I开头的患者数量
ts0i_count = 0
for patient in json_patients:
if isinstance(patient, dict):
case_number = patient.get('病历号', '')
if case_number.startswith('TS0I'):
ts0i_count += 1
print(f"\nJSON中TS0I开头的患者数量: {ts0i_count}")
except Exception as e:
print(f"读取马山门诊JSON失败: {e}")
# 8. 检查数据库和JSON中TS0I患者的差异
print(f"\n=== TS0I患者数据对比 ===")
# 数据库中的TS0I患者
cursor.execute("""
SELECT
case_number,
patient_name,
clinic_name
FROM patients
WHERE case_number LIKE 'TS0I%'
ORDER BY case_number
LIMIT 10
""")
db_ts0i = cursor.fetchall()
print(f"数据库中TS0I患者 (前10个):")
for case_number, name, clinic in db_ts0i:
print(f" {case_number}: {name} - {clinic}")
# 统计数据库中的TS0I患者总数
cursor.execute("SELECT COUNT(*) FROM patients WHERE case_number LIKE 'TS0I%'")
db_ts0i_total = cursor.fetchone()[0]
print(f"数据库中TS0I患者总数: {db_ts0i_total}")
cursor.close()
connection.close()
except Exception as e:
print(f"对比失败: {e}")
if __name__ == "__main__":
compare_data()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
调试Excel文件结构
"""
import pandas as pd
import os
def debug_excel_structure():
"""调试Excel文件结构"""
# 查找最新的导出文件
export_files = [f for f in os.listdir('.') if f.startswith('回访记录导出_') and f.endswith('.xlsx')]
if not export_files:
print("未找到导出文件")
return
# 按修改时间排序,获取最新的文件
latest_file = max(export_files, key=lambda x: os.path.getmtime(x))
print(f"检查文件: {latest_file}")
try:
# 读取总览表,不指定header
summary_df = pd.read_excel(latest_file, sheet_name='总览', header=None)
print("\n=== Excel原始结构 ===")
print(f"总行数: {len(summary_df)}")
print(f"总列数: {len(summary_df.columns)}")
# 显示前10行
print("\n=== 前10行数据 ===")
for i in range(min(10, len(summary_df))):
row_data = summary_df.iloc[i].tolist()
print(f"第{i+1}行: {row_data}")
# 尝试不同的header设置
print("\n=== 尝试不同的header设置 ===")
# 使用第1行作为header
try:
df1 = pd.read_excel(latest_file, sheet_name='总览', header=0)
print(f"header=0 (第1行): {list(df1.columns)}")
except Exception as e:
print(f"header=0 失败: {e}")
# 使用第2行作为header
try:
df2 = pd.read_excel(latest_file, sheet_name='总览', header=1)
print(f"header=1 (第2行): {list(df2.columns)}")
except Exception as e:
print(f"header=1 失败: {e}")
# 使用第3行作为header
try:
df3 = pd.read_excel(latest_file, sheet_name='总览', header=2)
print(f"header=2 (第3行): {list(df3.columns)}")
except Exception as e:
print(f"header=2 失败: {e}")
# 使用第4行作为header
try:
df4 = pd.read_excel(latest_file, sheet_name='总览', header=3)
print(f"header=3 (第4行): {list(df4.columns)}")
except Exception as e:
print(f"header=3 失败: {e}")
except Exception as e:
print(f"检查失败: {e}")
if __name__ == "__main__":
debug_excel_structure()
\ No newline at end of file
#!/bin/bash
# 患者画像回访话术系统 - 自动化部署脚本
# 用于GitLab CI/CD自动化部署
set -e # 遇到错误立即退出
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# 日志函数
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# 配置变量
DB_HOST="${DB_HOST:-localhost}"
DB_PORT="${DB_PORT:-3306}"
DB_USER="${DB_USER:-callback_user}"
DB_PASSWORD="${DB_PASSWORD:-dev_password_123}"
DB_NAME="${DB_NAME:-callback_system}"
BACKUP_DIR="/backup"
DEPLOY_ENV="${DEPLOY_ENV:-production}"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
log_info "开始部署到 ${DEPLOY_ENV} 环境"
log_info "数据库配置: ${DB_HOST}:${DB_PORT}/${DB_NAME}"
# 第一步:数据库备份
backup_database() {
log_info "开始数据库备份..."
# 创建备份目录
mkdir -p ${BACKUP_DIR}
# 执行数据库备份
if mysqldump -h ${DB_HOST} -P ${DB_PORT} -u ${DB_USER} -p${DB_PASSWORD} \
--single-transaction \
--routines \
--triggers \
--databases ${DB_NAME} \
> ${BACKUP_DIR}/backup_${DB_NAME}_${TIMESTAMP}.sql; then
BACKUP_FILE="${BACKUP_DIR}/backup_${DB_NAME}_${TIMESTAMP}.sql"
BACKUP_SIZE=$(du -h ${BACKUP_FILE} | cut -f1)
log_success "数据库备份完成: ${BACKUP_FILE} (大小: ${BACKUP_SIZE})"
# 验证备份文件
if [ -s ${BACKUP_FILE} ]; then
log_success "备份文件验证成功"
else
log_error "备份文件为空,备份失败"
exit 1
fi
else
log_error "数据库备份失败"
exit 1
fi
}
# 第二步:数据完整性检查
check_data_integrity() {
log_info "开始数据完整性检查..."
# 检查孤立数据
log_info "检查孤立数据..."
ORPHAN_COUNT=$(mysql -h ${DB_HOST} -P ${DB_PORT} -u ${DB_USER} -p${DB_PASSWORD} ${DB_NAME} -s -N -e "
SELECT COUNT(*) FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL;
")
if [ "$ORPHAN_COUNT" -gt 0 ]; then
log_warning "发现 ${ORPHAN_COUNT} 条孤立数据"
# 显示孤立数据详情
mysql -h ${DB_HOST} -P ${DB_PORT} -u ${DB_USER} -p${DB_PASSWORD} ${DB_NAME} -e "
SELECT cr.case_number, cr.callback_result, cr.operator, cr.create_time
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL;
"
# 询问是否继续
if [ "${AUTO_CLEANUP}" = "true" ]; then
log_info "自动清理孤立数据..."
cleanup_orphan_data
else
log_warning "请手动处理孤立数据后再继续部署"
exit 1
fi
else
log_success "数据完整性检查通过,无孤立数据"
fi
# 检查患者-诊所映射
log_info "检查患者-诊所映射..."
mysql -h ${DB_HOST} -P ${DB_PORT} -u ${DB_USER} -p${DB_PASSWORD} ${DB_NAME} -e "
SELECT clinic_name, COUNT(*) as patient_count
FROM patients
GROUP BY clinic_name
ORDER BY patient_count DESC;
"
}
# 清理孤立数据
cleanup_orphan_data() {
log_info "开始清理孤立数据..."
# 备份孤立数据
mysql -h ${DB_HOST} -P ${DB_PORT} -u ${DB_USER} -p${DB_PASSWORD} ${DB_NAME} -e "
SELECT cr.* INTO OUTFILE '/tmp/orphan_data_${TIMESTAMP}.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '\"'
LINES TERMINATED BY '\n'
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL;
"
# 删除孤立数据
DELETED_COUNT=$(mysql -h ${DB_HOST} -P ${DB_PORT} -u ${DB_USER} -p${DB_PASSWORD} ${DB_NAME} -s -N -e "
DELETE FROM callback_records
WHERE case_number IN (
SELECT case_number FROM (
SELECT cr.case_number
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL
) AS temp
);
")
log_success "清理完成,删除了 ${DELETED_COUNT} 条孤立数据"
}
# 第三步:代码部署
deploy_code() {
log_info "开始代码部署..."
# 备份当前代码
if [ -d "/app" ]; then
log_info "备份当前代码..."
cp -r /app /app_backup_${TIMESTAMP}
log_success "代码备份完成: /app_backup_${TIMESTAMP}"
fi
# 部署新代码
log_info "部署新代码..."
if [ -d "/tmp/app" ]; then
cp -r /tmp/app/* /app/
log_success "代码部署完成"
else
log_error "新代码目录不存在: /tmp/app"
exit 1
fi
# 设置权限
chmod +x /app/*.py
chmod +x /app/deploy_scripts/*.sh
}
# 第四步:依赖更新
update_dependencies() {
log_info "更新Python依赖..."
if [ -f "/app/requirements.txt" ]; then
pip install -r /app/requirements.txt --upgrade
log_success "依赖更新完成"
else
log_warning "requirements.txt不存在,跳过依赖更新"
fi
}
# 第五步:应用重启
restart_application() {
log_info "重启应用..."
# 如果是Docker环境
if command -v docker &> /dev/null; then
log_info "检测到Docker环境,重启容器..."
# 查找应用容器
CONTAINER_NAME=$(docker ps --filter "ancestor=patient-callback-app" --format "{{.Names}}" | head -1)
if [ -n "$CONTAINER_NAME" ]; then
log_info "重启容器: ${CONTAINER_NAME}"
docker restart ${CONTAINER_NAME}
log_success "容器重启完成"
else
log_warning "未找到应用容器,请手动重启"
fi
else
log_info "非Docker环境,请手动重启应用"
fi
}
# 第六步:部署后验证
verify_deployment() {
log_info "开始部署后验证..."
# 等待应用启动
sleep 10
# 检查应用健康状态
if [ -n "${APP_URL}" ]; then
log_info "检查应用健康状态: ${APP_URL}/api/health"
for i in {1..30}; do
if curl -f -s "${APP_URL}/api/health" > /dev/null; then
log_success "应用健康检查通过"
break
else
log_warning "应用健康检查失败,等待重试... (${i}/30)"
sleep 2
fi
if [ $i -eq 30 ]; then
log_error "应用健康检查失败,部署可能有问题"
return 1
fi
done
fi
# 验证数据库连接
log_info "验证数据库连接..."
if mysql -h ${DB_HOST} -P ${DB_PORT} -u ${DB_USER} -p${DB_PASSWORD} ${DB_NAME} -e "SELECT 1;" > /dev/null 2>&1; then
log_success "数据库连接正常"
else
log_error "数据库连接失败"
return 1
fi
log_success "部署后验证完成"
}
# 第七步:回滚准备
prepare_rollback() {
log_info "准备回滚方案..."
# 创建回滚脚本
cat > /tmp/rollback_${TIMESTAMP}.sh << EOF
#!/bin/bash
# 回滚脚本 - 生成时间: ${TIMESTAMP}
log_error() {
echo -e "\033[0;31m[ERROR]\033[0m \$1"
}
log_success() {
echo -e "\033[0;32m[SUCCESS]\033[0m \$1"
}
echo "开始回滚到部署前状态..."
# 1. 恢复代码
if [ -d "/app_backup_${TIMESTAMP}" ]; then
rm -rf /app
cp -r /app_backup_${TIMESTAMP} /app
log_success "代码回滚完成"
else
log_error "备份目录不存在,无法回滚代码"
fi
# 2. 重启应用
if command -v docker &> /dev/null; then
CONTAINER_NAME=\$(docker ps --filter "ancestor=patient-callback-app" --format "{{.Names}}" | head -1)
if [ -n "\$CONTAINER_NAME" ]; then
docker restart \$CONTAINER_NAME
log_success "应用重启完成"
fi
fi
echo "回滚完成"
EOF
chmod +x /tmp/rollback_${TIMESTAMP}.sh
log_success "回滚脚本已准备: /tmp/rollback_${TIMESTAMP}.sh"
}
# 主函数
main() {
log_info "=== 患者画像回访话术系统部署开始 ==="
# 检查必要参数
if [ -z "${DB_HOST}" ] || [ -z "${DB_USER}" ] || [ -z "${DB_PASSWORD}" ]; then
log_error "缺少必要的环境变量: DB_HOST, DB_USER, DB_PASSWORD"
exit 1
fi
# 执行部署步骤
backup_database
check_data_integrity
deploy_code
update_dependencies
restart_application
verify_deployment
prepare_rollback
log_success "=== 部署完成 ==="
log_info "备份文件: ${BACKUP_FILE}"
log_info "回滚脚本: /tmp/rollback_${TIMESTAMP}.sh"
log_info "如果出现问题,请使用回滚脚本恢复"
}
# 执行主函数
main "$@"
\ No newline at end of file
# GitLab CI/CD 环境变量配置说明
## 🚨 重要提醒
在GitLab项目中设置这些环境变量时,请确保:
1. 生产环境的敏感信息(如数据库密码)设置为"Protected"和"Masked"
2. 只有项目维护者和管理员可以访问这些变量
3. 定期更新密码和密钥
## 🔧 必需的环境变量
### 生产环境数据库配置
| 变量名 | 说明 | 示例值 | 是否必需 |
|--------|------|--------|----------|
| `PROD_DB_HOST` | 生产环境MySQL主机地址 | `192.168.1.100` | ✅ |
| `PROD_DB_PORT` | 生产环境MySQL端口 | `3306` | ✅ |
| `PROD_DB_USER` | 生产环境MySQL用户名 | `callback_user` | ✅ |
| `PROD_DB_PASSWORD` | 生产环境MySQL密码 | `your_production_password` | ✅ |
| `PROD_DB_NAME` | 生产环境数据库名 | `callback_system` | ✅ |
### 生产环境应用配置
| 变量名 | 说明 | 示例值 | 是否必需 |
|--------|------|--------|----------|
| `PROD_APP_URL` | 生产环境应用访问地址 | `https://your-app.com` | ✅ |
| `PROD_SSH_HOST` | 生产环境服务器IP地址 | `192.168.1.100` | ✅ |
| `PROD_SSH_USER` | 生产环境SSH用户名 | `deploy` | ✅ |
| `PROD_SSH_PRIVATE_KEY` | 生产环境SSH私钥 | `-----BEGIN OPENSSH PRIVATE KEY-----...` | ✅ |
| `PROD_SSH_KNOWN_HOSTS` | 生产环境SSH公钥指纹 | `192.168.1.100 ssh-rsa AAAAB3NzaC1...` | ✅ |
| `PROD_APP_DIR` | 生产环境应用目录 | `/opt/patient-callback-system` | ✅ |
### 测试环境配置(可选)
| 变量名 | 说明 | 示例值 | 是否必需 |
|--------|------|--------|----------|
| `TEST_APP_URL` | 测试环境应用访问地址 | `https://test.your-app.com` | ❌ |
| `TEST_SSH_HOST` | 测试环境服务器IP地址 | `192.168.1.101` | ❌ |
| `TEST_SSH_USER` | 测试环境SSH用户名 | `deploy` | ❌ |
| `TEST_SSH_PRIVATE_KEY` | 测试环境SSH私钥 | `-----BEGIN OPENSSH PRIVATE KEY-----...` | ❌ |
| `TEST_SSH_KNOWN_HOSTS` | 测试环境SSH公钥指纹 | `192.168.1.101 ssh-rsa AAAAB3NzaC1...` | ❌ |
| `TEST_APP_DIR` | 测试环境应用目录 | `/opt/patient-callback-system-test` | ❌ |
## 🛠️ 如何设置环境变量
### 1. 在GitLab项目中设置
1. 进入项目 → Settings → CI/CD
2. 展开"Variables"部分
3. 点击"Add Variable"按钮
4. 填写变量信息:
- **Key**: 变量名(如 `PROD_DB_HOST`
- **Value**: 变量值
- **Type**: 选择类型
- **Environment scope**: 选择环境范围
- **Protect variable**: 是否保护变量
- **Mask variable**: 是否掩码变量
### 2. 变量类型说明
- **Variable**: 普通变量
- **File**: 文件类型(如SSH私钥)
- **Protected**: 只在保护分支中可用
- **Masked**: 在日志中隐藏值
### 3. 环境范围说明
- **All (default)**: 所有环境
- **Production**: 仅生产环境
- **Staging**: 仅测试环境
## 🔐 SSH密钥配置
### 1. 生成SSH密钥对
```bash
# 在本地生成SSH密钥对
ssh-keygen -t rsa -b 4096 -C "deploy@your-company.com" -f ~/.ssh/gitlab_deploy
# 生成公钥指纹
ssh-keygen -lf ~/.ssh/gitlab_deploy.pub
```
### 2. 配置服务器
```bash
# 将公钥添加到生产环境服务器
ssh-copy-id -i ~/.ssh/gitlab_deploy.pub deploy@your-production-server
# 测试SSH连接
ssh deploy@your-production-server
```
### 3. 在GitLab中设置
- **Key**: `PROD_SSH_PRIVATE_KEY`
- **Value**: 复制 `~/.ssh/gitlab_deploy` 的完整内容
- **Type**: File
- **Protect variable**: ✅
- **Mask variable**: ✅
## 📋 部署流程说明
### 1. 自动触发
- 推送到 `develop` 分支 → 自动部署到测试环境
- 推送到 `main` 分支 → 触发生产环境部署(需要手动确认)
### 2. 手动触发
- 生产环境部署:在GitLab CI/CD界面手动触发 `deploy_production` 任务
- 生产环境回滚:在GitLab CI/CD界面手动触发 `rollback_production` 任务
### 3. 部署步骤
1. **数据库备份** - 自动备份生产数据库
2. **数据完整性检查** - 检查孤立数据和映射关系
3. **代码部署** - 部署新版本代码
4. **依赖更新** - 更新Python依赖
5. **应用重启** - 重启应用服务
6. **部署验证** - 验证部署结果
7. **回滚准备** - 准备回滚脚本
## ⚠️ 安全注意事项
### 1. 访问控制
- 限制SSH用户权限,只允许必要的操作
- 使用专门的部署用户,不要使用root用户
- 定期轮换SSH密钥
### 2. 数据库安全
- 生产数据库用户只授予必要权限
- 定期更新数据库密码
- 限制数据库访问IP地址
### 3. 网络安全
- 使用防火墙限制访问
- 考虑使用VPN或专用网络
- 监控异常访问日志
## 🚀 快速开始
### 1. 设置基本变量
```bash
# 必需的生产环境变量
PROD_DB_HOST=your-db-host
PROD_DB_PORT=3306
PROD_DB_USER=callback_user
PROD_DB_PASSWORD=your-secure-password
PROD_DB_NAME=callback_system
PROD_APP_URL=https://your-app.com
PROD_SSH_HOST=your-server-ip
PROD_SSH_USER=deploy
```
### 2. 测试连接
```bash
# 测试数据库连接
mysql -h $PROD_DB_HOST -P $PROD_DB_PORT -u $PROD_DB_USER -p$PROD_DB_PASSWORD -e "SELECT 1;"
# 测试SSH连接
ssh $PROD_SSH_USER@$PROD_SSH_HOST "echo 'SSH连接成功'"
```
### 3. 首次部署
1. 推送代码到 `main` 分支
2. 在GitLab CI/CD界面手动触发 `deploy_production`
3. 监控部署日志
4. 验证部署结果
## 📞 故障排除
### 常见问题
1. **SSH连接失败** - 检查SSH密钥和服务器配置
2. **数据库连接失败** - 检查数据库配置和网络连接
3. **权限不足** - 检查用户权限和文件权限
4. **部署脚本执行失败** - 检查脚本权限和执行环境
### 获取帮助
- 查看GitLab CI/CD日志
- 检查服务器系统日志
- 联系系统管理员或DevOps团队
\ No newline at end of file
-- 患者画像回访话术系统 - 数据库迁移脚本
-- 用于GitLab CI/CD自动化部署时的数据库操作
-- 设置字符集
SET NAMES utf8mb4;
SET CHARACTER SET utf8mb4;
SET character_set_connection=utf8mb4;
-- 开始事务
START TRANSACTION;
-- 1. 检查数据库版本
CREATE TABLE IF NOT EXISTS schema_version (
id INT AUTO_INCREMENT PRIMARY KEY,
version VARCHAR(50) NOT NULL,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT
);
-- 2. 检查孤立数据(回访记录中找不到对应患者的记录)
-- 这些数据可能是测试数据,需要清理
SELECT
'孤立数据检查' as check_type,
COUNT(*) as count,
'callback_records表中找不到对应patients记录的条数' as description
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL;
-- 3. 检查患者-诊所映射完整性
SELECT
'患者-诊所映射检查' as check_type,
COUNT(*) as total_patients,
COUNT(DISTINCT clinic_name) as total_clinics
FROM patients;
-- 4. 检查回访记录分布
SELECT
'回访记录分布检查' as check_type,
COALESCE(p.clinic_name, '未知诊所') as clinic_name,
COUNT(*) as callback_count,
SUM(CASE WHEN cr.callback_result = '成功' THEN 1 ELSE 0 END) as success_count,
SUM(CASE WHEN cr.callback_result = '不成功' THEN 1 ELSE 0 END) as unsuccessful_count,
SUM(CASE WHEN cr.callback_result = '放弃回访' THEN 1 ELSE 0 END) as abandon_count
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
GROUP BY COALESCE(p.clinic_name, '未知诊所')
ORDER BY callback_count DESC;
-- 5. 检查数据一致性(可选:清理孤立数据)
-- 注意:这个操作会删除测试数据,请谨慎使用
-- 如果需要自动清理,取消下面的注释
/*
-- 备份孤立数据到临时表
CREATE TEMPORARY TABLE IF NOT EXISTS temp_orphan_data AS
SELECT cr.* FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL;
-- 删除孤立数据
DELETE FROM callback_records
WHERE case_number IN (
SELECT case_number FROM temp_orphan_data
);
-- 显示清理结果
SELECT
'孤立数据清理完成' as operation,
(SELECT COUNT(*) FROM temp_orphan_data) as cleaned_count;
*/
-- 6. 验证关键数据
SELECT
'数据验证' as check_type,
(SELECT COUNT(*) FROM patients) as total_patients,
(SELECT COUNT(*) FROM callback_records) as total_callbacks,
(SELECT COUNT(*) FROM clinics) as total_clinics,
(SELECT COUNT(*) FROM users) as total_users;
-- 7. 检查索引和性能
SHOW INDEX FROM callback_records;
SHOW INDEX FROM patients;
-- 8. 更新数据库版本记录
INSERT INTO schema_version (version, description)
VALUES ('1.1.0', '患者画像回访话术系统 - 导出功能优化和数据库连接修复')
ON DUPLICATE KEY UPDATE
version = VALUES(version),
applied_at = CURRENT_TIMESTAMP,
description = VALUES(description);
-- 提交事务
COMMIT;
-- 显示部署完成信息
SELECT
'部署完成' as status,
NOW() as completed_at,
'数据库迁移和验证完成' as message;
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
部署后验证脚本
用于验证患者画像回访话术系统的部署是否成功
"""
import requests
import pymysql
import json
import time
import sys
from datetime import datetime
class DeploymentVerifier:
"""部署验证器"""
def __init__(self, app_url, db_config):
self.app_url = app_url
self.db_config = db_config
self.test_results = []
def log_test(self, test_name, success, message=""):
"""记录测试结果"""
status = "✅ PASS" if success else "❌ FAIL"
timestamp = datetime.now().strftime("%H:%M:%S")
result = f"[{timestamp}] {status} {test_name}"
if message:
result += f" - {message}"
print(result)
self.test_results.append({
'test_name': test_name,
'success': success,
'message': message,
'timestamp': timestamp
})
return success
def test_app_health(self):
"""测试应用健康状态"""
try:
response = requests.get(f"{self.app_url}/api/health", timeout=10)
if response.status_code == 200:
return self.log_test("应用健康检查", True, "应用正常响应")
else:
return self.log_test("应用健康检查", False, f"状态码: {response.status_code}")
except Exception as e:
return self.log_test("应用健康检查", False, f"连接失败: {e}")
def test_database_connection(self):
"""测试数据库连接"""
try:
connection = pymysql.connect(**self.db_config)
cursor = connection.cursor()
# 测试基本查询
cursor.execute("SELECT 1")
result = cursor.fetchone()
if result and result[0] == 1:
cursor.close()
connection.close()
return self.log_test("数据库连接", True, "数据库连接正常")
else:
cursor.close()
connection.close()
return self.log_test("数据库连接", False, "查询结果异常")
except Exception as e:
return self.log_test("数据库连接", False, f"连接失败: {e}")
def test_data_integrity(self):
"""测试数据完整性"""
try:
connection = pymysql.connect(**self.db_config)
cursor = connection.cursor()
# 检查孤立数据
cursor.execute("""
SELECT COUNT(*) FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL
""")
orphan_count = cursor.fetchone()[0]
if orphan_count == 0:
result = self.log_test("数据完整性检查", True, "无孤立数据")
else:
result = self.log_test("数据完整性检查", False, f"发现 {orphan_count} 条孤立数据")
# 检查患者-诊所映射
cursor.execute("SELECT COUNT(*) FROM patients")
patient_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM callback_records")
callback_count = cursor.fetchone()[0]
self.log_test("数据统计", True, f"患者: {patient_count}, 回访记录: {callback_count}")
cursor.close()
connection.close()
return result
except Exception as e:
return self.log_test("数据完整性检查", False, f"检查失败: {e}")
def test_export_functionality(self):
"""测试导出功能(需要登录)"""
try:
# 尝试访问导出接口,应该返回401未登录
response = requests.get(f"{self.app_url}/api/export-data", timeout=10)
if response.status_code == 401:
return self.log_test("导出功能访问控制", True, "正确返回未登录状态")
else:
return self.log_test("导出功能访问控制", False, f"状态码: {response.status_code}")
except Exception as e:
return self.log_test("导出功能访问控制", False, f"测试失败: {e}")
def test_clinic_statistics(self):
"""测试诊所统计功能"""
try:
connection = pymysql.connect(**self.db_config)
cursor = connection.cursor()
# 检查马山门诊的回访统计
cursor.execute("""
SELECT
COALESCE(p.clinic_name, '未知诊所') as clinic_name,
COUNT(*) as callback_count,
SUM(CASE WHEN cr.callback_result = '成功' THEN 1 ELSE 0 END) as success_count
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.clinic_name = '马山门诊' OR p.clinic_name IS NULL
GROUP BY COALESCE(p.clinic_name, '未知诊所')
""")
results = cursor.fetchall()
for clinic_name, callback_count, success_count in results:
if clinic_name == '马山门诊':
if success_count > 0:
return self.log_test("马山门诊统计", True, f"成功回访: {success_count} 个")
else:
return self.log_test("马山门诊统计", False, "成功回访数为0")
elif clinic_name == '未知诊所':
return self.log_test("马山门诊统计", False, f"仍有 {callback_count} 条未知诊所数据")
cursor.close()
connection.close()
return self.log_test("马山门诊统计", False, "未找到马山门诊数据")
except Exception as e:
return self.log_test("马山门诊统计", False, f"检查失败: {e}")
def test_environment_variables(self):
"""测试环境变量配置"""
try:
connection = pymysql.connect(**self.db_config)
cursor = connection.cursor()
# 检查数据库字符集
cursor.execute("SHOW VARIABLES LIKE 'character_set_database'")
charset_result = cursor.fetchone()
if charset_result and 'utf8mb4' in charset_result[1]:
charset_ok = True
else:
charset_ok = False
cursor.close()
connection.close()
if charset_ok:
return self.log_test("环境变量配置", True, "字符集配置正确")
else:
return self.log_test("环境变量配置", False, "字符集配置可能有问题")
except Exception as e:
return self.log_test("环境变量配置", False, f"检查失败: {e}")
def run_all_tests(self):
"""运行所有测试"""
print("🚀 开始部署后验证...")
print("=" * 50)
# 等待应用完全启动
print("⏳ 等待应用启动...")
time.sleep(5)
# 运行测试
tests = [
("应用健康检查", self.test_app_health),
("数据库连接", self.test_database_connection),
("数据完整性", self.test_data_integrity),
("导出功能访问控制", self.test_export_functionality),
("马山门诊统计", self.test_clinic_statistics),
("环境变量配置", self.test_environment_variables)
]
passed_tests = 0
total_tests = len(tests)
for test_name, test_func in tests:
if test_func():
passed_tests += 1
# 输出测试结果摘要
print("=" * 50)
print(f"📊 测试结果摘要:")
print(f" 总测试数: {total_tests}")
print(f" 通过测试: {passed_tests}")
print(f" 失败测试: {total_tests - passed_tests}")
print(f" 成功率: {(passed_tests / total_tests) * 100:.1f}%")
if passed_tests == total_tests:
print("🎉 所有测试通过!部署成功!")
return True
else:
print("⚠️ 部分测试失败,请检查部署配置")
return False
def main():
"""主函数"""
# 配置信息(从环境变量或命令行参数获取)
import os
app_url = os.environ.get('APP_URL', 'http://localhost:4002')
db_config = {
'host': os.environ.get('DB_HOST', 'localhost'),
'port': int(os.environ.get('DB_PORT', 3307)),
'user': os.environ.get('DB_USER', 'callback_user'),
'password': os.environ.get('DB_PASSWORD', 'dev_password_123'),
'database': os.environ.get('DB_NAME', 'callback_system'),
'charset': 'utf8mb4'
}
print(f"🔧 验证配置:")
print(f" 应用地址: {app_url}")
print(f" 数据库: {db_config['host']}:{db_config['port']}/{db_config['database']}")
print()
# 创建验证器并运行测试
verifier = DeploymentVerifier(app_url, db_config)
success = verifier.run_all_tests()
# 退出码
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
查找特定患者在数据库中的位置
"""
import pymysql
def find_patient(case_number):
"""查找特定患者在数据库中的位置"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print(f"=== 查找患者 {case_number} ===")
# 1. 在patients表中查找
cursor.execute("""
SELECT
case_number,
patient_name,
clinic_name,
created_at
FROM patients
WHERE case_number = %s
""", (case_number,))
patient_record = cursor.fetchone()
if patient_record:
case_num, name, clinic, created_at = patient_record
print(f"在patients表中找到:")
print(f" 病历号: {case_num}")
print(f" 姓名: {name}")
print(f" 诊所: {clinic}")
print(f" 创建时间: {created_at}")
else:
print(f"在patients表中未找到患者 {case_number}")
# 2. 在callback_records表中查找
cursor.execute("""
SELECT
case_number,
callback_result,
operator,
create_time
FROM callback_records
WHERE case_number = %s
""", (case_number,))
callback_records = cursor.fetchall()
if callback_records:
print(f"\n在callback_records表中找到 {len(callback_records)} 条回访记录:")
for record in callback_records:
case_num, result, operator, create_time = record
print(f" 病历号: {case_num}")
print(f" 回访结果: {result}")
print(f" 操作员: {operator}")
print(f" 创建时间: {create_time}")
else:
print(f"\n在callback_records表中未找到患者 {case_number} 的回访记录")
# 3. 检查所有TS0C开头的患者分布
print(f"\n=== 所有TS0C开头患者的诊所分布 ===")
cursor.execute("""
SELECT
clinic_name,
COUNT(*) as count
FROM patients
WHERE case_number LIKE 'TS0C%'
GROUP BY clinic_name
ORDER BY count DESC
""")
ts0c_distribution = cursor.fetchall()
for clinic_name, count in ts0c_distribution:
print(f" {clinic_name}: {count} 人")
# 4. 检查TS0C005909附近的患者
print(f"\n=== TS0C005909附近的患者 ===")
cursor.execute("""
SELECT
case_number,
patient_name,
clinic_name
FROM patients
WHERE case_number LIKE 'TS0C%'
ORDER BY case_number
""")
ts0c_patients = cursor.fetchall()
# 找到TS0C005909的位置
target_index = -1
for i, (case_num, name, clinic) in enumerate(ts0c_patients):
if case_num == case_number:
target_index = i
break
if target_index != -1:
print(f"TS0C005909 在TS0C患者列表中的位置: {target_index + 1}")
# 显示前后各5个患者
start_index = max(0, target_index - 5)
end_index = min(len(ts0c_patients), target_index + 6)
print(f"前后患者列表:")
for i in range(start_index, end_index):
case_num, name, clinic = ts0c_patients[i]
marker = " >>> " if i == target_index else " "
print(f"{marker}{case_num}: {name} - {clinic}")
else:
print(f"未找到TS0C005909在TS0C患者列表中的位置")
cursor.close()
connection.close()
except Exception as e:
print(f"查找失败: {e}")
if __name__ == "__main__":
find_patient("TS0C005909")
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修复马山门诊患者诊所分配的脚本
"""
import pymysql
import json
def fix_mashan_patients():
"""修复马山门诊患者的诊所分配"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 修复马山门诊患者诊所分配 ===")
# 1. 查找所有TS0I开头的患者
cursor.execute("""
SELECT
case_number,
patient_name,
clinic_name
FROM patients
WHERE case_number LIKE 'TS0I%'
""")
ts0i_patients = cursor.fetchall()
print(f"找到 {len(ts0i_patients)} 个TS0I开头的患者:")
for patient in ts0i_patients:
case_number, name, clinic = patient
print(f" {case_number}: {name} - 当前诊所: {clinic}")
# 2. 更新这些患者的诊所为马山门诊
if ts0i_patients:
cursor.execute("""
UPDATE patients
SET clinic_name = '马山门诊'
WHERE case_number LIKE 'TS0I%'
""")
updated_count = cursor.rowcount
print(f"\n已更新 {updated_count} 个患者的诊所为马山门诊")
# 提交更改
connection.commit()
# 3. 验证更新结果
cursor.execute("""
SELECT COUNT(*)
FROM patients
WHERE case_number LIKE 'TS0I%' AND clinic_name = '马山门诊'
""")
verified_count = cursor.fetchone()[0]
print(f"验证结果: {verified_count} 个患者已正确分配到马山门诊")
# 4. 检查马山门诊的总患者数
cursor.execute("""
SELECT COUNT(*)
FROM patients
WHERE clinic_name = '马山门诊'
""")
total_mashan = cursor.fetchone()[0]
print(f"马山门诊总患者数: {total_mashan}")
# 5. 检查其他诊所的TS0I患者是否还有剩余
cursor.execute("""
SELECT
clinic_name,
COUNT(*) as count
FROM patients
WHERE case_number LIKE 'TS0I%'
GROUP BY clinic_name
""")
print(f"\nTS0I患者在各诊所的分布:")
for clinic_name, count in cursor.fetchall():
print(f" {clinic_name}: {count} 人")
cursor.close()
connection.close()
print("\n修复完成!")
except Exception as e:
print(f"修复失败: {e}")
if __name__ == "__main__":
fix_mashan_patients()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
修复数据库中的患者分配,使其与JSON文件保持一致
"""
import pymysql
import json
import os
def fix_patient_distribution():
"""修复数据库中的患者分配"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
# 根据JSON文件确定的正确诊所分配
correct_distribution = {
'TS0G': '大丰门诊', # 大丰门诊主要前缀
'TS0F': '东亭门诊', # 东亭门诊主要前缀
'TS0E': '惠山门诊', # 惠山门诊主要前缀
'TS0L': '新吴门诊', # 新吴门诊主要前缀
'TS0C': '河埒门诊', # 河埒门诊主要前缀
'TS0I': '红豆门诊', # 红豆门诊主要前缀
'TS0J': '马山门诊', # 马山门诊主要前缀
'TS0K': '通善口腔医院', # 通善口腔医院主要前缀
'TS0M': '学前街门诊', # 学前街门诊主要前缀
'TS0B': '红豆门诊', # 红豆门诊次要前缀
'JY0A': '河埒门诊' # 河埒门诊次要前缀
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 修复数据库中的患者分配 ===")
print("根据JSON文件确定的正确诊所分配:")
for prefix, clinic in correct_distribution.items():
print(f" {prefix} -> {clinic}")
# 1. 统计修复前的分布
print(f"\n=== 修复前的患者分布 ===")
cursor.execute("""
SELECT
clinic_name,
COUNT(*) as patient_count
FROM patients
GROUP BY clinic_name
ORDER BY patient_count DESC
""")
before_distribution = cursor.fetchall()
for clinic_name, count in before_distribution:
print(f" {clinic_name}: {count} 人")
# 2. 修复患者分配
print(f"\n=== 开始修复患者分配 ===")
updated_count = 0
for prefix, correct_clinic in correct_distribution.items():
# 查找所有以该前缀开头的患者
cursor.execute("""
SELECT COUNT(*)
FROM patients
WHERE case_number LIKE %s
""", (f"{prefix}%",))
total_patients = cursor.fetchone()[0]
if total_patients > 0:
# 更新这些患者的诊所分配
cursor.execute("""
UPDATE patients
SET clinic_name = %s
WHERE case_number LIKE %s
""", (correct_clinic, f"{prefix}%"))
updated_patients = cursor.rowcount
updated_count += updated_patients
print(f" {prefix} -> {correct_clinic}: 更新了 {updated_patients} 个患者")
# 提交更改
connection.commit()
print(f"\n总共更新了 {updated_count} 个患者的诊所分配")
# 3. 统计修复后的分布
print(f"\n=== 修复后的患者分布 ===")
cursor.execute("""
SELECT
clinic_name,
COUNT(*) as patient_count
FROM patients
GROUP BY clinic_name
ORDER BY patient_count DESC
""")
after_distribution = cursor.fetchall()
for clinic_name, count in after_distribution:
print(f" {clinic_name}: {count} 人")
# 4. 验证关键诊所的数据
print(f"\n=== 关键诊所数据验证 ===")
# 验证马山门诊 (应该主要是TS0J)
cursor.execute("""
SELECT
SUBSTRING(case_number, 1, 4) as prefix,
COUNT(*) as count
FROM patients
WHERE clinic_name = '马山门诊'
GROUP BY SUBSTRING(case_number, 1, 4)
ORDER BY count DESC
""")
print(f"马山门诊病历号前缀分布:")
for prefix, count in cursor.fetchall():
print(f" {prefix}: {count} 人")
# 验证红豆门诊 (应该主要是TS0I)
cursor.execute("""
SELECT
SUBSTRING(case_number, 1, 4) as prefix,
COUNT(*) as count
FROM patients
WHERE clinic_name = '红豆门诊'
GROUP BY SUBSTRING(case_number, 1, 4)
ORDER BY count DESC
""")
print(f"\n红豆门诊病历号前缀分布:")
for prefix, count in cursor.fetchall():
print(f" {prefix}: {count} 人")
cursor.close()
connection.close()
print(f"\n修复完成!")
except Exception as e:
print(f"修复失败: {e}")
if __name__ == "__main__":
fix_patient_distribution()
\ No newline at end of file
# 生产环境安全上线操作指南
## 🚨 重要提醒
在生产环境执行任何操作前,必须先进行完整备份!
## 第一步:生产环境数据库备份
### 1.1 创建备份脚本
```bash
#!/bin/bash
# 生产环境数据库备份脚本
# 文件名: backup_production_db.sh
# 设置变量
DB_HOST="生产环境MySQL主机"
DB_USER="生产环境用户名"
DB_NAME="生产环境数据库名"
BACKUP_DIR="/backup"
DATE=$(date +%Y%m%d_%H%M%S)
# 创建备份目录
mkdir -p $BACKUP_DIR
# 执行备份
mysqldump -h $DB_HOST -u $DB_USER -p \
--single-transaction \
--routines \
--triggers \
--databases $DB_NAME \
> $BACKUP_DIR/production_backup_$DATE.sql
echo "✅ 生产环境数据库备份完成: production_backup_$DATE.sql"
echo "📁 备份文件位置: $BACKUP_DIR/production_backup_$DATE.sql"
```
### 1.2 手动备份命令(推荐)
```bash
# 如果无法使用脚本,直接执行以下命令
mysqldump -h [生产环境主机] -u [用户名] -p \
--single-transaction \
--routines \
--triggers \
--databases callback_system \
> production_backup_$(date +%Y%m%d_%H%M%S).sql
```
## 第二步:生产环境数据完整性检查
### 2.1 检查孤立数据
```sql
-- 检查是否有找不到诊所信息的回访记录
SELECT
cr.case_number,
cr.callback_result,
cr.operator,
cr.create_time
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
WHERE p.case_number IS NULL;
```
### 2.2 检查患者-诊所映射
```sql
-- 检查每个诊所的患者数量
SELECT
clinic_name,
COUNT(*) as patient_count
FROM patients
GROUP BY clinic_name
ORDER BY patient_count DESC;
```
### 2.3 检查回访记录分布
```sql
-- 检查回访记录的诊所分布
SELECT
COALESCE(p.clinic_name, '未知诊所') as clinic_name,
COUNT(*) as callback_count
FROM callback_records cr
LEFT JOIN patients p ON cr.case_number = p.case_number
GROUP BY COALESCE(p.clinic_name, '未知诊所')
ORDER BY callback_count DESC;
```
## 第三步:代码部署
### 3.1 部署前检查清单
- [ ] 生产环境数据库已备份
- [ ] 测试环境功能验证通过
- [ ] 代码变更已审查
- [ ] 回滚方案已准备
### 3.2 部署步骤
1. **停止生产环境应用**
2. **备份当前代码版本**
3. **部署新代码**
4. **重启应用**
5. **验证功能正常**
## 第四步:上线后验证
### 4.1 功能验证
- [ ] 应用正常启动
- [ ] 数据库连接正常
- [ ] 导出功能正常
- [ ] 数据统计准确
### 4.2 数据验证
- [ ] 回访记录数量正确
- [ ] 患者-诊所映射正确
- [ ] 统计结果合理
## 第五步:监控和回滚准备
### 5.1 监控指标
- 应用响应时间
- 数据库连接状态
- 导出功能成功率
- 数据统计准确性
### 5.2 回滚方案
如果发现问题,立即回滚:
1. 停止新版本应用
2. 恢复旧版本代码
3. 重启应用
4. 验证功能正常
## 📞 紧急联系方式
- 技术支持:[联系方式]
- 数据库管理员:[联系方式]
- 项目负责人:[联系方式]
## ⚠️ 注意事项
1. **备份是必须的** - 没有备份不要执行任何操作
2. **先在测试环境验证** - 确保所有功能正常
3. **准备回滚方案** - 出现问题能快速恢复
4. **监控上线效果** - 及时发现问题
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
从JSON文件重新构建患者-诊所对应关系的脚本
清空现有数据,完全基于JSON文件重建
"""
import pymysql
import json
import os
from datetime import datetime
def rebuild_patients_from_json():
"""从JSON文件重新构建患者-诊所对应关系"""
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3307,
'user': 'callback_user',
'password': 'dev_password_123',
'database': 'callback_system',
'charset': 'utf8mb4'
}
try:
# 连接数据库
connection = pymysql.connect(**db_config)
cursor = connection.cursor()
print("=== 从JSON文件重新构建患者-诊所对应关系 ===")
# 1. 清空现有患者数据
print("\n1. 清空现有患者数据...")
cursor.execute("DELETE FROM patients")
deleted_count = cursor.rowcount
print(f" 删除了 {deleted_count} 条现有患者记录")
# 2. 重置自增ID
cursor.execute("ALTER TABLE patients AUTO_INCREMENT = 1")
print(" 重置了自增ID")
# 3. 从JSON文件读取并插入患者数据
json_dir = "诊所患者json"
total_inserted = 0
print(f"\n2. 从JSON文件读取患者数据...")
for filename in os.listdir(json_dir):
if filename.endswith('.json') and filename != 'conversion_summary.json':
clinic_name = filename.replace('.json', '')
file_path = os.path.join(json_dir, filename)
print(f"\n 处理 {clinic_name}...")
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
patients = data
elif isinstance(data, dict) and 'patients' in data:
patients = data['patients']
else:
patients = []
# 插入患者数据
insert_sql = """
INSERT INTO patients (
case_number, patient_name, clinic_name,
gender, age, created_at, updated_at
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
clinic_inserted = 0
for patient in patients:
if isinstance(patient, dict):
case_number = patient.get('病历号', '')
name = patient.get('姓名', '')
gender = patient.get('性别', '')
age = patient.get('年龄', None)
if case_number: # 只插入有病历号的患者
try:
cursor.execute(insert_sql, (
case_number,
name if name else None,
clinic_name,
gender if gender else None,
age if age else None,
datetime.now(),
datetime.now()
))
clinic_inserted += 1
except Exception as e:
print(f" 插入患者 {case_number} 失败: {e}")
total_inserted += clinic_inserted
print(f" {clinic_name}: 插入了 {clinic_inserted} 个患者")
except Exception as e:
print(f" 读取 {filename} 失败: {e}")
# 4. 提交更改
connection.commit()
print(f"\n3. 数据重建完成!")
print(f" 总共插入了 {total_inserted} 个患者")
# 5. 验证重建结果
print(f"\n4. 验证重建结果...")
# 统计各诊所患者数量
cursor.execute("""
SELECT
clinic_name,
COUNT(*) as patient_count
FROM patients
GROUP BY clinic_name
ORDER BY patient_count DESC
""")
print(f"\n各诊所患者数量:")
for clinic_name, count in cursor.fetchall():
print(f" {clinic_name}: {count} 人")
# 统计总患者数
cursor.execute("SELECT COUNT(*) FROM patients")
total_patients = cursor.fetchone()[0]
print(f"\n总患者数: {total_patients}")
# 6. 检查关键诊所的数据
print(f"\n5. 关键诊所数据检查...")
# 检查马山门诊
cursor.execute("""
SELECT
SUBSTRING(case_number, 1, 4) as prefix,
COUNT(*) as count
FROM patients
WHERE clinic_name = '马山门诊'
GROUP BY SUBSTRING(case_number, 1, 4)
ORDER BY count DESC
""")
print(f"\n马山门诊病历号前缀分布:")
for prefix, count in cursor.fetchall():
print(f" {prefix}: {count} 人")
# 检查红豆门诊
cursor.execute("""
SELECT
SUBSTRING(case_number, 1, 4) as prefix,
COUNT(*) as count
FROM patients
WHERE clinic_name = '红豆门诊'
GROUP BY SUBSTRING(case_number, 1, 4)
ORDER BY count DESC
""")
print(f"\n红豆门诊病历号前缀分布:")
for prefix, count in cursor.fetchall():
print(f" {prefix}: {count} 人")
cursor.close()
connection.close()
print(f"\n=== 重建完成! ===")
print(f"现在数据库中的患者数据完全基于JSON文件,没有任何多余数据。")
except Exception as e:
print(f"重建失败: {e}")
if __name__ == "__main__":
rebuild_patients_from_json()
\ No newline at end of file
......@@ -4,6 +4,7 @@ Flask-CORS==4.0.0
# 数据库
PyMySQL==1.1.0
cryptography==41.0.7
# 数据处理
pandas==2.1.4
......
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
启动Docker容器的Python脚本
"""
import subprocess
import time
import os
def run_command(command):
"""运行命令"""
print(f"执行命令: {command}")
try:
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
print(f"✅ 命令执行成功")
return result.stdout
except subprocess.CalledProcessError as e:
print(f"❌ 命令执行失败: {e}")
print(f"错误输出: {e.stderr}")
return None
def start_mysql():
"""启动MySQL容器"""
print("=== 启动MySQL容器 ===")
# 检查网络是否存在
result = run_command("docker network ls | grep patient_callback_network")
if not result:
print("创建网络...")
run_command("docker network create patient_callback_network")
# 启动MySQL容器
mysql_cmd = (
'docker run -d --name patient_callback_mysql '
'--network patient_callback_network '
'-e MYSQL_ROOT_PASSWORD=callback_system_2024 '
'-e MYSQL_DATABASE=callback_system '
'-e MYSQL_USER=callback_user '
'-e MYSQL_PASSWORD=dev_password_123 '
'-e MYSQL_ROOT_HOST=% '
'-e MYSQL_CHARACTER_SET_SERVER=utf8mb4 '
'-e MYSQL_COLLATION_SERVER=utf8mb4_unicode_ci '
'-p 3307:3306 '
'-v mysql_data:/var/lib/mysql '
'mysql:8.0 '
'--default-authentication-plugin=mysql_native_password '
'--character-set-server=utf8mb4 '
'--collation-server=utf8mb4_unicode_ci '
'--init-connect="SET NAMES utf8mb4" '
'--skip-character-set-client-handshake'
)
result = run_command(mysql_cmd)
if result:
print("等待MySQL启动...")
time.sleep(10)
return True
return False
def start_app():
"""启动应用容器"""
print("=== 启动应用容器 ===")
app_cmd = (
'docker run -d --name patient_callback_app '
'--network patient_callback_network '
'-p 4002:5000 '
'-e DB_HOST=patient_callback_mysql '
'-e DB_PORT=3306 '
'-e DB_USER=callback_user '
'-e DB_PASSWORD=dev_password_123 '
'-e DB_NAME=callback_system '
'-e DB_CHARSET=utf8mb4 '
'patient-callback-app'
)
result = run_command(app_cmd)
if result:
print("等待应用启动...")
time.sleep(15)
return True
return False
def check_status():
"""检查容器状态"""
print("=== 检查容器状态 ===")
run_command("docker ps")
def main():
"""主函数"""
print("🚀 启动患者画像回访话术系统Docker容器")
print("=" * 50)
# 清理旧容器
print("清理旧容器...")
run_command("docker rm -f patient_callback_mysql patient_callback_app")
# 启动MySQL
if start_mysql():
print("✅ MySQL启动成功")
# 启动应用
if start_app():
print("✅ 应用启动成功")
# 检查状态
check_status()
print("\n🎉 所有容器启动完成!")
print("访问地址: http://localhost:4002")
else:
print("❌ 应用启动失败")
else:
print("❌ MySQL启动失败")
if __name__ == "__main__":
main()
\ No newline at end of file
@echo off
echo 启动MySQL容器...
docker run -d --name patient_callback_mysql --network patient_callback_network -e MYSQL_ROOT_PASSWORD=callback_system_2024 -e MYSQL_DATABASE=callback_system -e MYSQL_USER=callback_user -e MYSQL_PASSWORD=dev_password_123 -e MYSQL_ROOT_HOST=%% -e MYSQL_CHARACTER_SET_SERVER=utf8mb4 -e MYSQL_COLLATION_SERVER=utf8mb4_unicode_ci -p 3307:3306 -v mysql_data:/var/lib/mysql -v ./init.sql:/docker-entrypoint-initdb.d/init.sql:ro mysql:8.0 --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --init-connect='SET NAMES utf8mb4' --skip-character-set-client-handshake
echo MySQL容器启动完成!
pause
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试Docker中的导出功能
"""
import requests
import json
def test_docker_export():
"""测试Docker中的导出功能"""
base_url = "http://localhost:4002"
print("=== 测试Docker中的导出功能 ===")
# 1. 测试健康检查
try:
response = requests.get(f"{base_url}/api/health", timeout=10)
if response.status_code == 200:
print("✅ 应用健康检查通过")
else:
print(f"❌ 应用健康检查失败: {response.status_code}")
return
except Exception as e:
print(f"❌ 无法连接到应用: {e}")
return
# 2. 测试导出功能
try:
print("\n正在测试导出功能...")
response = requests.get(f"{base_url}/api/export-data", timeout=30) # 使用正确的端点
if response.status_code == 200:
print("✅ 导出功能正常")
# 检查响应头
content_type = response.headers.get('content-type', '')
if 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' in content_type:
print("✅ 返回的是Excel文件")
# 保存文件
filename = f"回访记录导出_Docker测试_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
with open(filename, 'wb') as f:
f.write(response.content)
print(f"✅ 文件已保存: {filename}")
else:
print(f"⚠️ 返回的内容类型: {content_type}")
print(f"响应内容: {response.text[:200]}...")
else:
print(f"❌ 导出功能失败: {response.status_code}")
print(f"响应内容: {response.text}")
except Exception as e:
print(f"❌ 导出功能测试失败: {e}")
# 3. 测试数据统计
try:
print("\n正在测试数据统计...")
response = requests.get(f"{base_url}/api/stats", timeout=10)
if response.status_code == 200:
stats = response.json()
print("✅ 数据统计正常")
print(f"统计信息: {json.dumps(stats, indent=2, ensure_ascii=False)}")
else:
print(f"❌ 数据统计失败: {response.status_code}")
except Exception as e:
print(f"❌ 数据统计测试失败: {e}")
if __name__ == "__main__":
from datetime import datetime
test_docker_export()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简单测试导出功能
"""
import requests
import json
def test_export():
"""测试导出功能"""
base_url = "http://localhost:4002"
print("=== 测试导出功能 ===")
# 1. 健康检查
try:
health_response = requests.get(f"{base_url}/api/health", timeout=10)
if health_response.status_code == 200:
print("✅ 应用健康检查通过")
else:
print(f"❌ 应用健康检查失败: {health_response.status_code}")
return
except Exception as e:
print(f"❌ 应用健康检查异常: {e}")
return
# 2. 尝试导出(应该返回401未登录)
print("\n正在测试导出功能(未登录状态)...")
try:
export_response = requests.get(f"{base_url}/api/export-data", timeout=30)
if export_response.status_code == 401:
print("✅ 导出功能正常(正确返回未登录状态)")
print("现在请通过浏览器访问 http://localhost:4002")
print("使用 admin/admin123 登录,然后测试导出功能")
else:
print(f"⚠️ 导出功能返回状态码: {export_response.status_code}")
print(f"响应内容: {export_response.text}")
except Exception as e:
print(f"❌ 导出功能测试异常: {e}")
if __name__ == "__main__":
test_export()
\ No newline at end of file
This source diff could not be displayed because it is too large. You can view the blob instead.
# 患者画像回访话术系统 - 开发对话记录
# 患者画像回访话术系统 - 开发对话记录
## 📋 项目概述
**项目名称**: 患者画像回访话术系统
**技术栈**: Flask + MySQL + Docker + HTML/JavaScript/CSS
**开发时间**: 2025年8月
**项目规模**: 4133个患者,10个门诊,15个用户
## 🎯 主要问题和解决过程
### 1. 导出功能字段数量不匹配
**问题描述**: `❌ 导出失败: 导出过程出错: not enough values to unpack (expected 8, got 7)`
**原因分析**: SQL查询返回7个字段,但代码中期望8个字段
**解决方案**:
- 调整数据解包从8个字段改为7个字段
- 修复详情页数据填充,移除第8列(AI反馈)
- 确保字段数量完全匹配
**修复后的字段结构**:
```python
# SQL查询返回7个字段
1. case_number - 病例号
2. latest_result - 回访状态
3. latest_callback_time - 回访时间
4. latest_operator - 操作员
5. methods - 回访方式
6. failure_remark - 失败原因
7. abandon_remark - 放弃原因
```
### 2. SQL语法错误修复
**问题描述**: 多个SQL语法错误
- `(1054, "Unknown column 'ai_feedback_type' in 'field list'")`
- `(1060, "Duplicate column name 'callback_methods'")`
- `(1064, "You have an error in your SQL syntax...")`
**解决方案**:
- 移除不存在的`ai_feedback_type`
- 删除重复的`callback_methods`字段
- 移除尾随逗号
**修复后的SQL查询**:
```sql
SELECT
p.case_number,
cr.callback_result as latest_result,
cr.create_time as latest_callback_time,
cr.operator as latest_operator,
cr.callback_methods as methods,
cr.failure_remark,
cr.abandon_remark
FROM patients p
INNER JOIN (
SELECT
case_number,
callback_result,
create_time,
operator,
callback_methods,
failure_remark,
abandon_remark,
ROW_NUMBER() OVER (PARTITION BY case_number ORDER BY create_time DESC) as rn
FROM callback_records
) cr ON p.case_number = cr.case_number AND cr.rn = 1
WHERE p.clinic_name = %s
ORDER BY p.case_number
```
### 3. 下载功能问题
**问题描述**: "无法从网站上提取文件"
**原因分析**:
- Flask应用在Docker容器中运行,文件保存在`/app`目录
- 下载路由没有正确访问Docker容器内的文件
- 文件路径配置有问题
**解决方案**:
- 修复下载路由中的文件路径问题
- 添加Docker容器内路径检测
- 增加调试日志
### 4. Flask应用启动问题
**问题描述**:
- `(unicode error) 'utf-8' codec can't decode bytes`
- `expected an indented block (auth_system.py, line 366)`
- `unexpected indent (auth_system.py, line 1363)`
**解决方案**:
- 修复文件编码问题
- 修复缩进错误
- 使用已知工作的版本替换问题文件
### 5. 代码恢复操作
**最终解决方案**: 恢复到git仓库的稳定版本
```bash
# 丢弃所有未提交的更改
git restore .
# 清理所有未跟踪的文件
git clean -fd
# 重新启动Docker服务
docker-compose -p patient-callback up -d
```
## 🏗️ 项目核心架构
### 后端架构 (Flask)
- **主文件**: `auth_system.py` - Flask应用主程序
- **数据库模型**: `callback_record_mysql.py` - 回访记录数据模型
- **数据库配置**: `database_config.py` - MySQL连接配置
- **用户管理**: `user_manager.py` - 用户权限管理
### 数据库结构
```sql
-- 核心表结构
clinics (门诊表)
- id, clinic_name, clinic_id
patients (患者表)
- id, case_number, clinic_name, name, phone
callback_records (回访记录表)
- id, case_number, callback_result, create_time, operator, callback_methods, failure_remark, abandon_remark
users (用户表)
- id, username, password_hash, role, clinic_id
```
### 前端架构
- **患者画像页面**: `patient_profiles/clinic_*/patients/*.html`
- **门诊首页**: `patient_profiles/clinic_*/index.html`
- **管理后台**: `dashboard.html`, `user_management.html`
- **登录页面**: `login.html`
### 核心功能模块
1. **用户认证系统** - 基于session的用户登录和权限控制
2. **患者画像管理** - 4133个患者页面的动态生成和管理
3. **回访记录系统** - 记录、查询、统计回访数据
4. **Excel导出功能** - 批量导出诊所回访数据
5. **门诊权限控制** - 不同用户只能访问指定门诊
## 🐳 Docker部署指南
### 启动完整服务
```bash
# 启动所有服务 (Flask + MySQL)
docker-compose -p patient-callback up -d
# 查看服务状态
docker-compose -p patient-callback ps
# 查看日志
docker logs patient_callback_app
```
### 单独重启服务
```bash
# 重启Flask应用
docker-compose -p patient-callback restart patient_callback_app
# 重启MySQL
docker-compose -p patient-callback restart patient_callback_mysql
```
### 停止服务
```bash
docker-compose -p patient-callback down
```
### Docker配置文件
- `docker-compose.yml` - 主配置文件
- `Dockerfile` - Flask应用镜像
- `Dockerfile.mysql` - MySQL镜像配置
## 🔧 开发环境快速操作
### 代码更新到容器
```bash
# 复制文件到容器
docker cp 文件名.py patient_callback_app:/app/
# 重启应用
docker-compose -p patient-callback restart patient_callback_app
```
### 数据库操作
```bash
# 进入MySQL容器
docker exec -it patient_callback_mysql mysql -u callback_user -p
# 执行SQL脚本
docker exec patient_callback_mysql mysql -u callback_user -p callback_system < script.sql
```
### 容器内文件操作
```bash
# 查看容器内文件
docker exec patient_callback_app ls -la /app/
# 在容器内执行Python脚本
docker exec patient_callback_app python /app/script.py
```
## 📊 项目数据规模
- **患者总数**: 4133人
- **门诊数量**: 10个
- **用户数量**: 15个
- **回访记录**: 动态增长
## 🎯 核心业务逻辑
### 患者-门诊关联
- 通过`patients`表建立关系,不再依赖病历号前缀
- 使用权威的`诊所患者json`文件夹作为数据源
- 建立真正的数据库关系映射
### 回访状态管理
- 基于最新回访记录的状态更新
- 使用`ROW_NUMBER()`窗口函数获取最新记录
- 支持成功、失败、放弃等状态
### 权限控制
- 用户只能访问指定门诊的患者数据
- 基于`clinic_id`的访问控制
- 管理员可以访问所有数据
### 数据导出
- 按门诊分组统计
- 总览页:显示回访成功率、已回访患者数
- 详情页:只显示有回访记录的患者
## 🚀 Git仓库部署步骤
### 基本操作
```bash
# 1. 添加所有更改
git add .
# 2. 提交更改
git commit -m "描述你的更改内容"
# 3. 推送到远程仓库
git push origin master
# 4. 如果需要拉取最新代码
git pull origin master
```
### 重要说明
- 项目配置了GitLab CI/CD
- 推送到master分支会自动部署
- 建议在推送前先测试本地功能
## 💡 新对话中的快速恢复
在新对话中,你可以这样开始:
```
"我正在开发患者画像回访话术系统,这是一个Flask + MySQL + Docker项目。
核心架构:
- 后端:Flask + PyMySQL
- 数据库:MySQL (4133患者,10门诊,15用户)
- 前端:HTML + JavaScript + CSS
- 部署:Docker Compose
主要功能:用户认证、患者画像、回访记录、Excel导出、门诊权限控制
当前状态:系统正常运行,刚修复了导出功能的字段数量问题。
请帮我继续开发..."
```
## 📝 开发注意事项
1. **文件编码**: 确保所有Python文件使用UTF-8编码
2. **缩进问题**: 注意Python的缩进要求,避免混合使用空格和制表符
3. **Docker路径**: 在容器内操作时,文件路径使用`/app/`
4. **数据库连接**: 使用环境变量配置数据库连接
5. **权限控制**: 确保用户只能访问指定门诊的数据
## 🔍 常见问题排查
### Flask应用启动失败
1. 检查文件编码是否为UTF-8
2. 检查Python语法和缩进
3. 查看Docker日志:`docker logs patient_callback_app`
### 数据库连接问题
1. 检查MySQL容器是否正常运行
2. 验证数据库配置信息
3. 确认网络连接正常
### 导出功能问题
1. 检查SQL查询语法
2. 验证字段数量匹配
3. 确认文件路径配置正确
---
**文档创建时间**: 2025年8月13日
**文档版本**: v1.0
**维护人员**: AI助手
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment